The Java Fork/Join Framework

Overview

This article discusses the “fork/join” framework introduced into the Java standard libraries in version 1.7, and method Collection.parallelStream which performs parallel processing on collections using this fork/join framework internally. This is not a simple “how to use” article; there are plenty of those around already. Instead this article presents the motivations, pros/cons and internal details which are relevant for understanding its performance and limitations (at least as I understand them after a day’s research).

Java version 1.7 introduced the “fork/join” framework into the standard library; see java.util.concurrent.ForkJoinPool and related classes.

Java version 1.8 added methods stream and parallelStream to the interface java.util.Collection which is an ancestor of most of the standard collection-types (eg List). Method stream provides various ways to apply a lambda to the elements of a collection, providing operations well-known to functional programmers including map, flatMap and reduce. Method parallelStream does the same but uses a threadpool to execute the operations in parallel where possible, via the fork/join framework.

Class ForkJoinPool implements the standard ExecutorService interface, and so for some types of workload can be used as a replacement for other kinds of threadpool such as that returned by Executors.newFixedThreadPool. In fact, Executors.newWorkStealingPool creates a ForkJoinPool (returning it as interface ExecutorService). However in most cases a ForkJoinPool will be created directly in order to have access to functionality not on the standard ExecutorService interface, such as invoking ForkJoinTask instances which use fork and join.
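As a small illustration of the point above, the following sketch obtains a work-stealing pool through the Executors factory and uses it purely via the ExecutorService interface. The class and method names are my own invention for illustration; the check that the returned object is a ForkJoinPool reflects what OpenJDK does as far as I know, and is not something the factory method's return type promises.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical demo class; the name is illustrative only.
class WorkStealingPoolDemo {
    // Submits a trivial task through the plain ExecutorService interface.
    static int runThroughExecutorService() {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            Future<Integer> result = executor.submit(() -> 6 * 7);
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Reports whether the factory handed back a ForkJoinPool instance
    // (an implementation detail, assumed here rather than guaranteed).
    static boolean isBackedByForkJoinPool() {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            return executor instanceof ForkJoinPool;
        } finally {
            executor.shutdown();
        }
    }
}
```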

The javadoc for classes ForkJoinPool and ForkJoinTask is verbose but nevertheless was never entirely clear to me, and it was only when I read the original academic paper on fork/join that the whole thing started to make sense. The Fork Join Calamity article also motivated me to research the topic more thoroughly as it is a very detailed, though not very clearly written, critique that I initially struggled to understand. The calamity article is discussed further below.

I’m no expert in this area; corrections to errors below are very welcome.

Work Stealing

A standard ExecutorService which wraps a dynamic-size threadpool has a single queue of tasks to be executed, and a set of threads with which to execute them. When a task is added to the queue and an already-existing thread is idle, then that idle thread executes the task. When no thread is idle, then a new thread is created and that thread executes the task.

A standard ExecutorService which wraps a fixed-size threadpool has a single queue of tasks to be executed, and a set of threads with which to execute them. When a task is added to the queue and an existing thread is idle, then that idle thread executes the task. When no thread is idle, the task is added to a work-queue. When a thread has nothing to do (is idle), it takes the oldest (head) task from the work-queue and executes it.

A “work stealing” ExecutorService is a kind of fixed-size threadpool in which there is a work-queue for each thread. In addition, there is a set of “submission queues” which are not bound to any thread. When a thread finishes its previous task (ie becomes idle) then it takes an item from its local work-queue. If its local work-queue is empty then it tries to take an item from a submission queue or some other thread’s workqueue. The workqueues are actually deque structures, ie elements can be taken from either end; by default a thread takes the newest item from its local queue (LIFO) but steals the oldest item from remote queues (FIFO). In effect, a thread treats its own worklist as a stack (push/pop) but treats remote worklists as a queue. The idea behind the standard behaviour is that it minimises locking contention between the threads in a threadpool; for long-running tasks this is minimal but where a workqueue contains a large number of quickly-executed tasks the benefit can be significant.

Initial (top-level) tasks submitted by threads outside of the workstealing pool are written to one of the “submission queues”, from which it is then taken by an idle workpool thread looking to “steal” some work.

A work-stealing executor service can be created via Executors.newWorkStealingPool. However that method isn’t often used; if there are not huge volumes of tasks being processed, or they are relatively long-running tasks (eg might block), then normal thread-pools work just as well. And if there is a need to process large volumes of quickly-executed (and non-blocking) tasks, then a ForkJoinPool should be created directly in order to be able to use its extended APIs. Internally, work-stealing is still being used as the fork/join pattern generally generates large numbers of quick-running tasks which is exactly where the low-lock-contention behaviour of the work-stealing approach works best.

Note that it isn’t forbidden to perform long-running or blocking operations in a task submitted to a “work-stealing” ExecutorService. However given that it is a fixed-size pool of threads, too many blocking tasks will significantly slow the system. In general it is better for such workloads to be run in a dynamic-size threadpool so that tasks can still be processed while other tasks are blocking their threadpool-threads.

A threadpool running a large number of non-blocking tasks is CPU-bound, and maximum throughput is reached with a threadpool-size which matches the number of CPUs on the local system. Adding more processing threads to a CPU-bound system only slows it further, due to pointless context-switching. Therefore “work stealing” ExecutorService instances are fixed-size threadpools and default to 1 thread per processor as reported by Runtime.availableProcessors() (which counts logical processors, ie hardware threads, rather than physical cores).

Fork/Join Motivation

Native threads are heavy-weight objects: they are slow to start, take lots of memory, and are slow to “schedule”.

Sometimes we just need a thread and don’t care about the performance. In particular, when processing user input we need a thread in order to be responsive and “throughput” is not a relevant concept.

Sometimes we do care about performance, but the tasks that need to be processed take various amounts of time, and sometimes block for significant amounts of time waiting for disk, network, shared locks, and all sorts of other things. In this case, a threadpool can be useful but tuning its settings is a matter of experimentation and guesswork.

And sometimes we simply need to perform a very CPU-intensive piece of work as fast as possible. If the work can be parallelised, then (as described above) the best throughput can be obtained by dividing the work into N equal tasks where N is the number of CPUs on the local system. Unfortunately, even when the algorithm can be parallelised, generating exactly N equal tasks can be hard - and N will vary from host to host. Fortunately, dividing the work into a large number of tasks (hundreds or thousands) and then running these tasks via a threadpool of size N will have almost the same performance as running N tasks. This is exactly the approach of the fork/join framework. There is a little overhead each time a task is completed and the next one allocated to a thread but, as long as each task does at least a moderate amount of CPU-intensive work before completing, this overhead is a small fraction of the overall processing time.

Actually, given that CPUs are not likely to be 100% dedicated to the parallel algorithm being run, generating exactly N tasks is not optimal; one of those N tasks may take longer due to its CPU being used to perform other work. Given that the job takes as long as its slowest task, that delays the overall completion. Generating more tasks (eg N*5) will allow the fastest CPUs to perform several tasks while the slowest CPUs complete fewer, resulting in an overall quicker completion time. The more tasks generated, the better this “smoothing” of allocated CPU time works - but at the price of greater overhead for task-management. As noted, even a quite high number of tasks is acceptable before the overhead dominates, as long as each task does a moderate amount of work.

Note that this applies only to CPU-intensive code which doesn’t block; any blocking of a fixed-size threadpool will obviously break the whole concept. To quote the javadoc for class ForkJoinTask:

The efficiency of ForkJoinTasks stems from a set of restrictions (that are only partially statically enforceable) reflecting their main use as computational tasks calculating pure functions or operating on purely isolated objects.

In summary, Java’s built-in fork/join support is really a way of partitioning problems for parallel processing. This approach is not limited to Java; Intel Cilk is a c/c++ library and compiler extension which includes support for the same approach, and the cilk_spawn/cilk_sync api is one of the inspirations for the fork/join implementation in the Java standard libraries. Fork/join is useful for things such as performing a parallel search over a large data-structure or a parallel transformation of a large dataset (eg graphical filter algorithms applied to arrays of pixels).

Continuations

Later in this article I refer to continuations, so for those who are not familiar with them, here is a quick summary (though the Wikipedia article on continuations is better than any description I can make).

Java provides a threading model that maps 1:1 to threads provided by the underlying operating system. Calls from a thread to IO operations cross from the JVM into the operating system kernel, and are suspended there (blocked) until the necessary data is available. This has obvious consequences for threadpools.

Each native thread has an associated block of memory used as a call stack, on which local variables and return addresses for the current method-call-chain are stored. Each stack also has a block of data holding the “program counter” (current address within the application) and various flags.

Native threads only expose this thread context information to the application in limited form. In Java, Throwable.getStackTrace provides access to some of this information. In other languages, it is possible to (indirectly) capture the entire state of the current thread in a form which allows it to later be restarted at the same point; Python generators are a limited type of continuation where the state is ‘captured’ at the point a yield statement is executed.

There are programming languages where calls to blocking operations are effectively handled by continuations. The language itself starts only a small number of native threads (eg one per physical CPU) with which to execute user code. The runtime library that the language provides to access operating-system operations handles all calls to “blocking” operations by creating a continuation, handing off the actual kernel-level blocking operation to some internal threadpool, then transferring control to some other continuation, effectively “context switching” to another task in the user code. This is similar to what the operating system kernel would do, but “above” the kernel in user-space. This is often called “userspace threads”, “lightweight threads” or “green threads”.

There are also programming languages which make continuations available to the programmer. Unfortunately Java is not one of them (and Python generators are only a very limited form of continuation).

The Fork/Join Pattern

The fork/join (partitioning) approach is described well in the javadoc, but to paraphrase:

while (input data is large enough to be partitioned for parallelisation)
  create multiple tasks representing subsets of the input data
  start the multiple tasks and wait for them to finish
  merge the results
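The outline above can be made concrete with a RecursiveTask. This is only a minimal sketch under my own assumptions: the class name RangeSumTask, the helper method sum and the THRESHOLD value are all invented for illustration, and the threshold in particular would need tuning for a real workload. Each task either sums its range directly (when the range is small enough) or splits it in two, forks one half, computes the other half itself, and then joins.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums a slice of a long[] using fork/join.
class RangeSumTask extends RecursiveTask<Long> {
    // Assumed cut-off below which a range is summed without further splitting.
    private static final int THRESHOLD = 10_000;

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly in this task.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Too large: partition, fork one half, compute the other half here.
        int mid = (from + to) >>> 1;
        RangeSumTask left = new RangeSumTask(data, from, mid);
        RangeSumTask right = new RangeSumTask(data, mid, to);
        left.fork();
        long rightSum = right.compute();
        // Merge the partial results.
        return left.join() + rightSum;
    }

    // Convenience entry point that runs the whole job on the common pool.
    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(data, 0, data.length));
    }
}
```

Computing one half directly rather than forking both halves saves one queue operation per split; forking both and joining both would also be valid.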

Because the optimal size of the underlying threadpool for any fork/join framework is the number of CPUs, the JVM standard library can just create a single threadpool and use it for all callers; having more threadpools will not improve the overall throughput. Thus the concept of a commonPool as mentioned in the javadoc. If a developer really wants to, they can define their own threadpool (ExecutorService) but it won’t make anything faster (unless someone has misused the common pool by scheduling blocking tasks in it).
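For reference, here is a small sketch of how the shared pool and a dedicated pool can be obtained and inspected. The class and method names are hypothetical. As far as I can tell, OpenJDK sizes the common pool at one less than the number of available processors by default (the submitting thread is expected to help), and the javadoc documents the system property java.util.concurrent.ForkJoinPool.common.parallelism for overriding that; treat the exact default as an observation rather than a guarantee.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class for inspecting pool sizes.
class PoolSizingDemo {
    // Target parallelism of the JVM-wide shared pool.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    // The common pool is a singleton: every caller gets the same instance.
    static boolean commonPoolIsShared() {
        return ForkJoinPool.commonPool() == ForkJoinPool.commonPool();
    }

    // Creates a dedicated pool with an explicit parallelism level,
    // reads the level back, then shuts the pool down again.
    static int dedicatedPoolParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }
}
```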

Of course, as soon as such a framework exists, developers will start to misuse it. Ideally any “fixed-size work stealing threadpool” (such as the fork/join framework provides) should catch any blocking IO and terminate the entire tree of operations with an UnsupportedOperationException or similar. However this is not done; such code will just block one of the available threads and reduce overall throughput. Javadoc for class ForkJoinTask states that blocking operations are acceptable “as long as no other tasks wait on the blocking one” and there are not too many of them. In other words, they should be simple “top level” operations that are not generated from other tasks. Such operations should still be avoided though. As an escape-hatch, a task can use ForkJoinPool.managedBlock(..). This method allows control over the threading behaviour, including being able to start a dedicated thread. Obviously that has performance issues - and the whole concept of fork/join isn’t really meant for such stuff.
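To show what the managedBlock escape-hatch looks like in practice, here is a sketch closely modelled on the queue example in the ForkJoinPool.ManagedBlocker javadoc. The class name QueueTaker and the helper methods takeManaged and takeInsidePool are my own names, and the pool size of 2 is arbitrary. The blocker tells the pool when it is about to block (so the pool may add a spare thread) and when blocking is no longer necessary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical blocker that takes one item from a BlockingQueue.
class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    private final BlockingQueue<E> queue;
    private volatile E item;

    QueueTaker(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    // Called by the pool when blocking is really required.
    @Override
    public boolean block() throws InterruptedException {
        if (item == null) {
            item = queue.take();
        }
        return true; // no further blocking needed
    }

    // Called first: returns true if an item is available without blocking.
    @Override
    public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() {
        return item;
    }

    // Blocks (in a pool-friendly way) until an item is available.
    static <E> E takeManaged(BlockingQueue<E> queue) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }

    // Runs takeManaged on a worker thread of a small dedicated pool.
    static <E> E takeInsidePool(BlockingQueue<E> queue) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            ForkJoinTask<E> handle = pool.submit((Callable<E>) () -> takeManaged(queue));
            return handle.join();
        } finally {
            pool.shutdown();
        }
    }
}
```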

Sadly, the existence of method Collection.parallelStream which internally implements the operation over the fork/join framework using the commonPool threadpool is likely to tempt many developers into using fork/join inappropriately (by invoking blocking operations).

Every call to ForkJoinTask.fork() queues the task on the workqueue associated with the calling thread - pretty simple and obvious.

A call to ForkJoinTask.join() on a task that has already been completely executed by this or some other thread just returns immediately. A call to join on a task which is running or not yet scheduled instead causes the calling thread to execute the oldest (head) task on the calling thread’s workqueue (which might happen to be the task being waited on). When that executed task completes, the status of the task upon which the join is being done is tested again; if it is still not done, the process repeats. At some point the caller will discover that the task being joined-on is complete, and so the caller will then continue with its current method.

Note that this kind of ‘join’ is completely different from Thread.join in which the calling thread simply blocks, waiting for the target to complete.

Class ForkJoinTask sadly uses the “template class” antipattern where code extends a base class with significant functionality. The result is a large number of methods on the class, of which most are irrelevant for most developers. Methods fork, join, invoke* and exec are the only relevant methods for most developers. Actually, most developers will use the RecursiveTask or RecursiveAction subclasses of ForkJoinTask, in which case compute replaces exec.

One of the benefits of divide-and-conquer algorithms (of which fork/join is an implementation) is better use of caches. Each subtask usually operates over a range of values related to values being accessed by its parent and sibling tasks. Due to the fact that a “work stealing” threadpool generally schedules a task on the same thread as its parent and sibling tasks, this potentially results in better use of CPU caches. This benefit is not so easily achieved in Java (compared to other languages such as C) due to Java’s heap-management and garbage-collection but at least some benefit is available.

Generating the Task Tree

A CPU-intensive algorithm using the fork/join framework can generate a large number of tasks directly, queue them for processing, wait for completion, and merge the results. However it is often more convenient to generate just a few initial tasks, and have them generate more tasks, etc., until reaching a point where tasks are “small enough” to not need further splitting. The first approach leads to a “flat” structure while the second leads to a deep tree of tasks.

Methods fork and join may be called by threads belonging to the threadpool, or by external threads. The behaviour is subtly but importantly different in these two cases.

When an external thread submits just one initial task, and that task then generates a large number of “leaf” subtasks and joins on them, processing of the resulting “flat” tree of tasks is suboptimal. The initial task runs on one workpool thread, and the child tasks are all queued on the local workqueue. In order for other threads in the pool to process those leaf tasks, they must “steal” them, resulting in additional contention. In this case, a deeper tree of tasks would be better; assuming there are some idle threads in the pool, those next-level non-leaf children will be quickly stolen, and their children will then be queued on the local workqueue of whichever thread stole them, thus scattering leaf tasks across multiple workqueues and reducing contention later. This could be called a case of “leaky abstraction” - the internal implementation dictates how using code should best be structured, which is somewhat ugly.

The alternative to having fork place tasks on the thread’s local work-queue would be to have method fork proactively scatter tasks across all work-queues, resulting in higher initial lock contention but possibly lower contention later. Or simply have one work-queue. Presumably the authors of the fork/join framework have measured the approaches and have statistics to back their choice. The author of the “calamity” article and the TymeacDSE framework asserts that the “scatter” approach is superior. Fortunately this behaviour of fork is not explicitly part of the API and thus could change in future releases of the built-in fork/join framework if necessary.

When an external thread submits the initial task via ForkJoinPool.invoke rather than ForkJoinPool.submit, then the task is “bound” to the specified pool but its callback is executed by the calling thread. Any child tasks generated via fork then get written to the submission queue for that external thread, from which they can be stolen by available idle workpool threads. When an external thread calls join then the “execute another task” behaviour is not triggered; instead the calling code simply blocks until the joined-on task completes.

As noted in the “calamity” article part 2, this “submitter runs tasks” behaviour can have some undesirable side-effects. However it can be easily avoided - just use ForkJoinPool.submit if such side-effects could be a concern, and all task logic will be executed exclusively by workpool threads.
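The two submission styles look like this in code. This sketch only shows the API shapes (invoke blocks and returns the result, submit returns a handle that can be joined later); it does not attempt to demonstrate which thread ends up executing the task logic. The class names are hypothetical, and the Fibonacci task is borrowed from the classic javadoc illustration, where it is explicitly noted to be a poor practical choice because each task does so little work.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of the two ways an external thread can hand a task to a pool.
class SubmissionStylesDemo {
    // Deliberately naive recursive task, used only as something to run.
    static final class Fib extends RecursiveTask<Integer> {
        final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n;
            }
            Fib f1 = new Fib(n - 1);
            f1.fork();
            return new Fib(n - 2).compute() + f1.join();
        }
    }

    // invoke: hands over the task and waits for its result in one call.
    static int viaInvoke(ForkJoinPool pool, int n) {
        return pool.invoke(new Fib(n));
    }

    // submit: returns immediately with a handle; join() waits for the result.
    static int viaSubmit(ForkJoinPool pool, int n) {
        ForkJoinTask<Integer> handle = pool.submit(new Fib(n));
        return handle.join();
    }
}
```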

By default, a thread processes items on its local workqueue in LIFO order (ie treats the workqueue as a stack), while other threads steal items from that workqueue in FIFO order. A workqueue created with the “async” flag set will also process items on its local workqueue in FIFO order, which may be more suitable when the goal of the threadpool is predictable processing order rather than maximum throughput.
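The “async” flag is the last argument of the four-argument ForkJoinPool constructor. A minimal sketch, with a hypothetical class and method name:

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical factory helper for pools with or without async (FIFO) mode.
class AsyncModeDemo {
    static ForkJoinPool newPool(int parallelism, boolean asyncMode) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,        // no custom UncaughtExceptionHandler
                asyncMode);  // true = local workqueues are processed FIFO
    }
}
```

Method getAsyncMode() on the resulting pool reports which mode was selected.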

Compensation Threads

Work-stealing threadpools have a fixed number of threads. Clearly, performing any blocking operation in one of these threads will reduce overall performance, as any tasks in the associated workqueue will continue to wait instead of making use of the blocked thread. Fork/join pools are always backed by a work-stealing threadpool so the same problem applies to them too.

One special case of blocking operations has been optimised for work-stealing threadpools: the “synchronization primitive” classes in package java.util.concurrent recognise when a thread belonging to a work-stealing threadpool is about to perform a blocking operation (eg wait on a mutex). These classes then tell the associated threadpool to create an additional “compensation thread” before the current thread blocks. The number of “active threads” in the threadpool thus remains the same.

This behaviour of the concurrency classes is rather magical, special-case, and not clearly documented.

In early versions of the fork/join framework, there was also no limit to the number of “compensation threads” that would be created in this way. In the worst case, a poorly-written user of fork/join which performed a blocking synchronization operation in each task could create a number of threads equal to the number of tasks - which could be hundreds. Searching for fork/join on the internet immediately returns links to a number of pages describing problems with large numbers of threads being generated.

For the fork/join “common pool”, there is a limit on the number of threads which can be spawned in this way; exceeding this limit triggers the exception java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker. The limit is set by private constant ForkJoinPool.DEFAULT_COMMON_MAX_SPARES=256. The following information is from the javadoc of class ForkJoinPool:

The compensation mechanism may be bounded. Bounds for the commonPool (see commonMaxSpares) better enable JVMs to cope with programming errors and abuse before running out of resources to do so. In other cases, users may supply factories that limit thread construction.

The conclusion seems clear: blocking operations should not be performed in fork/join tasks, including use of the java.util.concurrent synchronization primitives. You might get away with a few such calls used carefully, but in general the standard fork/join framework is not friendly to blocking in any way. In my personal opinion, the hacks to the synchronization primitives to spawn “compensation threads” were a bad idea - be careful when using synchronization of any sort! It would also be wise to always use a custom ThreadFactory with each new ForkJoinPool, and to enforce a sane thread limit in that ThreadFactory - a shame that isn’t default behaviour.
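As a rough sketch of the thread-limiting factory suggested above: the class name BoundedWorkerFactory and its bookkeeping are entirely my own, not anything provided by the JDK. It refuses to create a worker once a configured number are alive by returning null, which the ForkJoinWorkerThreadFactory javadoc warns “may result in tasks never being executed”, so the limit must be comfortably above the pool’s parallelism.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that caps the number of live worker threads.
class BoundedWorkerFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    private final int maxThreads;
    private final AtomicInteger live = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    BoundedWorkerFactory(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        int now = live.incrementAndGet();
        if (now > maxThreads) {
            live.decrementAndGet();
            return null; // refuse: the pool has to cope with fewer threads
        }
        peak.accumulateAndGet(now, Math::max);
        // The constructor is protected, hence the anonymous subclass.
        return new ForkJoinWorkerThread(pool) {
            @Override
            protected void onTermination(Throwable exception) {
                live.decrementAndGet(); // free a slot when the worker exits
            }
        };
    }

    // Highest number of simultaneously live workers seen so far.
    int peakThreads() {
        return peak.get();
    }
}
```

Such a factory is passed as the second argument of the four-argument ForkJoinPool constructor, for example new ForkJoinPool(4, new BoundedWorkerFactory(16), null, false).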

According to other sources (including the “Fork/Join Calamity” article referenced earlier), in early versions of the fork/join implementation in Java1.7, method join also created a “compensation thread” while it waited for the task it was joining on to complete. However I have not been able to verify this; recent versions of Java1.7 certainly behave the same as Java1.8 in which join on a not-started or still-running task causes the caller to execute the “head” task from the workqueue instead of just waiting.

For further information, see the source-code for ForkJoinPool.tryCompensate in Java 1.8.

Stack Depth Restrictions

The way that join on a not-yet-complete task actually invokes some task in the same thread does have some disadvantages, including increased stack-depth and arbitrary delays for join operations (the tasks executed by the calling thread may not be related to the task that is performing the join). That new task can itself perform a join which triggers invocation of yet another task, etc. A Java thread only has a fixed amount of “stack space” so the nested-calls solution can potentially lead to a StackOverflowError. However tasks are objects invoked once in a single-threaded way; they therefore can safely store data in fields rather than in local variables - and the task itself is stored on the heap. Therefore, with a little care, task objects should not require much data on the stack and so can be nested quite deeply before stack overflow occurs. On the other hand, the fact that a call to join can lead to invocation of any task queued in the same fork-join-pool, not just tasks related to the caller, can potentially lead to unpredictable stack usage - adding a poorly-designed task to the same pool could cause stack overflow problems at rare, unpredictable, and unrepeatable intervals.

Using a dedicated fork/join pool rather than the common pool will at least isolate tasks from the dangers of calling into other tasks requiring excessive stack space (perhaps ones queued indirectly by libraries used in the application).

The stack depth issue is nevertheless probably not a significant issue in production use.

Error Handling

When a fork/join task throws an exception, this is caught by the fork/join framework and the task is marked as having state “exceptional”. Calling join on such a task triggers another exception, thus marking the caller as having state “exceptional” too; the failure therefore by default propagates up through the chain of parent tasks. Other tasks running in the same threadpool are not affected; in particular when a call to join triggers processing of an arbitrary unrelated task on the work-queue, failure of that task will not affect the caller because the caller is not explicitly joining to that task, just evaluating it.
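The propagation described above can be observed with a pair of trivial tasks. This is a sketch with hypothetical class names; the child always fails, the parent joins on it, and the external caller sees the failure when invoking the parent. Method isCompletedAbnormally() is the public way to test for the “exceptional” state.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of a failure travelling from child to parent to caller.
class FailurePropagationDemo {
    static final class Child extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("child failed");
        }
    }

    static final class Parent extends RecursiveTask<Integer> {
        final Child child = new Child();

        @Override
        protected Integer compute() {
            child.fork();
            // join() rethrows the child's failure, which then fails this task too.
            return child.join() + 1;
        }
    }

    // Runs the parent and summarises what the external caller observes.
    static String run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        Parent parent = new Parent();
        try {
            pool.invoke(parent);
            return "no failure";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName()
                    + " parentAbnormal=" + parent.isCompletedAbnormally()
                    + " childAbnormal=" + parent.child.isCompletedAbnormally();
        } finally {
            pool.shutdown();
        }
    }
}
```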

This is not a sophisticated error-handling strategy; there is no error-logging facility, or retry facility. However for the kinds of algorithms which the built-in fork/join framework is intended to be used for (CPU-intensive parallel searches, transforms, etc) there is really no need for anything more - the overall job succeeds or fails.

The CountedCompleter Class

Proper continuation support in Java would allow a correct solution to the join problem, and for blocking in general. However given that continuations are not available and are not likely to ever be available it might have been better for the designers of the fork/join framework to have omitted the join method altogether and instead do something like have a task return the list of other tasks it should wait for together with a “callback” object to be invoked when those tasks have completed. This would allow some (all?) algorithms to be implemented while avoiding “blocking joins” at all.

And in fact, in Java1.8 this pattern has been implemented as class CountedCompleter, which subclasses ForkJoinTask.
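For comparison with the join-based style, here is a sketch of a sum computed with CountedCompleter, following the general shape of the examples in that class’s javadoc. The class name, the THRESHOLD value and the use of a shared AtomicLong for the result are my own choices for illustration. No task ever calls join: each parent records how many children it is waiting for, and the last child to finish triggers completion of the parent.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical join-free summing task built on CountedCompleter.
class CompleterSumTask extends CountedCompleter<Void> {
    // Assumed cut-off below which a range is summed directly.
    private static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive
    private final AtomicLong total;

    CompleterSumTask(CountedCompleter<?> parent, long[] data, int from, int to, AtomicLong total) {
        super(parent);
        this.data = data;
        this.from = from;
        this.to = to;
        this.total = total;
    }

    @Override
    public void compute() {
        if (to - from > THRESHOLD) {
            int mid = (from + to) >>> 1;
            // Two children must complete before this task counts as complete.
            setPendingCount(2);
            new CompleterSumTask(this, data, from, mid, total).fork();
            new CompleterSumTask(this, data, mid, to, total).fork();
        } else {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            total.addAndGet(sum);
        }
        // Leaf: completes and notifies the parent. Non-leaf: just drops one pending count.
        tryComplete();
    }

    static long sum(long[] data) {
        AtomicLong total = new AtomicLong();
        ForkJoinPool.commonPool().invoke(new CompleterSumTask(null, data, 0, data.length, total));
        return total.get();
    }
}
```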

(Thanks to Ed Harned for pointing this class out)

Thread Locals

It should be reasonably obvious that using thread-locals within tasks running within a threadpool is not a good idea. Care also has to be taken not to call into libraries that use thread-locals or store the result of Thread.currentThread() or similar, as totally unrelated code may be executed with that same thread.

Embarrassingly Parallel

The term embarrassingly parallel is used to describe problems for which efficient parallel implementations are easy and obvious. The quicksort algorithm is a good example: partition the input around a pivot value, then sort each partition independently. Each sorting pass operates on a completely distinct subset of the overall data, so there is no interaction between tasks running in parallel.

Not all algorithms can be parallelised so easily; some require sharing of datastructures between threads (requiring locking) or synchronization of different processing phases.

Embarrassingly parallel algorithms are good candidates for implementation via the fork/join framework.

Thread Management

Although the “work stealing threadpool” is described as a fixed-size pool above, it is lazily initialised. A new thread is created when a task is written to a submission queue, there is no existing idle thread, and the number of existing threads is less than the threadpool maximum.

When a pool thread becomes idle, it uses an “unsafe” Java method to just suspend itself (like Object.wait() but more efficient); on submission of a new task, when an idle thread exists then it is “resumed” via a call to Unsafe.unpark(Object) (like Object.notify() but more efficient). This avoids permanent looping of idle threads.

Parallel Streams

Java 1.8 provides method Collection.parallelStream which provides a set of methods for processing collections of data which execute the logic via a ForkJoinPool so that all available CPUs can be applied to the operation in parallel.
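A minimal sketch of the kind of usage this API is suited to: a pure, CPU-only function applied to each element, with no blocking and no shared mutable state. The class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical example of a non-blocking, side-effect-free parallel stream.
class ParallelStreamDemo {
    static long sumOfSquares(List<Integer> values) {
        return values.parallelStream()
                .mapToLong(v -> (long) v * v) // pure computation, safe to run in parallel
                .sum();
    }
}
```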

The “calamity” article part 2 discusses a number of problems related to the use of fork/join by Collection.parallelStream. I have not assessed these problems; this article is about core fork/join only. However I will note that the restrictions on fork/join (in particular, don’t block) also apply to any closure passed to parallel streams - the consequences of breaking this rule could be unexpected.

The Fork/Join Calamity Article

With that background information, this article criticising fork/join in Java now makes more sense.

First, the author Ed Harned is also the implementor of the TymeacDSE library which is released under a dual-source licence; it is open-source for some uses but the author also sells licenses to the software for other purposes and so has an interest in promoting it. This doesn’t make the arguments invalid, but the information should be read with care.

The primary issue in the article appears to be a complaint that the Java fork/join framework in the standard SDK does not implement a general-purpose event-processing framework. A general-purpose framework would certainly be useful, allowing all sorts of things such as non-blocking webservers and transaction-processing systems. However I am of the opinion that such things belong in external libraries rather than being integrated into the standard library runtime - or at least it should be proven as an external library before such integration should be considered. The Scala runtime integrated an Actor-based framework which is perhaps similar in principle to what the author is proposing; later an external competing framework was shown to be a far better solution and so that was also integrated - and now there are two such frameworks in the Scala standard runtime. The fork/join framework that has been added to the Java standard libraries is already one of the more complex pieces of code within the SDK, despite being for a specific purpose only (CPU-intensive processing) and I am glad it was not made more complex.

The article also uses the term “application server” to refer to a full-featured general-purpose solution - which may well be a good idea in general, but is not the goal of the fork/join framework and IMO doesn’t belong in the Java standard libraries. The term “application server” is possibly being used here in the sense of Tuxedo/Microsoft Transaction Server/Actor framework/etc.

The article talks about the fork/join framework “making joins outside of a controlled environment” and not being able to correctly “context switch” which appears to be referencing the lack of proper continuation support in Java. It is true that the fork/join framework encourages code to make calls to join in the middle of a parent task, which causes some problems (stack usage). It is also true that both the Java1.7 and Java1.8 implementations have code to create additional threads under some circumstances. However the typical usecase of “cpu-intensive processing” does not trigger the creation of additional threads.

The lack of continuation support in Java is the primary cause of many of the limitations/issues of the fork/join framework. The (late) addition of class CountedCompleter provides a rather clumsy and limited equivalent to continuations. This kind of class may be what the author is referring to as “using a separate structure to hold intermediate results”.

The author also appears to be stating that “work stealing” (queue all work on the thread that generated it, and let idle threads grab tasks from queues associated with other threads) is inferior to placing new work items (tasks) on the queues of idle threads when the work items are first created (“scatter/gather”). Presumably he has statistics to support this, but I expect this extra optimisation is only measurable in truly specific high-volume processing cases.

The inability of the fork/join framework to “track the request”, “time and cancel requests” and “gather statistics” is criticised, as is its lack of “error detection/recovery/reporting”, stall detection, monitoring and logging. For a general-purpose “application server” these are indeed necessary features. For the fork/join framework whose sole purpose is to accelerate CPU-intensive non-blocking “number crunching” algorithms this does not seem to be a real problem.

The author also criticises the fork/join API as being too large and complicated. I have to agree 100% here.

The PDF “summary” linked to from the (updated) articles is more readable than the HTML pages, and more up-to-date; if you are interested in reading the article I would recommend starting there.

Fork/Join Calamity Test Programs

The article provides a link “Download the source code for the article here” which provides a number of test programs. I have tried some of these out with the following results. The test system was a 4-core laptop using AMD64, Linux, Java 1.8.0_131.

I’ll put my conclusions before the results:

  • For simple CPU-intensive problems, the framework works acceptably.
  • Leaf tasks should perform reasonable amounts of work - not too much (otherwise load doesn’t get distributed well over multiple CPUs) and not too little (otherwise the framework overhead is too high relative to the actual work done).
  • Blocking in fork/join tasks is bad; don’t do it (this includes using synchronization primitives).

Test: MultiRecurSubmit

This application creates a 16-level-deep tree of tasks within a ForkJoin pool, with a total of 16 factorial tasks (which is a very large number).

The test is intended to demonstrate “excessive thread creation”, which can be seen with JConsole and similar tools. However in my test environment (Java 8) I saw no problem with excess threads. It would appear that whatever problem this application previously demonstrated has been resolved in current releases of Java.

Test: LongSum

This test is intended to demonstrate that problems which do very little work in each task are faster to compute in a single thread than via the ForkJoin framework.

This was indeed the case on my test system; the serial implementation completed in 5 ms while the parallel implementation consisting of 32 tasks required 20 ms.

This is really to be expected. Clearly using parallelism to generate many trivial tasks is not effective - so don’t do that. From the javadoc for ForkJoinTask:

ForkJoinTasks should perform relatively small amounts of computation. Large tasks should be split into smaller subtasks, usually via recursive decomposition. As a very rough rule of thumb, a task should perform more than 100 and less than 10000 basic computational steps, and should avoid indefinite looping. If tasks are too big, then parallelism cannot improve throughput. If too small, then memory and internal task maintenance overhead may overwhelm processing.
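In practice this rule of thumb usually shows up as a threshold below which a task stops splitting and just does the work sequentially. The following is a minimal sketch of that pattern; the class name GranularitySketch, the array-sum workload and the threshold values are hypothetical and not taken from the Calamity test programs. Timings are printed rather than claimed, since they depend heavily on the machine and on JIT warm-up.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class GranularitySketch {

    /** Hypothetical task: sums data[lo..hi), splitting until a range is at most 'threshold' long. */
    static class SumTask extends RecursiveTask<Long> {
        private final long[] data;
        private final int lo;
        private final int hi;
        private final int threshold;

        SumTask(long[] data, int lo, int hi, int threshold) {
            this.data = data;
            this.lo = lo;
            this.hi = hi;
            this.threshold = Math.max(1, threshold); // guard against endless splitting
        }

        @Override
        protected Long compute() {
            if (hi - lo <= threshold) {
                long sum = 0;                 // leaf: plain sequential loop
                for (int i = lo; i < hi; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            SumTask left = new SumTask(data, lo, mid, threshold);
            left.fork();                      // make the left half available to other workers
            long right = new SumTask(data, mid, hi, threshold).compute();
            return left.join() + right;
        }
    }

    public static long parallelSum(long[] data, int threshold) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length, threshold));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        // Too small, moderate, and too large (a single leaf task).
        for (int threshold : new int[] {1, 5_000, data.length}) {
            long start = System.nanoTime();
            long sum = parallelSum(data, threshold);
            long micros = (System.nanoTime() - start) / 1_000;
            System.out.println("threshold=" + threshold + " sum=" + sum + " time=" + micros + "us");
        }
    }
}
```

A threshold of 1 creates roughly two tasks per array element, which is the LongSum situation: the bookkeeping outweighs the additions. A threshold equal to the array length creates a single task and therefore no parallelism at all.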

Could an alternative design perform better? Possibly. For developers who have parallel problems, and for whom the built-in fork/join implementation is not fast enough to satisfy their performance requirements, alternative implementations could be worth trying - though of course the cost in time and complexity of adding an external library to the application needs to be considered. For developers for whom the built-in implementation is fast enough, well - I would suggest it is probably best to just go with it, regardless of whether more efficient alternatives exist.

Test: MultiCompletables

This application promptly triggers an exception under Java 8:

  java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
	at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2011)

The cause is described earlier in the article: blocking synchronization primitives from package java.util.concurrent try to be helpful by spawning a new thread in the threadpool in order to compensate for the fact that the calling thread is about to block. This was probably a bad idea on the part of the package designers. Solution: design your fork/join task implementation so that the overall set of tasks doesn’t make too many such blocking calls. Or simply avoid the fork/join framework for algorithms that require significant inter-task synchronization.
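One way to keep blocking calls out of pool tasks is to compose results with callbacks instead of waiting for them inside a task. The sketch below assumes the work is expressed as CompletableFutures; it is not a fix for the MultiCompletables source itself, and the class name NonBlockingCompose and the arithmetic are purely illustrative. The only blocking call (join) is made by the calling thread, which is assumed not to be a pool worker, so the pool never needs to create a compensating thread on its behalf.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class NonBlockingCompose {

    /**
     * Doubles x and y in separate pool tasks, then adds the results in a third task.
     * No task running inside the pool waits for another task.
     */
    public static int combine(ForkJoinPool pool, int x, int y) {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> x * 2, pool);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> y * 2, pool);

        // Runs only after both inputs are complete, so it never blocks a worker.
        CompletableFuture<Integer> sum = a.thenCombineAsync(b, Integer::sum, pool);

        // Only the caller blocks here.
        return sum.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println("result: " + combine(pool, 10, 11));
        } finally {
            pool.shutdown();
        }
    }
}
```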

Verifying Join Behaviour

The following program was used to verify the described behaviour of method ForkJoinTask.join. It creates a ForkJoinPool with a single thread in it, then creates a large number of tasks; each task forks a single child task then joins on it, ie creates a “tree” of tasks N elements deep.

With Java1.8 the result of ForkJoinTask.getPool().getActiveThreadCount() is 1 as expected; no additional threads have been created. However the stack trace at the leaf task (the one with no children) is N*5 method calls deep, and is of the following pattern:

        ...
	at net.vonos.ForkJoinDemo$NestingTask.compute(ForkJoinDemo.java:29)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:389)
	at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
	at net.vonos.ForkJoinDemo$NestingTask.compute(ForkJoinDemo.java:29)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:389)
	at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
	at net.vonos.ForkJoinDemo$NestingTask.compute(ForkJoinDemo.java:29)
        ....

A call to join on a task which is not yet started causes the calling thread to execute queued work itself rather than block, as described earlier. Here the just-forked child is at the top of the caller's own queue, so it is the task that gets run: method compute calls join, which calls three internal fork/join framework methods, which then call compute on the next task.

With the older Oracle Java 1.7.0_09, I found the same behaviour. The stack depth was only N*4 rather than N*5 (one fewer internal method in the fork/join framework), but the number of threads was still one - there was no trace of the “thread explosion” problem or of the “continuation threads” described in the Calamity article.

package net.vonos;

import java.util.concurrent.*;

/**
 * Demonstrate the behaviour of method ForkJoinTask.join().
 */
public class ForkJoinDemo {

    public static class NestingTask extends RecursiveAction {
        private final int count;
        NestingTask(int count) {
            this.count = count;
        }

        @Override
        protected void compute() {
            if (count == 0) {
                System.out.println("nthreads:" + this.getPool().getActiveThreadCount());

                // The Throwable constructor already captures the current stack,
                // so no explicit fillInStackTrace() call is needed.
                Throwable t = new Throwable();

                StackTraceElement[] stackInfo = t.getStackTrace();
                System.out.println("stack depth:" + stackInfo.length);
                System.out.println(t);
                t.printStackTrace();

            } else {
                NestingTask child = new NestingTask(count-1);
                child.fork();
                child.join();
            }
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(1);
        NestingTask baseTask = new NestingTask(200);
        pool.invoke(baseTask);
    }
}

References

Actually, the javadoc for ForkJoinPool and ForkJoinTask is probably the best reference available on this topic.