From 0fb03265e423259229ca220b413679f7f470a2d4 Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Tue, 15 Feb 2011 12:00:44 +0000
Subject: [PATCH] change: improve handling of IProcessIOHandler-based process
 I/O add: method readBytesIfAvailable(), readTextIfAvailable()

SVN: 19966
---
 .../process/ProcessExecutionHelper.java       | 253 ++++++++++++------
 .../cisd/common/process/ProcessResult.java    |  30 ++-
 2 files changed, 194 insertions(+), 89 deletions(-)

diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java
index 7ebf7249615..9d7a44fd046 100644
--- a/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java
+++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -52,7 +53,10 @@ import ch.systemsx.cisd.common.utilities.ITerminable;
 public final class ProcessExecutionHelper
 {
 
-    private static final int BUFFER_SIZE = 4096;
+    /**
+     * A good size for I/O buffers.
+     */
+    public static final int RECOMMENDED_BUFFER_SIZE = 4096;
 
     /**
      * Strategy on whether to read the process output or not.
@@ -74,15 +78,28 @@ public final class ProcessExecutionHelper
 
     /**
      * Role for handling the process I/O.
+     * <p>
+     * Recommended pattern for output-only processes in
+     * {@link IProcessIOHandler#handle(AtomicBoolean, OutputStream, InputStream, InputStream)}:
+     * 
+     * <pre>
+     * while (processRunning.get())
+     * {
+     *     ProcessExecutionHelper.readBytesIfAvailable(stdout, out, buf, -1, false);
+     *     ProcessExecutionHelper.readTextIfAvailable(bufStderr, errLines, false);
+     * }
+     * </pre>
      */
     public interface IProcessIOHandler
     {
         /**
          * Method that gets the process' <code>stdin</code>, <code>stdout</code> and
-         * <code>stderr</code> and is expected to handlt the I/O of the process.
+         * <code>stderr</code> and is expected to handlt the I/O of the process. If
+         * <var>processRunning</var> is <code>false</code>, the process has been finished or
+         * terminated.
          */
-        public void handle(OutputStream stdin, InputStream stdout, InputStream stderr)
-                throws IOException;
+        public void handle(AtomicBoolean processRunning, OutputStream stdin, InputStream stdout,
+                InputStream stderr) throws IOException;
     }
 
     /**
@@ -98,9 +115,16 @@ public final class ProcessExecutionHelper
 
         private final ByteArrayOutputStream processBinaryOutput;
 
-        ProcessRecord(final Process process)
+        private final Future<?> processIOFutureOrNull;
+
+        private final AtomicBoolean processRunningForIO;
+
+        ProcessRecord(final Process process, final Future<?> processIOFutureOrNull,
+                final AtomicBoolean processRunningForIO)
         {
             this.process = process;
+            this.processRunningForIO = processRunningForIO;
+            this.processIOFutureOrNull = processIOFutureOrNull;
             if (binaryOutput)
             {
                 this.processTextOutput = null;
@@ -114,6 +138,16 @@ public final class ProcessExecutionHelper
             }
         }
 
+        Future<?> tryGetProcessIOFuture()
+        {
+            return processIOFutureOrNull;
+        }
+
+        AtomicBoolean getProcessRunningForIO()
+        {
+            return processRunningForIO;
+        }
+
         Process getProcess()
         {
             return process;
@@ -489,17 +523,18 @@ public final class ProcessExecutionHelper
     /**
      * Reads the <code>stdout</code> and <code>stderr</code> of <var>process</var>.
      */
-    private final void readProcessOutput(final ProcessRecord processRecord, final boolean discard)
+    private final void readProcessOutput(final ProcessRecord processRecord, final byte[] buffer,
+            final boolean discard)
     {
-        readProcessOutput(processRecord, -1, discard);
+        readProcessOutput(processRecord, buffer, -1, discard);
     }
 
     /**
      * Reads the <code>stdout</code> and <code>stderr</code> of <var>process</var>. If
      * <code>maxBytes > 0</code>, read not more than so many bytes.
      */
-    private final void readProcessOutput(final ProcessRecord processRecord, final long maxBytes,
-            final boolean discard)
+    private final void readProcessOutput(final ProcessRecord processRecord, final byte[] buffer,
+            final long maxBytes, final boolean discard)
     {
         assert processRecord != null;
         assert machineLog != null;
@@ -509,7 +544,8 @@ public final class ProcessExecutionHelper
         {
             try
             {
-                copy(process, processRecord, maxBytes, discard);
+                readBytesIfAvailable(process.getInputStream(),
+                        processRecord.getBinaryProcessOutput(), buffer, maxBytes, discard);
             } catch (final IOException e)
             {
                 machineLog.warn(String.format("IOException when reading stdout/stderr, msg='%s'.",
@@ -528,25 +564,6 @@ public final class ProcessExecutionHelper
         }
     }
 
-    protected void copy(final Process process, final ProcessRecord processRecord,
-            final long maxBytes, final boolean discard) throws IOException
-    {
-        final InputStream input = process.getInputStream();
-        final OutputStream output = processRecord.getBinaryProcessOutput();
-        final byte[] buffer = new byte[BUFFER_SIZE];
-        long count = 0;
-        int n = 0;
-        while ((maxBytes <= 0 || count < maxBytes) && input.available() > 0
-                && -1 != (n = input.read(buffer)))
-        {
-            if (discard == false)
-            {
-                output.write(buffer, 0, n);
-            }
-            count += n;
-        }
-    }
-
     /**
      * Returns the <code>stdout</code> and <code>stderr</code> of the <var>process</var> in
      * <var>processRecord.getProcessOutput()</var>.
@@ -560,18 +577,7 @@ public final class ProcessExecutionHelper
 
         try
         {
-            while (reader.ready())
-            {
-                final String line = reader.readLine();
-                if (line == null)
-                {
-                    break;
-                }
-                if (discard == false)
-                {
-                    processOutput.add(line);
-                }
-            }
+            readTextIfAvailable(reader, processOutput, discard);
         } catch (final IOException e)
         {
             machineLog.warn(String.format("IOException when reading stdout/stderr, msg='%s'.",
@@ -579,6 +585,58 @@ public final class ProcessExecutionHelper
         }
     }
 
+    /**
+     * Helper method for non-blocking reading text from a reader and add it to a list of strings,
+     * or, alternatively, discard it.
+     * 
+     * @param outputOrNull Must not be <code>null</code> if <var>discard</var> is <code>false</code>
+     *            .
+     * @return <code>true</code> if reader is end-of-file and <code>false</code> otherwise.
+     */
+    public static boolean readTextIfAvailable(final BufferedReader reader,
+            final List<String> outputOrNull, final boolean discard) throws IOException
+    {
+        while (reader.ready())
+        {
+            final String line = reader.readLine();
+            if (line == null)
+            {
+                return true;
+            }
+            if (discard == false)
+            {
+                outputOrNull.add(line);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Helper method for non-blocking reading of bytes from an input stream, if available, and
+     * writing them to an output stream, or, alternatively, discard them.
+     * 
+     * @param outputOrNull Must not be <code>null</code> if <var>discard</var> is <code>false</code>
+     *            .
+     * @return The number of bytes being read in this call.
+     */
+    public static long readBytesIfAvailable(final InputStream input,
+            final OutputStream outputOrNull, final byte[] buffer, final long maxBytes,
+            final boolean discard) throws IOException
+    {
+        long count = 0;
+        int n = 0;
+        while ((maxBytes <= 0 || count < maxBytes) && input.available() > 0
+                && -1 != (n = input.read(buffer)))
+        {
+            if (discard == false)
+            {
+                outputOrNull.write(buffer, 0, n);
+            }
+            count += n;
+        }
+        return count;
+    }
+
     /**
      * Returns <code>true</code>, if an I/O handler for the process I/O is available.
      */
@@ -619,46 +677,50 @@ public final class ProcessExecutionHelper
                 final Process process = launch();
                 try
                 {
-                    ProcessRecord processRecord = new ProcessRecord(process);
-                    processWrapper.set(processRecord);
-
                     int exitValue = ProcessResult.NO_EXIT_VALUE;
                     if (hasIOHandler() == false)
                     {
                         final boolean discardOutput =
                                 (outputReadingStrategy == OutputReadingStrategy.NEVER);
+                        final ProcessRecord processRecord = new ProcessRecord(process, null, null);
+
+                        processWrapper.set(processRecord);
+                        final byte[] buffer = new byte[RECOMMENDED_BUFFER_SIZE];
                         while (exitValue == ProcessResult.NO_EXIT_VALUE)
                         {
-                            readProcessOutput(processRecord, discardOutput);
+                            readProcessOutput(processRecord, buffer, discardOutput);
                             exitValue = getExitValue(process);
                             if (exitValue == ProcessResult.NO_EXIT_VALUE)
                             {
                                 ConcurrencyUtilities.sleep(PAUSE_MILLIS);
                             }
                         }
+                        readProcessOutput(processRecord, buffer, discardOutput);
                         processWrapper.set(null);
-                        readProcessOutput(processRecord, discardOutput);
+
                         if (binaryOutput)
                         {
                             return new ProcessResult(commandLine, processNumber,
-                                    ExecutionStatus.COMPLETE, "", exitValue, processRecord
+                                    ExecutionStatus.COMPLETE, null, "", exitValue, processRecord
                                             .getBinaryProcessOutput().toByteArray(),
                                     processRecord.getErrorProcessOutput(), operationLog, machineLog);
                         } else
                         {
                             return new ProcessResult(commandLine, processNumber,
-                                    ExecutionStatus.COMPLETE, "", exitValue,
+                                    ExecutionStatus.COMPLETE, null, "", exitValue,
                                     processRecord.getTextProcessOutput(), operationLog, machineLog);
                         }
                     } else
                     {
+                        final AtomicBoolean processRunning = new AtomicBoolean(true);
                         final Future<?> future = executor.submit(new Runnable()
                             {
                                 public void run()
                                 {
                                     try
                                     {
-                                        processIOHandlerOrNull.handle(process.getOutputStream(),
+                                        processIOHandlerOrNull.handle(processRunning,
+                                                process.getOutputStream(),
                                                 process.getInputStream(), process.getErrorStream());
                                     } catch (IOException ex)
                                     {
@@ -666,37 +728,19 @@ public final class ProcessExecutionHelper
                                     }
                                 }
                             });
+                        final ProcessRecord processRecord =
+                                new ProcessRecord(process, future, processRunning);
+
+                        processWrapper.set(processRecord);
                         exitValue = process.waitFor();
-                        processWrapper.set(null);
-                        final ExecutionResult<?> result =
-                                ConcurrencyUtilities.getResult(future, SHORT_TIMEOUT);
-                        switch (result.getStatus())
-                        {
-                            case COMPLETE:
-                                break;
-                            case EXCEPTION:
-                                final Throwable th = result.tryGetException();
-                                final Throwable cause =
-                                        (th == null) ? new RuntimeException("Unknown exception.")
-                                                : (th instanceof Error) ? (Error) th
-                                                        : CheckedExceptionTunnel
-                                                                .unwrapIfNecessary((Exception) th);
-                                machineLog
-                                        .warn(String
-                                                .format("Exception when reading stdout/stderr, type='%s', msg='%s'.",
-                                                        cause.getClass().getSimpleName(),
-                                                        cause.getMessage()));
-                                break;
-                            case INTERRUPTED:
-                                machineLog.warn("Interrupted when reading stdout/stderr.");
-                                break;
-                            case TIMED_OUT:
-                                machineLog.warn("Timeout when reading stdout/stderr.");
-                                break;
-                        }
+                        final boolean stillInCharge = (processWrapper.getAndSet(null) != null);
+
+                        final ExecutionResult<?> processIOResultOrNull =
+                                stillInCharge ? tryGetAndLogProcessIOResult(processRecord,
+                                        SHORT_TIMEOUT) : null;
                         return new ProcessResult(commandLine, processNumber,
-                                ExecutionStatus.COMPLETE, "", exitValue, null, operationLog,
-                                machineLog);
+                                ExecutionStatus.COMPLETE, processIOResultOrNull, "", exitValue,
+                                null, operationLog, machineLog);
                     }
                 } finally
                 {
@@ -719,6 +763,7 @@ public final class ProcessExecutionHelper
             return callingThreadName + "::run-P" + processNumber + "-{"
                     + getCommandName(commandLine) + "}";
         }
+
     }
 
     /**
@@ -744,6 +789,8 @@ public final class ProcessExecutionHelper
             final ProcessRecord processRecord = processWrapper.getAndSet(null);
             if (processRecord != null)
             {
+                final ExecutionResult<?> processIOResultOrNull =
+                        tryGetAndLogProcessIOResult(processRecord, SHORT_TIMEOUT / 4);
                 final Process process = processRecord.getProcess();
                 process.destroy(); // Note: this also closes the I/O streams.
                 if (machineLog.isInfoEnabled())
@@ -753,12 +800,14 @@ public final class ProcessExecutionHelper
                 final int exitValue = getExitValue(processRecord.getProcess());
                 if (binaryOutput)
                 {
-                    return new ProcessResult(commandLine, processNumber, status, "", exitValue,
-                            processRecord.getBinaryProcessOutput().toByteArray(),
+                    return new ProcessResult(commandLine, processNumber, status,
+                            processIOResultOrNull, "", exitValue, processRecord
+                                    .getBinaryProcessOutput().toByteArray(),
                             processRecord.getErrorProcessOutput(), operationLog, machineLog);
                 } else
                 {
-                    return new ProcessResult(commandLine, processNumber, status, "", exitValue,
+                    return new ProcessResult(commandLine, processNumber, status,
+                            processIOResultOrNull, "", exitValue,
                             processRecord.getTextProcessOutput(), operationLog, machineLog);
                 }
             } else
@@ -828,6 +877,44 @@ public final class ProcessExecutionHelper
             };
     }
 
+    ExecutionResult<?> tryGetAndLogProcessIOResult(ProcessRecord record, long timeout)
+    {
+        if (record.tryGetProcessIOFuture() == null)
+        {
+            return null;
+        }
+        record.getProcessRunningForIO().set(false);
+        final ExecutionResult<?> processIOResult =
+                ConcurrencyUtilities.getResult(record.tryGetProcessIOFuture(), timeout);
+        logProcessIOFailures(processIOResult);
+        return processIOResult;
+    }
+
+    void logProcessIOFailures(final ExecutionResult<?> processIOResult)
+    {
+        switch (processIOResult.getStatus())
+        {
+            case COMPLETE:
+                break;
+            case EXCEPTION:
+                final Throwable th = processIOResult.tryGetException();
+                final Throwable cause =
+                        (th == null) ? new RuntimeException("Unknown exception.")
+                                : (th instanceof Error) ? (Error) th : CheckedExceptionTunnel
+                                        .unwrapIfNecessary((Exception) th);
+                machineLog.warn(String.format(
+                        "Exception when doing process I/O, type='%s', msg='%s'.", cause.getClass()
+                                .getSimpleName(), cause.getMessage()));
+                break;
+            case INTERRUPTED:
+                machineLog.warn("Interrupted when doing process I/O.");
+                break;
+            case TIMED_OUT:
+                machineLog.warn("Timeout when doing process I/O.");
+                break;
+        }
+    }
+
     private final ProcessResult run(final boolean stopOnInterrupt)
     {
         final Future<ProcessResult> runnerFuture = launchProcessExecutor();
@@ -870,12 +957,12 @@ public final class ProcessExecutionHelper
             }
             if (binaryOutput)
             {
-                return new ProcessResult(commandLine, processNumber, status,
+                return new ProcessResult(commandLine, processNumber, status, null,
                         tryGetStartupFailureMessage(executionResult.tryGetException()),
                         ProcessResult.NO_EXIT_VALUE, null, null, operationLog, machineLog);
             } else
             {
-                return new ProcessResult(commandLine, processNumber, status,
+                return new ProcessResult(commandLine, processNumber, status, null,
                         tryGetStartupFailureMessage(executionResult.tryGetException()),
                         ProcessResult.NO_EXIT_VALUE, null, operationLog, machineLog);
             }
diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java
index 604e8249799..5afaa804260 100644
--- a/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java
+++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 import ch.systemsx.cisd.base.utilities.OSUtilities;
+import ch.systemsx.cisd.common.concurrent.ExecutionResult;
 import ch.systemsx.cisd.common.concurrent.ExecutionStatus;
 
 /**
@@ -82,6 +83,8 @@ public final class ProcessResult
 
     private final boolean isBinaryOutput;
 
+    private final ExecutionResult<?> processIOResultOrNull;
+
     /**
      * Returns <code>true</code> if the <var>exitValue</var> indicates that the process has been
      * terminated on the Operating System level.
@@ -103,14 +106,16 @@ public final class ProcessResult
     }
 
     public ProcessResult(final List<String> commandLine, final int processNumber,
-            final ExecutionStatus status, final String startupFailureMessageOrNull,
-            final int exitValue, final List<String> processOutputOrNull, final Logger operationLog,
+            final ExecutionStatus status, ExecutionResult<?> processIOResult,
+            final String startupFailureMessageOrNull, final int exitValue,
+            final List<String> processOutputOrNull, final Logger operationLog,
             final Logger machineLog)
     {
         this.commandLine = commandLine;
         this.commandName = ProcessExecutionHelper.getCommandName(commandLine);
         this.processNumber = processNumber;
         this.status = status;
+        this.processIOResultOrNull = processIOResult;
         this.startupFailureMessage =
                 (startupFailureMessageOrNull == null) ? "" : startupFailureMessageOrNull;
         this.exitValue = exitValue;
@@ -132,15 +137,16 @@ public final class ProcessResult
     }
 
     public ProcessResult(final List<String> commandLine, final int processNumber,
-            final ExecutionStatus status, final String startupFailureMessageOrNull,
-            final int exitValue, final byte[] processBinaryOutputOrNull,
-            final List<String> processErrorOutputOrNull, final Logger operationLog,
-            final Logger machineLog)
+            final ExecutionStatus status, ExecutionResult<?> processIOResult,
+            final String startupFailureMessageOrNull, final int exitValue,
+            final byte[] processBinaryOutputOrNull, final List<String> processErrorOutputOrNull,
+            final Logger operationLog, final Logger machineLog)
     {
         this.commandLine = commandLine;
         this.commandName = ProcessExecutionHelper.getCommandName(commandLine);
         this.processNumber = processNumber;
         this.status = status;
+        this.processIOResultOrNull = processIOResult;
         this.startupFailureMessage =
                 (startupFailureMessageOrNull == null) ? "" : startupFailureMessageOrNull;
         this.exitValue = exitValue;
@@ -247,6 +253,18 @@ public final class ProcessResult
         return output;
     }
 
+    /**
+     * Returns the IO process result, or <code>null</code>, if this information is not available.
+     * <p>
+     * <b>Note that {@link ExecutionStatus#COMPLETE} does <i>not</i> necessarily mean that all
+     * output has been read from the process. You need to check {@link #isTimedOut()} in addition to
+     * see whether the process got terminated due to a timeout condition.</b>
+     */
+    public ExecutionResult<?> tryGetProcessIOResult()
+    {
+        return processIOResultOrNull;
+    }
+
     /**
      * Returns the exit value of the process, or {@link #NO_EXIT_VALUE}, if the value is not
      * available.
-- 
GitLab