Skip to content
Snippets Groups Projects
Commit 0fb03265 authored by brinn's avatar brinn
Browse files

change: improve handling of IProcessIOHandler-based process I/O

add: method readBytesIfAvailable(), readTextIfAvailable()

SVN: 19966
parent a86ae744
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
......@@ -52,7 +53,10 @@ import ch.systemsx.cisd.common.utilities.ITerminable;
public final class ProcessExecutionHelper
{
private static final int BUFFER_SIZE = 4096;
/**
* A good size for I/O buffers.
*/
public static final int RECOMMENDED_BUFFER_SIZE = 4096;
/**
* Strategy on whether to read the process output or not.
......@@ -74,15 +78,28 @@ public final class ProcessExecutionHelper
/**
* Role for handling the process I/O.
* <p>
* Recommended pattern for output-only processes in
* {@link IProcessIOHandler#handle(AtomicBoolean, OutputStream, InputStream, InputStream)}:
*
* <pre>
* while (processRunning.get())
* {
* ProcessExecutionHelper.readBytesIfAvailable(stdout, out, buf, -1, false);
* ProcessExecutionHelper.readTextIfAvailable(bufStderr, errLines, false);
* }
* </pre>
*/
public interface IProcessIOHandler
{
/**
* Method that gets the process' <code>stdin</code>, <code>stdout</code> and
* <code>stderr</code> and is expected to handlt the I/O of the process.
* <code>stderr</code> and is expected to handlt the I/O of the process. If
* <var>processRunning</var> is <code>false</code>, the process has been finished or
* terminated.
*/
public void handle(OutputStream stdin, InputStream stdout, InputStream stderr)
throws IOException;
public void handle(AtomicBoolean processRunning, OutputStream stdin, InputStream stdout,
InputStream stderr) throws IOException;
}
/**
......@@ -98,9 +115,16 @@ public final class ProcessExecutionHelper
private final ByteArrayOutputStream processBinaryOutput;
ProcessRecord(final Process process)
private final Future<?> processIOFutureOrNull;
private final AtomicBoolean processRunningForIO;
ProcessRecord(final Process process, final Future<?> processIOFutureOrNull,
final AtomicBoolean processRunningForIO)
{
this.process = process;
this.processRunningForIO = processRunningForIO;
this.processIOFutureOrNull = processIOFutureOrNull;
if (binaryOutput)
{
this.processTextOutput = null;
......@@ -114,6 +138,16 @@ public final class ProcessExecutionHelper
}
}
Future<?> tryGetProcessIOFuture()
{
return processIOFutureOrNull;
}
AtomicBoolean getProcessRunningForIO()
{
return processRunningForIO;
}
Process getProcess()
{
return process;
......@@ -489,17 +523,18 @@ public final class ProcessExecutionHelper
/**
* Reads the <code>stdout</code> and <code>stderr</code> of <var>process</var>.
*/
private final void readProcessOutput(final ProcessRecord processRecord, final boolean discard)
private final void readProcessOutput(final ProcessRecord processRecord, final byte[] buffer,
final boolean discard)
{
readProcessOutput(processRecord, -1, discard);
readProcessOutput(processRecord, buffer, -1, discard);
}
/**
* Reads the <code>stdout</code> and <code>stderr</code> of <var>process</var>. If
* <code>maxBytes > 0</code>, read not more than so many bytes.
*/
private final void readProcessOutput(final ProcessRecord processRecord, final long maxBytes,
final boolean discard)
private final void readProcessOutput(final ProcessRecord processRecord, final byte[] buffer,
final long maxBytes, final boolean discard)
{
assert processRecord != null;
assert machineLog != null;
......@@ -509,7 +544,8 @@ public final class ProcessExecutionHelper
{
try
{
copy(process, processRecord, maxBytes, discard);
readBytesIfAvailable(process.getInputStream(),
processRecord.getBinaryProcessOutput(), buffer, maxBytes, discard);
} catch (final IOException e)
{
machineLog.warn(String.format("IOException when reading stdout/stderr, msg='%s'.",
......@@ -528,25 +564,6 @@ public final class ProcessExecutionHelper
}
}
protected void copy(final Process process, final ProcessRecord processRecord,
final long maxBytes, final boolean discard) throws IOException
{
final InputStream input = process.getInputStream();
final OutputStream output = processRecord.getBinaryProcessOutput();
final byte[] buffer = new byte[BUFFER_SIZE];
long count = 0;
int n = 0;
while ((maxBytes <= 0 || count < maxBytes) && input.available() > 0
&& -1 != (n = input.read(buffer)))
{
if (discard == false)
{
output.write(buffer, 0, n);
}
count += n;
}
}
/**
* Returns the <code>stdout</code> and <code>stderr</code> of the <var>process</var> in
* <var>processRecord.getProcessOutput()</var>.
......@@ -560,18 +577,7 @@ public final class ProcessExecutionHelper
try
{
while (reader.ready())
{
final String line = reader.readLine();
if (line == null)
{
break;
}
if (discard == false)
{
processOutput.add(line);
}
}
readTextIfAvailable(reader, processOutput, discard);
} catch (final IOException e)
{
machineLog.warn(String.format("IOException when reading stdout/stderr, msg='%s'.",
......@@ -579,6 +585,58 @@ public final class ProcessExecutionHelper
}
}
/**
* Helper method for non-blocking reading text from a reader and add it to a list of strings,
* or, alternatively, discard it.
*
* @param outputOrNull Must not be <code>null</code> if <var>discard</var> is <code>false</code>
* .
* @return <code>true</code> if reader is end-of-file and <code>false</code> otherwise.
*/
public static boolean readTextIfAvailable(final BufferedReader reader,
final List<String> outputOrNull, final boolean discard) throws IOException
{
while (reader.ready())
{
final String line = reader.readLine();
if (line == null)
{
return true;
}
if (discard == false)
{
outputOrNull.add(line);
}
}
return true;
}
/**
* Helper method for non-blocking reading of bytes from an input stream, if available, and
* writing them to an output stream, or, alternatively, discard them.
*
* @param outputOrNull Must not be <code>null</code> if <var>discard</var> is <code>false</code>
* .
* @return The number of bytes being read in this call.
*/
public static long readBytesIfAvailable(final InputStream input,
final OutputStream outputOrNull, final byte[] buffer, final long maxBytes,
final boolean discard) throws IOException
{
long count = 0;
int n = 0;
while ((maxBytes <= 0 || count < maxBytes) && input.available() > 0
&& -1 != (n = input.read(buffer)))
{
if (discard == false)
{
outputOrNull.write(buffer, 0, n);
}
count += n;
}
return count;
}
/**
* Returns <code>true</code>, if an I/O handler for the process I/O is available.
*/
......@@ -619,46 +677,50 @@ public final class ProcessExecutionHelper
final Process process = launch();
try
{
ProcessRecord processRecord = new ProcessRecord(process);
processWrapper.set(processRecord);
int exitValue = ProcessResult.NO_EXIT_VALUE;
if (hasIOHandler() == false)
{
final boolean discardOutput =
(outputReadingStrategy == OutputReadingStrategy.NEVER);
final ProcessRecord processRecord = new ProcessRecord(process, null, null);
processWrapper.set(processRecord);
final byte[] buffer = new byte[RECOMMENDED_BUFFER_SIZE];
while (exitValue == ProcessResult.NO_EXIT_VALUE)
{
readProcessOutput(processRecord, discardOutput);
readProcessOutput(processRecord, buffer, discardOutput);
exitValue = getExitValue(process);
if (exitValue == ProcessResult.NO_EXIT_VALUE)
{
ConcurrencyUtilities.sleep(PAUSE_MILLIS);
}
}
readProcessOutput(processRecord, buffer, discardOutput);
processWrapper.set(null);
readProcessOutput(processRecord, discardOutput);
if (binaryOutput)
{
return new ProcessResult(commandLine, processNumber,
ExecutionStatus.COMPLETE, "", exitValue, processRecord
ExecutionStatus.COMPLETE, null, "", exitValue, processRecord
.getBinaryProcessOutput().toByteArray(),
processRecord.getErrorProcessOutput(), operationLog, machineLog);
} else
{
return new ProcessResult(commandLine, processNumber,
ExecutionStatus.COMPLETE, "", exitValue,
ExecutionStatus.COMPLETE, null, "", exitValue,
processRecord.getTextProcessOutput(), operationLog, machineLog);
}
} else
{
final AtomicBoolean processRunning = new AtomicBoolean(true);
final Future<?> future = executor.submit(new Runnable()
{
public void run()
{
try
{
processIOHandlerOrNull.handle(process.getOutputStream(),
processIOHandlerOrNull.handle(processRunning,
process.getOutputStream(),
process.getInputStream(), process.getErrorStream());
} catch (IOException ex)
{
......@@ -666,37 +728,19 @@ public final class ProcessExecutionHelper
}
}
});
final ProcessRecord processRecord =
new ProcessRecord(process, future, processRunning);
processWrapper.set(processRecord);
exitValue = process.waitFor();
processWrapper.set(null);
final ExecutionResult<?> result =
ConcurrencyUtilities.getResult(future, SHORT_TIMEOUT);
switch (result.getStatus())
{
case COMPLETE:
break;
case EXCEPTION:
final Throwable th = result.tryGetException();
final Throwable cause =
(th == null) ? new RuntimeException("Unknown exception.")
: (th instanceof Error) ? (Error) th
: CheckedExceptionTunnel
.unwrapIfNecessary((Exception) th);
machineLog
.warn(String
.format("Exception when reading stdout/stderr, type='%s', msg='%s'.",
cause.getClass().getSimpleName(),
cause.getMessage()));
break;
case INTERRUPTED:
machineLog.warn("Interrupted when reading stdout/stderr.");
break;
case TIMED_OUT:
machineLog.warn("Timeout when reading stdout/stderr.");
break;
}
final boolean stillInCharge = (processWrapper.getAndSet(null) != null);
final ExecutionResult<?> processIOResultOrNull =
stillInCharge ? tryGetAndLogProcessIOResult(processRecord,
SHORT_TIMEOUT) : null;
return new ProcessResult(commandLine, processNumber,
ExecutionStatus.COMPLETE, "", exitValue, null, operationLog,
machineLog);
ExecutionStatus.COMPLETE, processIOResultOrNull, "", exitValue,
null, operationLog, machineLog);
}
} finally
{
......@@ -719,6 +763,7 @@ public final class ProcessExecutionHelper
return callingThreadName + "::run-P" + processNumber + "-{"
+ getCommandName(commandLine) + "}";
}
}
/**
......@@ -744,6 +789,8 @@ public final class ProcessExecutionHelper
final ProcessRecord processRecord = processWrapper.getAndSet(null);
if (processRecord != null)
{
final ExecutionResult<?> processIOResultOrNull =
tryGetAndLogProcessIOResult(processRecord, SHORT_TIMEOUT / 4);
final Process process = processRecord.getProcess();
process.destroy(); // Note: this also closes the I/O streams.
if (machineLog.isInfoEnabled())
......@@ -753,12 +800,14 @@ public final class ProcessExecutionHelper
final int exitValue = getExitValue(processRecord.getProcess());
if (binaryOutput)
{
return new ProcessResult(commandLine, processNumber, status, "", exitValue,
processRecord.getBinaryProcessOutput().toByteArray(),
return new ProcessResult(commandLine, processNumber, status,
processIOResultOrNull, "", exitValue, processRecord
.getBinaryProcessOutput().toByteArray(),
processRecord.getErrorProcessOutput(), operationLog, machineLog);
} else
{
return new ProcessResult(commandLine, processNumber, status, "", exitValue,
return new ProcessResult(commandLine, processNumber, status,
processIOResultOrNull, "", exitValue,
processRecord.getTextProcessOutput(), operationLog, machineLog);
}
} else
......@@ -828,6 +877,44 @@ public final class ProcessExecutionHelper
};
}
ExecutionResult<?> tryGetAndLogProcessIOResult(ProcessRecord record, long timeout)
{
if (record.tryGetProcessIOFuture() == null)
{
return null;
}
record.getProcessRunningForIO().set(false);
final ExecutionResult<?> processIOResult =
ConcurrencyUtilities.getResult(record.tryGetProcessIOFuture(), timeout);
logProcessIOFailures(processIOResult);
return processIOResult;
}
void logProcessIOFailures(final ExecutionResult<?> processIOResult)
{
switch (processIOResult.getStatus())
{
case COMPLETE:
break;
case EXCEPTION:
final Throwable th = processIOResult.tryGetException();
final Throwable cause =
(th == null) ? new RuntimeException("Unknown exception.")
: (th instanceof Error) ? (Error) th : CheckedExceptionTunnel
.unwrapIfNecessary((Exception) th);
machineLog.warn(String.format(
"Exception when doing process I/O, type='%s', msg='%s'.", cause.getClass()
.getSimpleName(), cause.getMessage()));
break;
case INTERRUPTED:
machineLog.warn("Interrupted when doing process I/O.");
break;
case TIMED_OUT:
machineLog.warn("Timeout when doing process I/O.");
break;
}
}
private final ProcessResult run(final boolean stopOnInterrupt)
{
final Future<ProcessResult> runnerFuture = launchProcessExecutor();
......@@ -870,12 +957,12 @@ public final class ProcessExecutionHelper
}
if (binaryOutput)
{
return new ProcessResult(commandLine, processNumber, status,
return new ProcessResult(commandLine, processNumber, status, null,
tryGetStartupFailureMessage(executionResult.tryGetException()),
ProcessResult.NO_EXIT_VALUE, null, null, operationLog, machineLog);
} else
{
return new ProcessResult(commandLine, processNumber, status,
return new ProcessResult(commandLine, processNumber, status, null,
tryGetStartupFailureMessage(executionResult.tryGetException()),
ProcessResult.NO_EXIT_VALUE, null, operationLog, machineLog);
}
......
......@@ -24,6 +24,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import ch.systemsx.cisd.base.utilities.OSUtilities;
import ch.systemsx.cisd.common.concurrent.ExecutionResult;
import ch.systemsx.cisd.common.concurrent.ExecutionStatus;
/**
......@@ -82,6 +83,8 @@ public final class ProcessResult
private final boolean isBinaryOutput;
private final ExecutionResult<?> processIOResultOrNull;
/**
* Returns <code>true</code> if the <var>exitValue</var> indicates that the process has been
* terminated on the Operating System level.
......@@ -103,14 +106,16 @@ public final class ProcessResult
}
public ProcessResult(final List<String> commandLine, final int processNumber,
final ExecutionStatus status, final String startupFailureMessageOrNull,
final int exitValue, final List<String> processOutputOrNull, final Logger operationLog,
final ExecutionStatus status, ExecutionResult<?> processIOResult,
final String startupFailureMessageOrNull, final int exitValue,
final List<String> processOutputOrNull, final Logger operationLog,
final Logger machineLog)
{
this.commandLine = commandLine;
this.commandName = ProcessExecutionHelper.getCommandName(commandLine);
this.processNumber = processNumber;
this.status = status;
this.processIOResultOrNull = processIOResult;
this.startupFailureMessage =
(startupFailureMessageOrNull == null) ? "" : startupFailureMessageOrNull;
this.exitValue = exitValue;
......@@ -132,15 +137,16 @@ public final class ProcessResult
}
public ProcessResult(final List<String> commandLine, final int processNumber,
final ExecutionStatus status, final String startupFailureMessageOrNull,
final int exitValue, final byte[] processBinaryOutputOrNull,
final List<String> processErrorOutputOrNull, final Logger operationLog,
final Logger machineLog)
final ExecutionStatus status, ExecutionResult<?> processIOResult,
final String startupFailureMessageOrNull, final int exitValue,
final byte[] processBinaryOutputOrNull, final List<String> processErrorOutputOrNull,
final Logger operationLog, final Logger machineLog)
{
this.commandLine = commandLine;
this.commandName = ProcessExecutionHelper.getCommandName(commandLine);
this.processNumber = processNumber;
this.status = status;
this.processIOResultOrNull = processIOResult;
this.startupFailureMessage =
(startupFailureMessageOrNull == null) ? "" : startupFailureMessageOrNull;
this.exitValue = exitValue;
......@@ -247,6 +253,18 @@ public final class ProcessResult
return output;
}
/**
* Returns the IO process result, or <code>null</code>, if this information is not available.
* <p>
* <b>Note that {@link ExecutionStatus#COMPLETE} does <i>not</i> necessarily mean that all
* output has been read from the process. You need to check {@link #isTimedOut()} in addition to
* see whether the process got terminated due to a timeout condition.</b>
*/
public ExecutionResult<?> tryGetProcessIOResult()
{
return processIOResultOrNull;
}
/**
* Returns the exit value of the process, or {@link #NO_EXIT_VALUE}, if the value is not
* available.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment