ExecutorService, Callable and Future
Get startedজয় শ্রী রাম
🕉Note: This chapter is written keeping in mind all the struggles a beginner programmer has to go through while writing bug-free efficient Multithreaded program. If you are already proficient in writing production ready advanced concurrent program then feel free to skip this chapter.
At the very heart of writing Concurrent programming is the concept of tasks. You identify task boundaries, divide the work you are trying to achieve in your application into different tasks and then spawn threads to execute those tasks concurrently to achieve that amazing throughput you have in your mind and have been eagerly trying to achieve.
But here is where things go awry:
You try to spawn threads and manage their lifecycle yourself, and this is where it gets complicated and Concurrent Programming becomes a nightmare. This distinguishes the spaghetti code written by a newbie and a production ready code written by a PRO. "The Algorists" is all about teaching how to write production grade code at any time no matter what. This article will teach you how to write production grade Concurrent Program in no time, even during interviews in just few minutes. All you need to know is: Don't manage the life cycle of the threads yourself. Use THREAD POOL.
By using thread pool you focus on the tasks that you want the threads to perform, instead of the thread mechanics.
The rest of the chapter will be very much focused on the Thread Pool concept in Java, but the overall high level concept will be similar in most other languages.
Let's see why managing thread yourself may not be a good idea for production use:
- Overhead of thread creation: Thread creation and teardown are not free. They come with a lot of overhead. If you are not reusing the already available but idle threads prudently then, especially in scenarios where the requests or tasks are lightweight and very frequent, creating threads too frequently would consume significant computing resources.
- Resource Consumption: Active threads consume system resources, especially memory. When there are more runnable threads than available processors, threads sit idle. Having many idle threads can tie up a lot of memory, putting pressure on the garbage collector. If you have enough threads to keep all the CPUs busy, creating more threads won't help. Context switching is costly.
- Stability of your application: For a server, there is a limit to how many threads can be created. If you commit the mistake of implementing something like thread-per-task i.e, unbounded thread creation where you create a thread for every request, you risk of hitting this limit and it could lead to huge instability of your deployed application. The most likely result would be OutOfMemoryError. Trying to recover from such an error is very risky; it is far easier to structure your program to avoid hitting the limit.
Thread Pool takes care of all the risks/issues described above.
Java provides its own implementations of the thread pool pattern through objects called executors which is part of Executor framework, among several other Java implementations. The primary abstraction for task execution in the Java class libraries is NOT thread, it is EXECUTOR.
public interface Executor {
void execute(Runnable command);
}
-
Executor decouples task submission from task execution, describing tasks with Runnable.
Executor is based on the concept of producer-consumer pattern where activities that submit tasks are the Producers and the threads that execute tasks are the Consumers .
On a different note, using an Executor is usually the easiest path to implementing a Producer-Consumer design in an application.
Thread Pools:
A thread pool is a pool of worker threads. Every thread pool has a work queue that holds tasks waiting to be executed. Worker threads have a very simple life cycle:- Request the next task from the work queue,
- Execute the task,
- Go back to waiting for next task.
Advantages of executing tasks in Thread Pool over thread-per-task approach:
- Reusing an existing thread instead of creating a new one amortizes thread creation and teardown costs over multiple requests.
- Since worker threads often already exist at the time the requests arrive, the latency associated with thread creation does not delay task execution, thus improving responsiveness.
- By properly tuning the size of the thread pool, you can have enough threads to keep the processors busy while not having so many that your application runs out of memory or thrashes due to competition among threads for resources.
Types of Thread pools in Java:
- newFixedThreadPool: A fixed-size thread pool creates threads as tasks are submitted, up to maximum pool size, and then attempts to keep the pool size constant (adding new threads if a thread dies due to an unexpected exception).
- newCachedThreadPool: A cached thread pool has more flexibility to reap idle threads when the current size of the pool exceeds the demand for processing, and to add new threads when demand increases, but places no bounds on the size of the pool.
- newSingleThreadExecutor: A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially according to the order imposed by the task queue (FIFO, LIFO, priority order).
- newScheduledThreadPool: A fixed-size thread pool that supports delayed and periodic task execution, similar to Timer.
Execution Policies:
The value of decoupling submission of request or task from actual task execution is that it lets you easily specify, and subsequently change without great difficulty, the execution policy for a given class of tasks. An execution policy specifies the "what, where, when, and how" of task execution, including:
- In what thread will tasks be executed?
- In what order should tasks be executed (FIFO, LIFO, priority order)?
- How many tasks may execute concurrently?
- How many tasks may be queued pending execution?
- If a task has to be rejected because the system is overloaded, which task should be selected as the victim, and how should the application be notified?
- What actions should be taken before or after executing a task?
Whenever you see code of the form:
new Thread(runnable).start()
and you think you might at some point want a more flexible execution policy, seriously consider replacing it with the use of an Executor.
Drawbacks of Executor and need for ExecutorService:
We have seen before how the Executor interface looks like, it contains only overridable method t execute() and that's it. It does not contain any method that lets you manage its lifecycle like terminating or shutting down the Executor.JVM (Java Virtual Machine) can't exit until all the nondaemon threads have terminated, so failing to shut down an Executor could prevent the JVM from exiting.
Executors are nondaemon in nature unless you explicitly change it for whatever reason you might have. You might need to change an executor from nondaemon to daemon if your runtime environment does not let you shutdown executor, then you might need to make the executor daemon to let JVM exit. We have shown how to set an executor as daemon in Multithreaded Web Crawler.
Because an Executor processes tasks asynchronously, at any given time the state of previously submitted tasks is not immediately obvious. Some may have completed, some may be currently running, and others may be queued awaiting execution. In shutting down an application, there is a spectrum from GRACEFUL SHUTDOWN (finish what you've started but don't accept any new work) to ABRUPT SHUTDOWN (just like abruptly turning off the power to the machine room), and various points in between. Since Executors provide a service to applications, they should be able to be shut down as well, both GRACEFULLY and ABRUPTLY, and feed back to the application about the status of the tasks that were affected by the shutdown. Unfortunately Executor has none of these must-have features, and that is where ExecutorService comes in, yo address all these issues of execution lifecycle management. ExecutorService extends Executor, adding a number of methods for lifecycle management, as well as some methods for task submission, as shown below:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
.
.
.
// ... convenience methods for task submission
Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task)
}
Lifecycle of ExecutorService:
The lifecycle of ExecutorService has three states:
- Running
- Shutting
- Terminated
ExecutorServices are initially created in the running state. The shutdown method initiates a GRACEFUL SHUTDOWN: no new tasks are accepted but previously submitted tasks are allowed to complete, including those that have not yet begun execution.
The shutdownNow method initiates an ABRUPT SHUTDOWN: it attempts to cancel all outstanding tasks and does not start any tasks that are queued but not begun.
Tasks submitted to an ExecutorService after it has been shut down are handled by the rejected execution handler, which might silently discard the task or might cause ExecutorService to throw the unchecked RejectedExecutionException.
Once all the tasks have completed, the ExecutorService transitions to the terminated state.
Result-bearing tasks: Callable and Future:
Recall from the Executor interface, in order to use an Executor, you have to be able to describe your task as a Runnable. But Runnable comes with a number of limitations. Runnable is a fairly limiting abstraction: run cannot return a value or throw checked exceptions, although it can have side effects such as writing to a log file or placing a result in a shared data structure.Callable:
Before going into in-depth discussion on Callable let's look at the Callable interface:
public interface Callable<V> {
V call() throws Exception;
}
Many tasks are effectively deferred computations:
- Executing a database query
- Fetching a resource over the network
- Computing a complicated or time-consuming function, that might have latency associated with it.
Callable encapsulates a TASK, and you can say Callable is similar to Runnable. To express a non-value-returning Task, us Callable<Void>.
The lifecycle of the tasks executed by an Executor has four states:
- Created
- Submitted
- Started
- Completed
Future:
Future represents the lifecycle of a task and provides methods to:
- Test whether the task has completed or been cancelled,
- Retrieve its (the task's) result,
- Cancel the task.
Let's take a look at the Future interface:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}
get() is to get the result of an executed task. The behavior of get() varies depending on the task state (not yet started, running, completed). It returns immediately or throws an exception if the task has already completed, but if not it blocks until the task completes.
If the task completed by throwing an exception, get() rethrows it wrapped in an ExecutionException. If the task was cancelled, get() throws CancellationException.
There are several ways to create a Future to describe a task. All the submit() methods in ExecutorService interface return Future. This way you can submit a Runnable or a Callable to a ExecutorService and get back a Future that can be use to retrieve result of a task or cancel the task.
Template:
In summary, here is how Executor framework, ExecutorService, Callable and Future tie together with each other and work together to achieve efficient concurrency:
// Create appropriate Thread Pool
private final ExecutorService pool = Executor.newCachedThreadPool(); // call 1 of 4 static factory methods in Executor as appropriate:
// fixed / cached / scheduled / single thread
// Define Task (generally tasks have associated latency, that is why tasks should be run in background and let the tasks complete)
Callable<V> task = new Callable<V>() {public V call() { ..define task.. }};
// Submit the task to the thread pool for execution in background
// and get a Future back to get the state of the task at any point of time
Future<V> future = pool.submit(task);
// Get the result of the task to actually be able to utilize the result of the task
try {
V taskResult = future.get();
}
catch (InterruptedException | ExecutionException ex) {
}
Armed with this powerful yet easy-to-understand template, whenever you have to solve a complex concurrent problem, you can just focus on one thing: identifying all things in the given problem which are blocking and time-consuming in nature which you would like to execute asynchronously in the background, because these blocking activities will become your tasks. Once you have identified all the asynchronous tasks, then :
- You just need to get the appropriate thread pool (executor).
- Next, you define the task(s) using Callable, submit the Callable(s) to the executor so that they can be asynchronously executed in the background (while you do other stuff (mostly sequential) which could be done before you need the results of the defines tasks) and get Future(s) back.
- When you need the result(s) of the defined task(s) you just call future.get().
Now let's put our learnings into action:
Let us say we are trying to render a webpage / HTML page consisting only of texts and images. We are trying to improve the responsiveness of the page as much as possible and give the visitors of the webpage as good of an experience as possible.
Images are generally stored in blob storage containers and depending on image size it might take just few milliseconds to several seconds to download the images. Whereas the latency of rendering the texts generally takes way way less time compared to that of images. So we should not wait for images to download, which might take several seconds, to render the texts, which might be almost instantaneous. Instead we should define downloading images as task and submit to thread pool and let it download the images in background asynchronously. We can access the state of the task any time and get the downloaded images from Future by calling get().
So, to make the page rendering more concurrent, we divided the whole page rendering into two tasks:
- one that renders the text (this is largely CPU bound, you already get the character sequence, all you need to do is render the texts)
- one that downloads all the images (this is largely I/O bound: you might need to download the images from cloud based blob storage container)
By doing the above mentioned Concurrency improvement what we achieved is: the page is more responsive now. The page does not freeze until all the images are downloaded and then render the texts and images together once all the images are downloaded.
The above mentioned approach is no where close to the best we can do, but is definitely a good improvement. As we go on publishing more content on Concurrency we would see better ways to achieve Concurrency. As a quick further improvement, instead of creating just one task that downloads all the images we can create more than one tasks (each task task instead of downloading all images will download small number of images parallelly) and put the Futures in a queue as we have done in Multithreaded Web Crawler
public class FutureRenderer {
private final ExecutorService executor = ...; // take appropriate executor like newFixedThreadPool, newCachedThreadPool etc
void renderPage(CharSequence htmlSource) {
final List<ImageInfo> imageInfos = scanForImageInfo(htmlSource);
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result
= new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos)
result.add(imageInfo.downloadImage());
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
// render texts while images are still downloading
renderText(htmlSource);
try {
List<ImageData> imageData = future.get(); // it blocks till the time images are downloaded
for (ImageData data : imageData)
renderImage(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Re-assert the thread’s interrupted status
// We don’t need the result, so cancel the task too
future.cancel(true);
} catch (ExecutionException e) {
// when get() throws ExecutionException
// the underlying exception can be retrieved
// by calling getCause()
throw launderThrowable(e.getCause());
}
}
private RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}
Is this the ultimate Concurrency Solution ? Heck NO! But this is a good starting point to writing concurrent program that you'd actually see in Production.
The main advantage of using ExecutorService is that you do not have to worry about threads lifecycle management (specifically when a thread dies unexpectedly, which is not very uncommon) and scheduling.ExecutorService works well for applications which get moderate number of requests. For applications getting hugely large traffic ExecutorService would have the same issues as not using Thread Pool and managing threads ourselves, as described below:
- For newFixedThreadPool we would still have specify the size of the thread pool ourselves and if we put the number of maximum threads way more than the number of processors available, we would have a problem (as mentioned before).
- newCachedThreadPool is Unbounded in nature and suffers from all the same issues as in thread-per-task mechanism: getting OutOfMemoryError if we run out of memory since cached Thread Pool as no limit to how many thread it would create since it would go on creating threads to accommodate new requests if there is no idle thread to reuse, when we have huge number of requests coming.
We would discuss about more improved and advanced concurrency mechanism in later chapters, as we go on adding more content on Concurrency.
Being said that (everything in last couple paragraphs), we can still make it efficiently work for large traffic if we move away from scaling up just one machine, and embrace distributed computing and a scalable architecture where we scale out. We can achieve this in multiple ways:
-
Scale Out with Multiple Instances of the Application and a Load Balancer :
We should provision more than one servers and there should be a load balancer which would route the requests homogeneously to the servers based on the percentage CPU resources being used at the servers. Server with the least CPU resource consumption (like, memory) at a given time gets to serve a request at that time.
-
Auto-Scale and a Queue:
We do not have a load balancer in this architecture. Rather all the requests are queue as they come and the application picks up the requests as they come for processing. The application would auto-scale based on certain criteria, example: provision one more instance whenever the CPU consumption (memory) becomes more than 80%. Similarly, it scales back in when CPU consumption gets less than, say, 40%. In Azure Cloud this can be achieved through Consumption Plan.
newCachedThreadPool would work really well with the above two architectures. Both of the above architecture design would prevent OutOfMemoryError.
To know more about applications of ExecutorService, Callable and Future, don't forget to read the chapter Multithreaded Web Crawler.