The Java Fork/Join Framework


Overview

This article discusses the “fork/join” framework introduced into the Java standard libraries in version 1.7, and method Collection.parallelStream which performs parallel processing on collections using this fork/join framework internally. This is not a simple “how to use” article; there are plenty of those around already. Instead this article presents the motivations, pros/cons and internal details which are relevant for understanding its performance and limitations (at least as I understand them after a day’s research).

Java version 1.7 introduced the “fork/join” framework into the standard library; see java.util.concurrent.ForkJoinPool and related classes.

Java version 1.8 added methods stream and parallelStream to the interface java.util.Collection, which is a supertype of most of the standard collection-types (eg List). Method stream returns a Stream, which provides various ways to apply a lambda to the elements of a collection, including operations well-known to functional programmers such as map, flatMap and reduce. Method parallelStream does the same but uses a threadpool to execute the operations in parallel where possible, via the fork/join framework.
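As a minimal illustration (the class and method names are purely illustrative), the same map/reduce pipeline can be run sequentially via stream or in parallel via parallelStream. For a pure, non-blocking function such as squaring, both give the same result; the parallel version simply runs its pieces on the fork/join common pool:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamDemo {
    // Sequential pipeline: map each element to its square, then reduce by summing.
    static long sumOfSquares(List<Integer> values) {
        return values.stream()
                .mapToLong(Integer::longValue)
                .map(x -> x * x)
                .sum();
    }

    // Identical pipeline; parallelStream splits the work across the fork/join common pool.
    static long parallelSumOfSquares(List<Integer> values) {
        return values.parallelStream()
                .mapToLong(Integer::longValue)
                .map(x -> x * x)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> values = IntStream.rangeClosed(1, 1_000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(sumOfSquares(values));         // 333833500
        System.out.println(parallelSumOfSquares(values)); // 333833500
    }
}
```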

Class ForkJoinPool implements the standard ExecutorService interface, and so for some types of workload can be used as a replacement for other kinds of threadpool such as that returned by Executors.newFixedThreadPool. In fact, Executors.newWorkStealingPool creates a ForkJoinPool (returning it as interface ExecutorService). However in most cases a ForkJoinPool will be created directly in order to have access to methods not on the standard ExecutorService interface, such as invoke(ForkJoinTask), which executes a ForkJoinTask and returns its result (the fork and join methods themselves belong to class ForkJoinTask).
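A small sketch of the two ways of using a ForkJoinPool (the class names are hypothetical): through the plain ExecutorService API, and through its own invoke method with a ForkJoinTask. The instanceof check reflects the OpenJDK implementation of Executors.newWorkStealingPool (added in Java 1.8); its declared return type is only ExecutorService.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class PoolDemo {
    // A trivial ForkJoinTask; compute() runs on one of the pool's worker threads.
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Declared to return ExecutorService; in OpenJDK the object is a ForkJoinPool.
        ExecutorService workStealing = Executors.newWorkStealingPool();
        System.out.println(workStealing instanceof ForkJoinPool);
        workStealing.shutdown();

        ForkJoinPool pool = new ForkJoinPool();

        // 1. Used as a plain ExecutorService: submit a Callable and wait on the Future.
        Future<String> future = pool.submit(() -> "hello");
        System.out.println(future.get());

        // 2. Used via the ForkJoinPool-specific API: run a ForkJoinTask and return its result.
        int answer = pool.invoke(new Answer());
        System.out.println(answer);

        pool.shutdown();
    }
}
```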

The javadoc for classes ForkJoinPool and ForkJoinTask is verbose but nevertheless was never entirely clear to me, and it was only when I read the original academic paper on fork/join that the whole thing started to make sense. The Fork Join Calamity article also motivated me to research the topic more thoroughly as it is a very detailed, though not very clearly written, critique that I initially struggled to understand. The calamity article is discussed further below.

Work Stealing

A standard ExecutorService which wraps a dynamic-size threadpool has a single queue of tasks to be executed, and a set of threads with which to execute them. When a task is added to the queue and an already-existing thread is idle, then that idle thread executes the task. When no thread is idle, then a new thread is created and that thread executes the task.

A standard ExecutorService which wraps a fixed-size threadpool has a single queue of tasks to be executed, and a set of threads with which to execute them. When a task is added to the queue and an existing thread is idle, then that idle thread executes the task. When no thread is idle, the task is added to a work-queue. When a thread has nothing to do (is idle), it takes the oldest (head) task from the work-queue and executes it.

A “work stealing” ExecutorService is a kind of fixed-size threadpool in which there is a work-queue for each thread. Method ForkJoinPool.submit effectively places the task on a randomly-selected work-queue (though it always uses the same target queue for a specific calling thread). When a thread finishes its previous task (ie becomes idle), it takes the newest (most recently added) item from its local work-queue. If its local work-queue is empty, it tries to take the oldest item from some other thread’s work-queue. Because the owning thread and the “thieves” work at opposite ends of a queue, they rarely contend with each other. The general idea is that this approach minimises locking contention between the threads in a threadpool; for long-running tasks the benefit is minimal, but where a work-queue contains a large number of quickly-executed tasks it can be significant.
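The ordering rule can be illustrated with a deliberately simplified, single-threaded model of one worker’s queue. The ToyWorkQueue class below is hypothetical; the real ForkJoinPool work-queues are lock-free arrays and considerably more complex. In ForkJoinPool’s default (non-async) mode the owning thread pushes and pops at one end, last-in-first-out, while a thief takes from the other end, first-in-first-out:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded model of one worker's queue (not thread-safe).
class ToyWorkQueue<T> {
    private final Deque<T> deque = new ArrayDeque<>();

    // The owning thread pushes newly forked tasks onto the "top".
    void push(T task) {
        deque.addLast(task);
    }

    // The owning thread pops its most recently pushed task (LIFO).
    T pop() {
        return deque.pollLast();
    }

    // An idle thread steals the oldest task from the other end (FIFO).
    T steal() {
        return deque.pollFirst();
    }

    public static void main(String[] args) {
        ToyWorkQueue<String> q = new ToyWorkQueue<>();
        q.push("task-1");
        q.push("task-2");
        q.push("task-3");
        System.out.println("thief steals: " + q.steal()); // task-1 (oldest)
        System.out.println("owner pops:   " + q.pop());   // task-3 (newest)
    }
}
```

Taking the oldest task also tends to hand the thief a large piece of work, since in a recursive decomposition the oldest queued tasks usually cover the biggest unsplit ranges.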

A work-stealing executor service can be created via Executors.newWorkStealingPool. However that method isn’t often used; if there are not huge volumes of tasks being processed, or they are relatively long-running tasks (eg might block), then normal thread-pools work just as well. And if there is a need to process large volumes of quickly-executed (and non-blocking) tasks, then a ForkJoinPool should be created directly in order to be able to use its extended APIs. Internally, such a pool still uses work-stealing, because the fork/join pattern generally generates large numbers of quick-running tasks, which is exactly where the low-lock-contention behaviour of the work-stealing approach works best.

Note that it isn’t forbidden to perform long-running or blocking operations in a task submitted to a “work-stealing” ExecutorService. However given that it is a fixed-size pool of threads, too many blocking tasks will significantly slow the system. In general it is better for such workloads to be run in a dynamic-size threadpool so that tasks can still be processed while other tasks are blocking their threadpool-threads.

A threadpool running a large number of non-blocking tasks is CPU-bound, and maximum throughput is reached with a threadpool-size which matches the number of CPUs available on the local system. Adding more processing threads to a CPU-bound system only slows it down, due to pointless context-switching. Therefore “work stealing” ExecutorService instances are fixed-size threadpools and by default use 1 thread per available processor, as reported by Runtime.availableProcessors() (which counts logical processors, ie hardware threads, rather than physical cores).
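The defaults can be inspected directly. This sketch prints the processor count seen by the JVM alongside the parallelism of a freshly-created pool and of the common pool. In OpenJDK the common pool normally uses one thread fewer than the processor count (unless overridden via the java.util.concurrent.ForkJoinPool.common.parallelism system property), presumably because threads that submit work and then join on it also help execute tasks:

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismDemo {
    public static void main(String[] args) {
        // Logical processors visible to the JVM (hardware threads, not physical cores).
        int cpus = Runtime.getRuntime().availableProcessors();

        // The no-argument constructor targets one worker per available processor.
        ForkJoinPool pool = new ForkJoinPool();

        // The shared common pool; in OpenJDK this normally defaults to cpus - 1 (minimum 1)
        // unless the java.util.concurrent.ForkJoinPool.common.parallelism property is set.
        int common = ForkJoinPool.commonPool().getParallelism();

        System.out.println("availableProcessors: " + cpus);
        System.out.println("new ForkJoinPool():  " + pool.getParallelism());
        System.out.println("commonPool():        " + common);
        pool.shutdown();
    }
}
```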

Fork/Join Motivation

Native threads are heavy-weight objects: they are slow to start, take lots of memory, and are slow to “schedule”.

Sometimes we just need a thread and don’t care about the performance. In particular, when processing user input we need a thread in order to be responsive and “throughput” is not a relevant concept.

Sometimes we do care about performance, but the tasks that need to be processed take various amounts of time, and sometimes block for significant amounts of time waiting for disk, network, shared locks, and all sorts of other things. In this case, a threadpool can be useful but tuning its settings is a matter of experimentation and guesswork.

And sometimes we simply need to perform a very CPU-intensive piece of work as fast as possible. If the work can be parallelised, then (as described above) the best throughput can be obtained by dividing the work into N equal tasks where N is the number of CPUs on the local system. Unfortunately, even when the algorithm can be parallelised, generating exactly N equal tasks can be hard - and N will vary from host to host. Fortunately, dividing the work into a large number of tasks (hundreds or thousands) and then running these tasks via a threadpool of size N will have almost the same performance as running N tasks. This is exactly the approach of the fork/join framework. There is a little overhead each time a task is completed and the next one allocated to a thread but, as long as each task does at least a moderate amount of CPU-intensive work before completing, this overhead is a small fraction of the overall processing time.

A CPU-intensive algorithm using the fork/join framework can generate a large number of tasks directly, queue them for processing, wait for completion, and merge the results. However it is often more convenient to generate just a few initial tasks, and have them generate more tasks, etc., until reaching a point where tasks are “small enough” to not need further splitting. The first approach leads to a “flat” structure while the second leads to a deep tree of tasks. Either is fine.
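A sketch of the first, “flat” approach, assuming the work is summing a long[] (the class name and the chunk count are illustrative only): the input is split up front into many more chunks than there are CPUs, all chunks are submitted at once via the standard ExecutorService.invokeAll method, and the partial results are merged at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class FlatSplitDemo {
    // Split the array into roughly `chunks` independent pieces up front (the "flat" structure),
    // run them all on the pool, then merge the partial sums. Assumes chunks >= 1.
    static long sum(long[] data, int chunks, ForkJoinPool pool)
            throws InterruptedException, ExecutionException {
        int chunkSize = Math.max(1, (data.length + chunks - 1) / chunks); // ceiling division
        List<Callable<Long>> tasks = new ArrayList<>();
        for (int start = 0; start < data.length; start += chunkSize) {
            final int lo = start;
            final int hi = Math.min(start + chunkSize, data.length);
            tasks.add(() -> {
                long partial = 0;
                for (int i = lo; i < hi; i++) {
                    partial += data[i];
                }
                return partial;
            });
        }
        long total = 0;
        // ExecutorService.invokeAll queues every task and waits until all have completed.
        for (Future<Long> f : pool.invokeAll(tasks)) {
            total += f.get(); // merge step
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        ForkJoinPool pool = new ForkJoinPool(); // one worker per available processor
        System.out.println(sum(data, 1_000, pool)); // 499999500000
        pool.shutdown();
    }
}
```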

Note that this applies only to CPU-intensive code which doesn’t block; any blocking of a fixed-size threadpool will obviously break the whole concept. To quote the javadoc for class ForkJoinTask:

The efficiency of ForkJoinTasks stems from a set of restrictions (that are only partially statically enforceable) reflecting their main use as computational tasks calculating pure functions or operating on purely isolated objects.

In summary, fork/join is really a way of partitioning problems for parallel processing. This approach is not limited to Java; Intel Cilk is a C/C++ language extension (compiler support plus a runtime library) which supports the same approach, and its cilk_spawn/cilk_sync API is one of the inspirations for the fork/join implementation in the Java standard libraries.

Continuations

Later in this article I refer to continuations, so for those who are not familiar with them, here is a quick summary (though the Wikipedia article on continuations is better than any description I can make):

Java provides a threading model that maps 1:1 to threads provided by the underlying operating system. Calls from a thread to IO operations cross from the JVM into the operating system kernel, and are suspended there (blocked) until the necessary data is available. This has obvious consequences for threadpools.

Each native thread has an associated block of memory used as a call stack, on which local variables and return addresses for the current method-call-chain are stored. Each thread also has a block of data holding the “program counter” (current address within the application) and various flags.

Native threads only expose this thread context information to the application in limited form. In Java, Throwable.getStackTrace provides access to some of this information. In other languages, it is possible to (indirectly) capture the entire state of the current thread in a form which allows it to be restarted later at the same point; Python generators are a limited type of continuation where the state is ‘captured’ at the point a yield statement is executed.

There are programming languages where calls to blocking operations are effectively handled by continuations. The language itself starts only a small number of native threads (eg one per physical CPU) with which to execute user code. The runtime library that the language provides to access operating-system operations handles all calls to “blocking” operations by creating a closure, handing off the actual kernel-level blocking operation to some internal threadpool, then transferring control to some other continuation, effectively “context switching” to another task in the user code. This is similar to what the operating system kernel would do, but “above” the kernel in user-space. This is often called “userspace threads”, “lightweight threads” or “green threads”.

There are also programming languages which make continuations available to the programmer. Unfortunately Java is not one of them (and Python generators are only a very limited form of continuation).

The Fork/Join Pattern

The fork/join (partitioning) approach is described well in the javadoc, but to paraphrase:

if (input data is small enough)
  process it directly
else
  create multiple tasks representing subsets of the input data
  start the multiple tasks and wait for them to finish
  merge the results
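The second, tree-shaped approach maps directly onto this pattern. The following sketch uses RecursiveTask to sum a long[]; the class name and the THRESHOLD value are hypothetical, and a real cut-off would need to be tuned by measurement. Note the common idiom of forking one half, computing the other half in the current thread, and only then joining:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RecursiveSum extends RecursiveTask<Long> {
    // Hypothetical cut-off: ranges this small are summed directly rather than split.
    private static final int THRESHOLD = 10_000;

    private final long[] data;
    private final int lo, hi; // half-open range [lo, hi)

    RecursiveSum(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: process it directly.
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Otherwise create two tasks, each covering half of the range.
        int mid = (lo + hi) >>> 1;
        RecursiveSum left = new RecursiveSum(data, lo, mid);
        RecursiveSum right = new RecursiveSum(data, mid, hi);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // process the right half in the current thread
        long leftSum = left.join();      // wait for (or help complete) the left half
        return leftSum + rightSum;       // merge the results
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        long total = ForkJoinPool.commonPool().invoke(new RecursiveSum(data, 0, data.length));
        System.out.println(total); // 499999500000
    }
}
```

Computing one half directly, rather than forking both halves, avoids queuing a task only to have the same thread immediately take it back.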

Because the optimal size of the underlying threadpool for any fork/join framework is the number of CPUs, the JVM standard library can just create a single threadpool and use it for all callers; having more threadpools will not improve the overall throughput. Hence the concept of a common pool (ForkJoinPool.commonPool()) as mentioned in the javadoc. If a developer really wants to, they can create their own ForkJoinPool, but it won’t make anything faster (unless someone has misused the common pool by scheduling blocking tasks in it).

Of course, as soon as such a framework exists, developers will start to misuse it. Ideally any “fixed-size work stealing threadpool” (such as the fork/join framework provides) would detect blocking IO and terminate the entire tree of operations with an UnsupportedOperationException or similar. However this is not done; such code will just block one of the available threads and reduce overall throughput. Javadoc for class ForkJoinTask states that blocking operations are acceptable “as long as no other tasks wait on the blocking one” and there are not too many of them. In other words, they should be simple “top level” operations that are not generated from other tasks. They should still be avoided though. As an escape-hatch, a task can use ForkJoinPool.managedBlock(..). This method tells the pool that the current thread may block, allowing the pool to activate or create a spare thread to maintain its parallelism while the blocked thread waits. Obviously that has performance costs - and the whole point of fork/join is really to avoid such stuff.
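For the rare case where a task must block, the javadoc for ForkJoinPool.ManagedBlocker describes a QueueTaker example, which this sketch follows closely (the static take helper and the main method are additions for illustration). The pool checks isReleasable first, and only calls the potentially blocking block method if that returns false:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Takes an item from a BlockingQueue, telling the pool that this thread may block.
class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    private final BlockingQueue<E> queue;
    private volatile E item;

    QueueTaker(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    // Only called when isReleasable() returned false; this is where blocking happens.
    @Override
    public boolean block() throws InterruptedException {
        if (item == null) {
            item = queue.take();
        }
        return true; // no further blocking needed
    }

    // Non-blocking check: true if an item is already held or can be taken immediately.
    @Override
    public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() { // valid once managedBlock has returned
        return item;
    }

    // Illustrative helper for use inside fork/join tasks.
    static <T> T take(BlockingQueue<T> queue) throws InterruptedException {
        QueueTaker<T> taker = new QueueTaker<>(queue);
        // Inside a ForkJoinPool, this may activate a spare thread while the caller is blocked.
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        ForkJoinPool pool = new ForkJoinPool(2);
        // The consumer blocks (in a managed way) until the main thread supplies an item.
        ForkJoinTask<String> consumer = pool.submit(() -> take(queue));
        queue.put("data");
        System.out.println(consumer.join()); // data
        pool.shutdown();
    }
}
```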

Sadly, the existence of method Collection.parallelStream, which internally implements the operation over the fork/join framework using the commonPool threadpool, is likely to tempt many developers into using fork/join inappropriately (by invoking blocking operations).

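Every call to ForkJoinTask.fork() made from a pool’s worker thread queues the task on the work-queue associated with the calling thread - pretty simple and obvious. (A fork from a thread that is not part of any ForkJoinPool submits the task to the common pool instead.)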

A call to ForkJoinTask.join() on a task that has already been completely executed by this or some other thread just returns immediately. For a task which is running or not yet started, the calling thread does not simply block; instead it tries to make progress itself. In the Java 1.8 implementation, if the task being joined is still the newest task in the calling thread’s own work-queue, the caller removes it and executes it directly; otherwise it looks for the task elsewhere in its own queue, or helps by executing tasks from the queue of the thread that stole it. When each such executed task completes, the status of the task being joined is tested again; if it is still not done, the process repeats. At some point the caller will discover that the task being joined-on is complete, and so the caller will then continue with its current method.

Note that this kind of ‘join’ is completely different from Thread.join in which the calling thread simply blocks, waiting for the target to complete.

Class ForkJoinTask sadly uses the “template method” antipattern, where code extends a base class with significant functionality. The result is a large number of methods on the class, most of which are irrelevant for most developers. Methods fork, join, invoke* and exec are the only relevant methods for most developers. Actually, most developers will use the RecursiveTask or RecursiveAction subclasses of ForkJoinTask, in which case compute replaces exec.

One of the benefits of divide-and-conquer algorithms (of which fork/join is an implementation) is better use of caches. Each subtask usually operates over a range of values related to values being accessed by its parent and sibling tasks. Because a “work stealing” threadpool generally runs a task on the same thread as its parent and sibling tasks, this potentially results in better use of CPU caches. This benefit is not so easily achieved in Java (compared to languages such as C) due to Java’s heap-management and garbage-collection, but at least some benefit is available.

Thread Explosions and Stack Depth Restrictions

Searching for fork/join on the internet immediately returns links to a number of pages describing problems with large numbers of threads being generated. There is code in the fork/join frameworks for both Java1.7 and Java1.8 which does create additional threads under specific circumstances. However the cases which trigger this behaviour are not clear from reading the code (it is somewhat obscure) and the test application shown later in this article does not trigger such behaviour in either Java1.7 or Java1.8.

The “Fork/Join Calamity” article referenced earlier (and discussed further below) claims that the join method implementation in Java1.7 works differently from 1.8; rather than invoking another task in the same thread it supposedly spawns a new thread (called a “continuation thread”) for processing tasks from the original thread’s workqueue then lets the original thread block. In this approach there is still just one non-blocked thread per workqueue (and thus the “low contention” implementation of the associated workqueue still works) but the extra spawned threads can consume significant system resources for deep trees of fork/join tasks. However in tests I could not verify this; Java1.7 appears to work in (approximately) the same way as Java1.8.

The approach where join on a not-yet-complete task actually invokes some task in the same thread does have some disadvantages (over spawning additional threads), including increased stack-depth and arbitrary delays for join operations (the tasks executed by the calling thread may not be related to the task that is performing the join). However although local predictability is worse, overall throughput is better and the number of threads is fixed.

Stack depth is probably not a significant problem in production use. As described above, a join operation can trigger invocation of a new task in the same thread. That new task can perform a join which triggers invocation of yet another task, etc. A Java thread only has a fixed amount of “stack space” so the nested-calls solution can potentially lead to a StackOverflowError. However tasks are objects invoked once in a single-threaded way; they can therefore safely store data in fields rather than in local variables - and the task itself is stored on the heap. Therefore, with a little care, task objects should not require much data on the stack and so can be nested quite deeply before stack overflow occurs. On the other hand, the fact that a call to join can lead to invocation of any task queued in the same fork-join-pool, not just tasks related to the caller, can potentially lead to unpredictable stack usage - adding a poorly-designed task to the same pool could cause stack overflow problems at rare, unpredictable, and unrepeatable intervals.

The Calamity article also states that the framework (both 1.7 and 1.8 versions) creates additional “continuation” or “compensation” threads. The source-code does indeed contain code to create additional threads under some circumstances (see ForkJoinPool.tryCompensate in Java1.8) but it is not clear what these circumstances are. The Calamity article also mentions that use of standard library concurrency classes Phaser or CompletableFuture together with the fork/join framework can trigger creation of additional threads. I haven’t evaluated this.

Thread Locals

It should be reasonably obvious that using thread-locals within tasks running within a threadpool is not a good idea. Care also has to be taken not to call into libraries that use thread-locals or store the result of Thread.currentThread() or similar, as totally unrelated code may be executed with that same thread.
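A sketch of the hazard and of the usual mitigation (the CONTEXT thread-local and the task methods are hypothetical). With a single-worker pool, a value left behind by one task is typically visible to the next, unrelated task on that worker, whereas clearing it in a finally block leaves nothing behind:

```java
import java.util.concurrent.ForkJoinPool;

class ThreadLocalDemo {
    // Hypothetical per-thread state, e.g. something a library sets up on first use.
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Sets the thread-local and never clears it.
    static void leakyTask(String value) {
        CONTEXT.set(value);
        // ... work that uses CONTEXT ...
    }

    // Sets the thread-local and always clears it, even if the work throws.
    static void tidyTask(String value) {
        CONTEXT.set(value);
        try {
            // ... work that uses CONTEXT ...
        } finally {
            CONTEXT.remove();
        }
    }

    // What an unrelated task running later on the same thread would see.
    static String peek() {
        return CONTEXT.get();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(1); // one worker thread, shared by all tasks

        pool.submit(() -> leakyTask("request-A")).join();
        // Typically prints "request-A": state left behind by an earlier, unrelated task.
        System.out.println(pool.submit(ThreadLocalDemo::peek).join());

        pool.submit(() -> tidyTask("request-B")).join();
        // Expected to print "null": tidyTask overwrote the leftover value, then removed it.
        System.out.println(pool.submit(ThreadLocalDemo::peek).join());

        pool.shutdown();
    }
}
```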

Embarrassingly Parallel

The term embarrassingly parallel is used to describe problems for which efficient parallel implementations are easy and obvious. The quicksort algorithm is a good example: partition the input around a pivot value (smaller values to one side, larger values to the other), then sort each partition independently, recursively. After each partitioning step, the recursive sorts operate on completely distinct subsets of the overall data, so there is no interaction between tasks running in parallel.
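A sketch of parallel quicksort with RecursiveAction (the class name and THRESHOLD are hypothetical). It uses Hoare’s partition scheme with the middle element as pivot; once a range has been partitioned, the two sub-ranges are disjoint, so they can be sorted by independent subtasks via ForkJoinTask.invokeAll. Note that each partitioning step itself is still sequential:

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class ParallelQuicksort extends RecursiveAction {
    // Hypothetical cut-off below which a sequential sort is cheaper than more tasks.
    private static final int THRESHOLD = 1_000;

    private final int[] a;
    private final int lo, hi; // inclusive range [lo, hi]

    ParallelQuicksort(int[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    static void sort(int[] a) {
        ForkJoinPool.commonPool().invoke(new ParallelQuicksort(a, 0, a.length - 1));
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            Arrays.sort(a, lo, hi + 1); // small range: sort sequentially (toIndex is exclusive)
            return;
        }
        int p = partition(a, lo, hi);
        // [lo, p] and [p + 1, hi] do not overlap, so the two subtasks never share elements.
        invokeAll(new ParallelQuicksort(a, lo, p), new ParallelQuicksort(a, p + 1, hi));
    }

    // Hoare partition: afterwards every element in [lo, j] is <= every element in [j + 1, hi].
    private static int partition(int[] a, int lo, int hi) {
        int pivot = a[(lo + hi) >>> 1];
        int i = lo - 1;
        int j = hi + 1;
        while (true) {
            do {
                i++;
            } while (a[i] < pivot);
            do {
                j--;
            } while (a[j] > pivot);
            if (i >= j) {
                return j;
            }
            int tmp = a[i];
            a[i] = a[j];
            a[j] = tmp;
        }
    }

    public static void main(String[] args) {
        int[] data = new Random(42).ints(200_000).toArray();
        int[] expected = data.clone();
        Arrays.sort(expected);
        sort(data);
        System.out.println(Arrays.equals(data, expected)); // true
    }
}
```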

Not all algorithms can be parallelised so easily; some require sharing of datastructures between threads (requiring locking) or synchronization of different processing phases.

Embarrassingly parallel algorithms are good candidates for implementation via the fork/join framework.

The Fork/Join Calamity Article

With that background information, this article criticising fork/join in Java now makes more sense.

First, the author is also the implementor of the TymeacDSE library, which is dual-licensed: it is open-source for some uses, but the author also sells licenses to the software for other purposes and so has an interest in promoting it. This doesn’t make the arguments invalid, but the information should be read with care.

The primary issue in the article appears to be a complaint that the Java fork/join framework in the standard SDK does not implement a general-purpose event-processing framework. A general-purpose framework would certainly be useful, allowing all sorts of things such as non-blocking webservers and transaction-processing systems. However I am of the opinion that such things belong in external libraries rather than being integrated into the standard library runtime - or at least that they should be proven as external libraries before such integration is considered. The Scala runtime integrated an Actor-based framework which is perhaps similar in principle to what the author is proposing; later an external competing framework was shown to be a far better solution and so that was also integrated - and now there are two such frameworks in the Scala standard runtime. The fork/join framework that has been added to the Java standard libraries is already one of the more complex pieces of code within the SDK, despite being for a specific purpose only (CPU-intensive processing), and I am glad it was not made more complex.

The article also uses the term “application server” to refer to a full-featured general-purpose solution - which may well be a good idea in general, but is not the goal of the fork/join framework and IMO doesn’t belong in the Java standard libraries. The term “application server” is possibly being used here in the sense of Tuxedo/Microsoft Transaction Server/Actor framework/etc.

The article talks about the fork/join framework “making joins outside of a controlled environment” and not being able to correctly “context switch” which appears to be referencing the lack of proper continuation support in Java. It is true that the fork/join framework encourages code to make calls to join in the middle of a parent task, which causes some problems (stack usage). It is also true that both the Java1.7 and Java1.8 implementations have code to create additional threads under some circumstances (though from the code it is not clear what these circumstances are). However the typical usecase does not seem to trigger the creation of additional threads.

Proper continuation support in Java would allow a correct solution to the join problem. However given that continuations are not available and are not likely to ever be available, it might have been better for the designers of the fork/join framework to have omitted the join method altogether and instead have a task return the list of other tasks it should wait for, together with a “callback” object to be invoked when those tasks have completed. This would probably allow the same set of algorithms to be implemented while avoiding “blocking joins” entirely. It is not clear if this or a similar approach would also solve the problems which (apparently) are caused by the Phaser class and related synchronization primitives. This may be what the author is referring to by “using a separate structure to hold intermediate results”.
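For comparison, Java 1.8 added class CountedCompleter, a ForkJoinTask subclass that works in a broadly similar callback style: a task records how many subtasks are pending, and its onCompletion method runs once they have all finished, without any thread waiting in join. A minimal sketch summing a long[] follows (the class name and THRESHOLD are hypothetical); I have not evaluated whether this addresses the concerns raised in the Calamity article.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Callback-style divide and conquer: no task ever blocks in join() waiting for its children.
class CompleterSum extends CountedCompleter<Long> {
    private static final int THRESHOLD = 10_000; // hypothetical leaf size

    private final long[] data;
    private final int lo, hi;         // half-open range [lo, hi)
    private CompleterSum left, right; // set only if this task splits
    private long result;

    CompleterSum(CountedCompleter<?> parent, long[] data, int lo, int hi) {
        super(parent); // completion of this task is reported to its parent
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    public void compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            result = sum;
        } else {
            int mid = (lo + hi) >>> 1;
            left = new CompleterSum(this, data, lo, mid);
            right = new CompleterSum(this, data, mid, hi);
            setPendingCount(2); // two children; must be set before they are forked
            left.fork();
            right.fork();
        }
        // If the pending count is non-zero, decrement it; otherwise run onCompletion
        // and then try to complete the parent in the same way.
        tryComplete();
    }

    // The "callback": invoked once this task and both of its children have finished.
    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (left != null) {
            result = left.result + right.result; // merge step
        }
    }

    @Override
    public Long getRawResult() {
        return result;
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new CompleterSum(null, data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println(sum(data)); // 499999500000
    }
}
```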

The author also appears to be stating that “work stealing” (queue all work on the thread that generated it, and let idle threads grab tasks from queues associated with other threads) is inferior to placing new work items (tasks) on the queues of idle threads when the work items are first created (“scatter/gather”). Presumably he has statistics to support this, but I expect this extra optimisation is only measurable in very specific high-volume processing cases.

The framework’s inability to “track the request”, “time and cancel requests” and “gather statistics” is criticised, as is its lack of “error detection/recovery/reporting”, stall detection, monitoring and logging. For a general-purpose “application server” these are indeed necessary features. For the fork/join framework, whose sole purpose is to accelerate CPU-intensive non-blocking “number crunching” algorithms, this does not seem to be a real problem.

The author also criticises the fork/join API as being too large and complicated. I have to agree 100% here.

Verifying Join Behaviour

The following program was used to verify the described behaviour of method ForkJoinTask.join. It creates a ForkJoinPool with a single thread in it, then creates a large number of tasks; each task forks a single child task then joins on it, ie creates a “tree” of tasks N elements deep.

With Java1.8 the result of ForkJoinTask.getPool().getActiveThreadCount() is 1 as expected; no additional threads have been created. However the stack trace at the leaf task (the one with no children) is N*5 method calls deep, and is of the following pattern:

	...
	at net.vonos.ForkJoinDemo$NestingTask.compute(ForkJoinDemo.java:29)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:389)
	at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
	at net.vonos.ForkJoinDemo$NestingTask.compute(ForkJoinDemo.java:29)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:389)
	at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
	at net.vonos.ForkJoinDemo$NestingTask.compute(ForkJoinDemo.java:29)
	...

In this demo, a call to join on a child task which has not yet started causes the calling thread to execute that child itself, because the child is still at the top of the calling thread’s own work-queue: method compute calls join, which calls three internal fork/join framework methods (doJoin, doExec and RecursiveAction.exec), the last of which calls compute on the child task.

With the older Oracle Java 1.7.0_09, I found the same behaviour. The stack depth was only N*4 rather than N*5 (one fewer internal method in the fork/join framework), but the number of threads was still one - there was no trace of the “thread explosion” problem or of the “continuation threads” described in the Calamity article.

package net.vonos;

import java.util.concurrent.*;

/**
 * Demonstrate the behaviour of method ForkJoinTask.join().
 */
public class ForkJoinDemo {

    public static class NestingTask extends RecursiveAction {
        private final int count;

        NestingTask(int count) {
            this.count = count;
        }

        @Override
        protected void compute() {
            if (count == 0) {
                // Leaf task: report the pool's thread count and the current stack depth.
                System.out.println("nthreads:" + getPool().getActiveThreadCount());

                // A new Throwable records the current stack trace when it is constructed.
                Throwable t = new Throwable();
                StackTraceElement[] stackInfo = t.getStackTrace();
                System.out.println("stack depth:" + stackInfo.length);
                t.printStackTrace();
            } else {
                // Fork a single child and join on it at once, building a chain "count" tasks deep.
                NestingTask child = new NestingTask(count - 1);
                child.fork();
                child.join();
            }
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(1); // exactly one worker thread
        NestingTask baseTask = new NestingTask(200);
        pool.invoke(baseTask);
        pool.shutdown();
    }
}

References

Actually, the javadoc for ForkJoinPool and ForkJoinTask is probably the best reference available on this topic.