Categories: Java
Overview
Sometimes it is necessary to start a local native-code application from a Java application. The Java standard library provides basic support for this, but it has two problems:
- the Java app should read from the app’s stdout and stderr while simultaneously blocking on the application exit status; and
- the code should be unit-testable.
The requirement to do two or three things at once requires either threads or some very clever selects on filedescriptors; the standard API does not help with this. And the standard API is extremely unit-testing-unfriendly, being based on a set of concrete and final classes.
There are a couple of execution-libraries available, including:
The above libraries are good, but “overkill” for simpler requirements. Rather than add an extra dependency to my current project, I implemented a basic wrapper around the standard process-execution APIs which has been successfully used in production. Here is the code..
Why Reading Process Output is Important
The standard way to pass data between applications under Unix is a pipe
, eg :
grep wanted somefile.txt | wc -l
The writing process sees the pipe as a file it can write to, and the reading process sees it as a file it can read from. However a pipe has some unusual features, most important of which is that it has a limited size. When a pipe is full and an application tries to write to it, it blocks until either some other process reads from the pipe (freeing up space) or all readers of the pipe exit. In normal use, this elegantly provides a kind of performance-throttling for applications in the pipeline, automatically handling cases where one application in the pipeline naturally runs faster than others. However in our use-case the Java application is typically both a writer into the pipeline and a reader from it. If the parent Java process does not read regularly from the external process STDOUT and STDERR, and the external app writes at least one pipe-full of data (typically 4KB) then it will block - forever.
The code below deals with this by:
- accepting a closure which should be invoked for each line of data from STDOUT/STDERR, and
- spawning threads to read from STDOUT/STDERR and execute the closure.
Of course the closure must be thread-safe.
A Simple Process Management Framework
ProcessDesc wraps java.lang.ProcessBuilder (but is an interface rather than a concrete final class):
// Author: Simon Kitching
// This code is in the public domain
package net.vonos.process;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
* An interface equivalent to java.lang.ProcessBuilder. See ProcessFactory for more information.
*/
public interface ProcessDesc {
/** Sets the application to start, and its commandline-arguments. */
ProcessDesc command(List<String> args);
/** Sets the current-working-directory for the new process. */
ProcessDesc directory(File dir);
/** Sets a callback which is invoked for each LF-terminated line that the process writes to its STDOUT. */
ProcessDesc inputConsumer(Consumer<String> consumer);
/** Sets a callback which is invoked for each LF-terminated line that the process writes to its STDERR. */
ProcessDesc errorConsumer(Consumer<String> consumer);
/** Start the external process (which also starts callbacks to the inputConsumer and errorConsumer). */
ProcessHandle start() throws IOException;
}
ProcessHandle wraps java.lang.Process (but is an interface rather than a concrete final class):
package net.vonos.process;
import java.util.concurrent.TimeUnit;
/**
* An interface equivalent to java.lang.Process. See ProcessFactory for more information.
* <p>
* There are no getInputStream() or getErrorStream() methods here because the ProcessDesc class takes
* "consumer" objects as parameters instead; that is a safer way to handle processing of output from
* an external process.
* </p>
*/
public interface ProcessHandle {
/**
* Return true if the process is still running.
*/
boolean isAlive();
/**
* Block until the process terminates or the timeout expires.
*
* @return true when the process has terminated (ie false when timeout expired)
*/
boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Return the exit-status of the application. Only valid after waitFor(..) returns true.
*/
int exitValue();
/**
* Terminate the process.
* <p>
* Note: on unix this sends a SIGTERM to the process. If the started application spawns child processes, then the
* parent should trap SIGTERM and explicitly forward it to the child processes, eg as described here:
* http://veithen.github.io/2014/11/16/sigterm-propagation.html
* </p>
* <p>
* If the started process is a shell which starts exactly one child process then instead of catching/relaying
* signals it may use "exec ..." to replace itself with another process.
* </p>
*/
void destroyForcibly();
}
ProcessFactory has no equivalent in the standard Java APIs; this is an interface through which ProcessDesc instances can be created:
package net.vonos.process;
/**
* Interface through which external processes can be started.
* <p>
* The standard Java ProcessBuilder class is unfortunately extremely unit-test-unfriendly; this is better.
* </p>
*/
public interface ProcessFactory {
ProcessDesc newProcessDesc();
}
InputHandler is a thread which reads from a pipe and executes a closure per line of input:
package net.vonos.process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.function.Consumer;
/**
* Continuously reads data from an input-stream, splits it into lines, and passes it to a Consumer object.
* <p>
* External processes need to have their input and error streams regularly read, as they may be fixed-size
* buffers which will cause the external process to block when they are full. An instance of this type,
* run as a thread, will ensure that is done - and each line is passed to the specified consumer (if any).
* When no consumer is set, the data read is just discarded.
* </p>
* <p>
* This thread terminates automatically when EOF is encountered - which is guaranteed when the external
* process being read from has been terminated.
* </p>
*/
public class InputHandler extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(InputHandler.class);
// Stream to read from
private final InputStream is;
// Consumer to pass each line to (may be null)
private final Consumer<String> consumer;
private final BufferedReader reader;
/** Constructor. */
public InputHandler(InputStream is, Consumer<String> consumer) {
super("Process input"); // name thread for debugging purposes
this.is = is;
this.consumer = consumer;
this.reader = new BufferedReader(new InputStreamReader(is));
}
/**
* Thread run method, which keeps processing output from the inputstream until EOF or exception.
*/
@Override
public void run() {
try {
for(;;) {
String line = reader.readLine();
if (line == null) {
break; // EOF
}
if (consumer != null) {
consumer.accept(line);
}
}
} catch (IOException e) {
LOG.info("IOException", e);
} finally {
try {
reader.close();
} catch(IOException e) {
LOG.warn("Exception while closing reader", e);
}
}
}
}
ProcessFactoryImpl provides implementations of all the above interfaces, allowing ProcessDesc instances to be created, configured with handlers for their STDOUT and STDERR, then started.
package net.vonos.process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* A simple factory for ProcessBuilder instances and its related types.
* <p>
* The inner-classes defined here are 1:1 wrappers around java.lang.ProcessBuilder and java.lang.Process. However
* unlike the standard classes, these implement interfaces so they can be mocked for testing; the native classes
* are both concrete and final, so any code that uses them directly cannot be unit-tested with Mockito or similar.
* </p>
*/
@Component
public class ProcessFactoryImpl implements ProcessFactory {
private static final Logger LOG = LoggerFactory.getLogger(ProcessFactoryImpl.class);
public ProcessDesc newProcessDesc() {
return new ProcessDescImpl();
}
/** Wraps a java.lang.ProcessBuilder instance. */
private static class ProcessDescImpl implements ProcessDesc {
private final ProcessBuilder builder = new ProcessBuilder();
private Consumer<String> inputConsumer;
private Consumer<String> errorConsumer;
@Override
public ProcessDesc command(List<String> args) {
builder.command(args);
return this;
}
@Override
public ProcessDesc directory(File dir) {
builder.directory(dir);
return this;
}
@Override
public ProcessDesc inputConsumer(Consumer<String> consumer) {
this.inputConsumer = consumer;
return this;
}
@Override
public ProcessDesc errorConsumer(Consumer<String> consumer) {
this.errorConsumer = consumer;
return this;
}
@Override
public ProcessHandle start() throws IOException {
return new ProcessHandleImpl(builder, inputConsumer, errorConsumer);
}
}
/** Wraps a java.lang.ProcessBuilder instance. */
private static class ProcessHandleImpl implements ProcessHandle {
private final Process process;
private final InputHandler inputConsumer;
private final InputHandler errorConsumer;
ProcessHandleImpl(ProcessBuilder builder, Consumer<String> inputConsumer, Consumer<String> errorConsumer)
throws IOException {
process = builder.start();
this.inputConsumer = new InputHandler(process.getInputStream(), inputConsumer);
this.errorConsumer = new InputHandler(process.getErrorStream(), errorConsumer);
this.inputConsumer.start();
this.errorConsumer.start();
}
@Override
public boolean isAlive() {
return process.isAlive();
}
@Override
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
boolean finished = process.waitFor(timeout, unit);
if (finished) {
inputConsumer.join();
errorConsumer.join();
try {
process.getInputStream().close();
process.getOutputStream().close();
process.getErrorStream().close();
} catch(IOException e) {
LOG.warn("Exception while closing process streams", e);
}
}
return finished;
}
@Override
public int exitValue() {
return process.exitValue();
}
@Override
public void destroyForcibly() {
// Note: on unix this sends a SIGTERM to the process.
// If the started application spawns child processes, then the parent should trap SIGTERM and
// explicitly forward it to the child processes, eg as described here:
// http://veithen.github.io/2014/11/16/sigterm-propagation.html
process.destroyForcibly();
}
}
}
ProcessMock is a test-helper class which provides implementations of all the above interfaces which are suitable for unit-testing; the implementations can be preconfigured with desired behaviour and then during testing they capture their inputs for later assertions. Because both ProcessFactoryImpl and ProcessMock implement the ProcessFactory interface, code which uses ProcessFactory can be passed either the real or mock implementation without code-changes. This class is expected to be in the “test classpath” rather than the standard one.
package net.vonos.process;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Implementations of the ProcessFactory interface and related types which are convenient for unit-testing.
* <p>
* These implementations can be preconfigured to return specific values (like mock-objects) and save parameters
* passed to them for later verification by unit-test assertions.
* </p>
*/
public class ProcessMock {
// Simple factory methods
public static Factory newFactory(Desc... descs) {
Factory pf = new Factory();
pf.pds.addAll(Arrays.asList(descs));
return pf;
}
public static Desc newDesc(Handle... handles) {
Desc pd = new Desc();
pd.phs.addAll(Arrays.<Handle>asList(handles));
return pd;
}
public static Handle newHandle() {
return new Handle();
}
public static class Factory implements ProcessFactory {
// List of preconfigured objects to be returned from newProcessDesc.
private List<Desc> pds = new ArrayList<>();
private int pdIndex;
public Factory add(Desc pd) {
pds.add(pd);
return this;
}
@Override
public ProcessDesc newProcessDesc() {
if (pdIndex < pds.size()) {
return pds.get(pdIndex++);
}
throw new IllegalStateException("No more pds available");
}
}
public static class Desc implements ProcessDesc {
// Mocked outputs
private List<Handle> phs = new ArrayList<>();
private int phIndex;
// Captured Inputs
private List<String> command;
private File dir;
private Consumer<String> inputConsumer;
private Consumer<String> errorConsumer;
// Configuration methods
public void add(Handle ph) {
phs.add(ph);
}
public Handle mockHandle() {
Handle ph = new Handle();
phs.add(ph);
return ph;
}
// Query methods
public List<String> command() {
return command;
}
public File directory() {
return dir;
}
// ProcessDesc implementations
@Override
public ProcessDesc command(List<String> args) {
this.command = new ArrayList<>(args);
return this;
}
@Override
public ProcessDesc directory(File dir) {
this.dir = dir;
return this;
}
@Override
public ProcessDesc inputConsumer(Consumer<String> consumer) {
this.inputConsumer = consumer;
return this;
}
@Override
public ProcessDesc errorConsumer(Consumer<String> consumer) {
this.errorConsumer = consumer;
return this;
}
@Override
public ProcessHandle start() throws IOException {
if (phIndex < phs.size()) {
Handle ph = phs.get(phIndex++);
ph.init(inputConsumer, errorConsumer);
return ph;
}
throw new IllegalStateException("No more phs available");
}
}
public static class Handle implements ProcessHandle {
// Mocked Outputs
private int exitValue;
private long durationMillis;
private List<String> inputs;
private List<String> errors;
private long terminatesAt;
// Captured Inputs
private boolean isStarted;
private boolean isDestroyed;
// Configuration methods
public Handle exitValue(int val) {
exitValue = val;
return this;
}
public Handle duration(long timeout, TimeUnit unit) {
durationMillis = unit.toMillis(timeout);
return this;
}
public Handle inputs(List<String> val) {
inputs = val;
return this;
}
public Handle errors(List<String> val) {
errors = val;
return this;
}
void init(Consumer<String> inputConsumer, Consumer<String> errorConsumer) {
isStarted = true;
terminatesAt = System.currentTimeMillis() + durationMillis;
if (inputs != null) {
// Assume inputConsumer is not null; if a test defines inputs it is assuming there is something
// to consume it, and it would be a valid test failure if there is no such consumer.
inputs.forEach(inputConsumer::accept);
}
if (errors != null) {
// Assume errorConsumer is not null; if a test defines inputs it is assuming there is something
// to consume it, and it would be a valid test failure if there is no such consumer.
errors.forEach(errorConsumer::accept);
}
}
// Query methods
public boolean isStarted() {
return isStarted;
}
public boolean isDestroyed() {
return isDestroyed;
}
// ProcessHandle implementations
@Override
public boolean isAlive() {
return System.currentTimeMillis() < terminatesAt;
}
@Override
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
long untilTermination = terminatesAt - System.currentTimeMillis();
long millisToSleep = Math.min(unit.toMillis(timeout), untilTermination);
if (millisToSleep > 0) {
Thread.sleep(millisToSleep);
}
return !isAlive(); // finished --> true
}
@Override
public int exitValue() {
return exitValue;
}
@Override
public void destroyForcibly() {
isDestroyed = true;
}
}
}
Using the Framework
How to use the interfaces to start processes and manage their output from regular code should be obvious.
How to use the ProcessMock class is perhaps not quite so obvious, so here is some example unit-test code:
@Test
public void testSomeMethodWhichInvokesExternalProcess() throws Exception {
// Configure the emulated STDOUT output of the first process started
// The exit-status is not defined, so defaults to immediately returning 0 (success)
ProcessHandle process1 = ProcessMock.newHandle().inputs(Arrays.asList("stdout-line1", "stdout-line2", "stdout-line3"));
ProcessDesc process1Desc = ProcessMock.newDesc(process1);
// Configure the emulated STDOUT output of the second process started
// The exit-status is not defined, so defaults to immediately returning 0 (success)
ProcessMock.Handle process2 = ProcessMock.newHandle().inputs("just-one-line-of-stdout");
ProcessMock.Desc process2Desc = ProcessMock.newDesc(process2);
// Configure a processFactory which first returns process1 then process2.
ProcessFactory processFactory = ProcessMock.newFactory().add(process1).add(process2);
// Invoke method-under-test. This will:
// * call processFactory.newProcessDesc() to obtain a ProcessDesc
// * set arguments on the ProcessDesc then call start() on it to obtain a ProcessHandle
// * call ProcessHandle.waitFor()
// And then repeat the above for a second process.
objectToTest.methodWhichStartsProcess(processFactory, ...);
// Verify that the method-under-test really did obtain and start a process, and that the commandline it used
// had the expected format.
//
// In a real test, there would also be assertions here that the emulated outputs of process1 had the expected
// effect on objectToTest (whatever that might be)...
Assert.assertEquals(true, process1.isStarted());
Assert.assertEquals("/path/to/process1::someArg1::someArg2", String.join("::", process1Desc.command()));
// Verify that the method-under-test obtained and started a second process, and that the commandline it used
// had the expected format.
Assert.assertEquals(true, process2.isStarted());
Assert.assertEquals("/path/to/process2::someArg", String.join("::", process2Desc.command()));
}
One limitation of ProcessMock is that it does not emulate the threaded behaviour of the STDOUT/STDERR callbacks, ie the callbacks are run in the test thread, and thus race-conditions will not be detected.