From afa47d77ab2c063058077e0d6d67f8c5e4d2f3c5 Mon Sep 17 00:00:00 2001 From: ribeaudc <ribeaudc> Date: Mon, 2 Jun 2008 11:02:21 +0000 Subject: [PATCH] change: - Forward porting of Bernd changes in concurrent package. - A couple of test put in the broken group. Will be fixed very soon. SVN: 6399 --- .../tiff/TiffZipCompressionMethod.java | 6 +- .../concurrent/ConcurrencyUtilities.java | 215 ++++++-- .../common/concurrent/ExecutionResult.java | 105 ++++ .../common/concurrent/ExecutionStatus.java | 41 ++ .../cisd/common/concurrent/NamedCallable.java | 30 ++ .../common/concurrent/NamedFutureTask.java | 48 ++ .../cisd/common/concurrent/NamedRunnable.java | 28 + .../concurrent/NamingThreadFactory.java | 52 ++ .../concurrent/NamingThreadPoolExecutor.java | 281 ++++++++++ .../common/concurrent/PoolNameThread.java | 48 ++ .../cisd/common/concurrent/StopException.java | 49 ++ .../common/concurrent/TimerUtilities.java | 172 +++++++ .../process/ProcessExecutionHelper.java | 487 ++++++++++-------- .../cisd/common/process/ProcessResult.java | 223 +++++--- .../cisd/common/process/ProcessRunner.java | 2 + .../cisd/common/process/ProcessWatchdog.java | 1 + .../utilities/RecursiveHardLinkMaker.java | 7 +- .../io/DestroyableFileSystemUtils.java | 1 + .../concurrent/ConcurrencyUtilitiesTest.java | 191 ++++++- .../NamingThreadPoolExecutorTest.java | 250 +++++++++ .../common/concurrent/TimerUtilitiesTest.java | 240 +++++++++ .../process/ProcessExecutionHelperTest.java | 145 ++++-- 22 files changed, 2237 insertions(+), 385 deletions(-) create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionResult.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionStatus.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/NamedCallable.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/NamedFutureTask.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/NamedRunnable.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/StopException.java create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/TimerUtilities.java create mode 100644 common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java create mode 100644 common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TimerUtilitiesTest.java diff --git a/common/source/java/ch/systemsx/cisd/common/compression/tiff/TiffZipCompressionMethod.java b/common/source/java/ch/systemsx/cisd/common/compression/tiff/TiffZipCompressionMethod.java index 60a66735421..ddbb0f1cfd7 100644 --- a/common/source/java/ch/systemsx/cisd/common/compression/tiff/TiffZipCompressionMethod.java +++ b/common/source/java/ch/systemsx/cisd/common/compression/tiff/TiffZipCompressionMethod.java @@ -20,6 +20,7 @@ import java.io.File; import java.util.Arrays; import java.util.List; +import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.compression.file.InPlaceCompressionMethod; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; @@ -41,12 +42,11 @@ public class TiffZipCompressionMethod extends InPlaceCompressionMethod private static String getImageMagickVersion(String convertExecutableToCheck) { - final long timeToWaitForCompletionMillis = 2 * 1000L; final ProcessResult result = ProcessExecutionHelper.run(Arrays.asList(convertExecutableToCheck, "--version"), - timeToWaitForCompletionMillis, operationLog, machineLog); + operationLog, machineLog, Constants.MILLIS_TO_WAIT_BEFORE_TIMEOUT); result.log(); - final String versionString = extractImageMagickVersion(result.getProcessOutput().get(0)); + final String versionString = extractImageMagickVersion(result.getOutput().get(0)); return versionString; } diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java index 54720ea890c..dc3d68aa068 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java @@ -17,11 +17,7 @@ package ch.systemsx.cisd.common.concurrent; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -37,28 +33,28 @@ import ch.systemsx.cisd.common.logging.LogLevel; public final class ConcurrencyUtilities { + /** Corresponds to no timeout at all when waiting for the future. */ + public static final long NO_TIMEOUT = -1L; + + /** Corresponds to an immediate timeout when waiting for the future. */ + public static final long IMMEDIATE_TIMEOUT = 0L; + /** - * Creates an {@link ExecutorService} where threads have a name starting with <var>name</var>. + * Tries to get the result of a <var>future</var>, maximally waiting <var>timeoutMillis</var> + * for the result to become available. Any {@link ExecutionException} that might occur in the + * future task is unwrapped and re-thrown. * - * @param name The name prefix of new threads started by this pool. - * @param corePoolSize The number of threads that should be kept running even if less theads are - * needed. - * @param maximumPoolSize The number of threads that this executor service is maximally allowed - * to spawn. + * @param future The future representing the execution to wait for. + * @param timeoutMillis The time-out (in milliseconds) to wait for the execution to finish. If + * it is smaller than 0, no time-out will apply. + * @return The result of the future, or <code>null</code>, if the result does not become + * available within <var>timeoutMillis</var> ms. + * @throws StopException If the thread got interrupted. */ - public static ExecutorService newNamedPool(final String name, int corePoolSize, - int maximumPoolSize) + public static <T> T tryGetResult(Future<T> future, long timeoutMillis) + throws StopException { - return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), new ThreadFactory() - { - private int executorThreadCount = 1; - - public Thread newThread(Runnable r) - { - return new Thread(r, name + " " + executorThreadCount); - } - }); + return tryGetResult(future, timeoutMillis, null, null, true); } /** @@ -66,43 +62,185 @@ public final class ConcurrencyUtilities * for the result to become available. Any {@link ExecutionException} that might occur in the * future task is unwrapped and re-thrown. * + * @param future The future representing the execution to wait for. + * @param timeoutMillis The time-out (in milliseconds) to wait for the execution to finish. If + * it is smaller than 0, no time-out will apply. * @return The result of the future, or <code>null</code>, if the result does not become * available within <var>timeoutMillis</var> ms. - * @throws CheckedExceptionTunnel of an {@link InterruptedException} if the current thread gets - * interrupted during waiting for the result. + * @throws StopException If the thread got interrupted and <var>stopOnInterrupt</var> + * is <code>true</code>. */ - public static <T> T tryGetResult(Future<T> future, long timeoutMillis) + public static <T> T tryGetResult(Future<T> future, long timeoutMillis, boolean stopOnInterrupt) + throws StopException { - return tryGetResult(future, timeoutMillis, null, null); + return tryGetResult(future, timeoutMillis, null, null, stopOnInterrupt); } /** * Tries to get the result of a <var>future</var>, maximally waiting <var>timeoutMillis</var> * for the result to become available. Any {@link ExecutionException} that might occur in the - * future task is unwrapped and re-thrown. + * future task is unwrapped and re-thrown (wrapped in a {@link CheckedExceptionTunnel} if + * necessary. * + * @param future The future representing the execution to wait for. + * @param timeoutMillis The time-out (in milliseconds) to wait for the execution to finish. If + * it is smaller than 0, no time-out will apply. + * @param loggerOrNull The logger to use for logging note-worthy information, or + * <code>null</code>, if nothing should be logged. + * @param operationNameOrNull The name of the operation performed, for log messages, or + * <code>null</code>, if it is not known or deemed unimportant. + * @param stopOnInterrupt If <code>true</code>, throw a {@link StopException} if + * the thread gets interrupted while waiting on the future. * @return The result of the future, or <code>null</code>, if the result does not become * available within <var>timeoutMillis</var> ms or if the waiting thread gets * interrupted. + * @throws StopException If the thread got interrupted and <var>stopOnInterrupt</var> + * is <code>true</code>. */ public static <T> T tryGetResult(Future<T> future, long timeoutMillis, + ISimpleLogger loggerOrNull, String operationNameOrNull, boolean stopOnInterrupt) + throws StopException + { + final ExecutionResult<T> result = + getResult(future, timeoutMillis, loggerOrNull, operationNameOrNull); + switch (result.getStatus()) + { + case COMPLETE: + { + return result.tryGetResult(); + } + case EXCEPTION: + { + final Throwable cause = result.tryGetException(); + assert cause != null; + if (cause instanceof Error) + { + throw (Error) cause; + } else + { + throw CheckedExceptionTunnel.wrapIfNecessary((Exception) cause); + } + } + case INTERRUPTED: + { + if (stopOnInterrupt) + { + throw new StopException(); + } else + { + return null; + } + } + default: + { + return null; + } + } + } + + /** + * Returns the result of a <var>future</var>, maximally waiting <var>timeoutMillis</var> for + * the result to become available. The return value is never <code>null</code>, but always a + * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes + * are: + * <ul> + * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a + * result is available, if provided.</li> + * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li> + * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li> + * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see + * {@link Thread#interrupt()}).</li> + * </ul> + * + * @param future The future representing the execution to wait for. + * @param timeoutMillis The time-out (in milliseconds) to wait for the execution to finish. If + * it is smaller than 0, no time-out will apply. + * @return The {@link ExecutionResult} of the <var>future</var>. May correspond to each one of + * the {@link ExecutionStatus} values. + */ + public static <T> ExecutionResult<T> getResult(Future<T> future, long timeoutMillis) + { + return getResult(future, timeoutMillis, null, null); + } + + /** + * Returns the result of a <var>future</var>, maximally waiting <var>timeoutMillis</var> for + * the result to become available. The return value is never <code>null</code>, but always a + * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes + * are: + * <ul> + * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a + * result is available, if provided.</li> + * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li> + * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li> + * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see + * {@link Thread#interrupt()}).</li> + * </ul> + * + * @param future The future representing the execution to wait for. + * @param timeoutMillis The time-out (in milliseconds) to wait for the execution to finish. If + * it is smaller than 0, no time-out will apply. + * @param loggerOrNull The logger to use for logging note-worthy information, or + * <code>null</code>, if nothing should be logged. + * @param operationNameOrNull The name of the operation performed, for log messages, or + * <code>null</code>, if it is not known or deemed unimportant. + * @return The {@link ExecutionResult} of the <var>future</var>. May correspond to each one of + * the {@link ExecutionStatus} values. + */ + public static <T> ExecutionResult<T> getResult(Future<T> future, long timeoutMillis, ISimpleLogger loggerOrNull, String operationNameOrNull) + { + return getResult(future, timeoutMillis, true, loggerOrNull, operationNameOrNull); + } + + /** + * Returns the result of a <var>future</var>, maximally waiting <var>timeoutMillis</var> for + * the result to become available. The return value is never <code>null</code>, but always a + * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes + * are: + * <ul> + * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a + * result is available, if provided.</li> + * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li> + * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li> + * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see + * {@link Thread#interrupt()}).</li> + * </ul> + * + * @param future The future representing the execution to wait for. + * @param timeoutMillis The time-out (in milliseconds) to wait for the execution to finish. If + * it is smaller than 0, no time-out will apply. + * @param cancelOnTimeout If <code>true</code>, the <var>future</var> will be canceled on + * time-out. + * @param loggerOrNull The logger to use for logging note-worthy information, or + * <code>null</code>, if nothing should be logged. + * @param operationNameOrNull The name of the operation performed, for log messages, or + * <code>null</code>, if it is not known or deemed unimportant. + * @return The {@link ExecutionResult} of the <var>future</var>. May correspond to each one of + * the {@link ExecutionStatus} values. + */ + public static <T> ExecutionResult<T> getResult(Future<T> future, long timeoutMillis, + boolean cancelOnTimeout, ISimpleLogger loggerOrNull, String operationNameOrNull) { final String operationName = - (operationNameOrNull == null) ? "UNKNOWN OPERATION" : operationNameOrNull; + (operationNameOrNull == null) ? "UNKNOWN" : operationNameOrNull; try { - return future.get(timeoutMillis, TimeUnit.MILLISECONDS); + return ExecutionResult.create(future.get(transform(timeoutMillis), + TimeUnit.MILLISECONDS)); } catch (TimeoutException ex) { - future.cancel(true); + if (cancelOnTimeout) + { + future.cancel(true); + } if (loggerOrNull != null) { loggerOrNull.log(LogLevel.DEBUG, String.format( "%s took longer than %f s, cancelled.", operationName, timeoutMillis / 1000f)); } - return null; + return ExecutionResult.createTimedOut(); } catch (InterruptedException ex) { future.cancel(true); @@ -111,7 +249,7 @@ public final class ConcurrencyUtilities loggerOrNull.log(LogLevel.DEBUG, String .format("%s got interrupted.", operationName)); } - return null; + return ExecutionResult.createInterrupted(); } catch (ExecutionException ex) { final Throwable cause = ex.getCause(); @@ -125,13 +263,12 @@ public final class ConcurrencyUtilities loggerOrNull.log(LogLevel.ERROR, String.format( "%s has caused an exception: %s [%s]", operationName, message, className)); } - if (cause instanceof Error) - { - throw (Error) cause; - } else - { - throw CheckedExceptionTunnel.wrapIfNecessary((Exception) cause); - } + return ExecutionResult.createExceptional(cause == null ? ex : cause); } } + + private static long transform(long timeoutMillis) + { + return (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis; + } } diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionResult.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionResult.java new file mode 100644 index 00000000000..48bd83076d9 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionResult.java @@ -0,0 +1,105 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +/** + * A class that contains the result of the execution of a {@link Runnable} (or {@link Callable}) in + * an {@link ExecutorService}. + * + * @author Bernd Rinn + */ +public class ExecutionResult<T> +{ + private final ExecutionStatus status; + + private final T resultOrNull; + + private final Throwable exceptionOrNull; + + private ExecutionResult(final ExecutionStatus status, final T resultOrNull, final Throwable exceptionOrNull) + { + this.status = status; + this.resultOrNull = resultOrNull; + this.exceptionOrNull = exceptionOrNull; + } + + /** + * Creates an {@link ExecutionResult} that corresponds to a "real" result. Since a + * {@link Runnable} can also provide a <code>null</code> result, <code>null</code> is an + * accepted value for the result. + */ + static <T> ExecutionResult<T> create(final T resultOrNull) + { + return new ExecutionResult<T>(ExecutionStatus.COMPLETE, resultOrNull, null); + } + + /** + * Creates an {@link ExecutionResult} that corresponds to an exception. + */ + static <T> ExecutionResult<T> createExceptional(final Throwable exception) + { + assert exception != null; + + return new ExecutionResult<T>(ExecutionStatus.EXCEPTION, null, exception); + } + + /** + * Creates an {@link ExecutionResult} that corresponds to a time out. + */ + static <T> ExecutionResult<T> createTimedOut() + { + return new ExecutionResult<T>(ExecutionStatus.TIMED_OUT, null, null); + } + + /** + * Creates an {@link ExecutionResult} that corresponds to an interruption. + */ + static <T> ExecutionResult<T> createInterrupted() + { + return new ExecutionResult<T>(ExecutionStatus.INTERRUPTED, null, null); + } + + /** + * Returns the {@link ExecutionStatus} of the execution. + */ + public ExecutionStatus getStatus() + { + return status; + } + + /** + * Returns the returned result of the execution, or <code>null</code>, if either the status + * is not {@link ExecutionStatus#COMPLETE} or if the execution didn't provide a result. + */ + public T tryGetResult() + { + return resultOrNull; + } + + /** + * Returns the thrown exception (or error) of the execution, or <code>null</code>, if the + * status is not {@link ExecutionStatus#EXCEPTION}. + */ + public Throwable tryGetException() + { + return exceptionOrNull; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionStatus.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionStatus.java new file mode 100644 index 00000000000..a943110edc8 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ExecutionStatus.java @@ -0,0 +1,41 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +/** + * The status of an execution of a {@link Runnable} (or {@link Callable}) in an + * {@link ExecutorService}. + * + * @author Bernd Rinn + */ +public enum ExecutionStatus +{ + /** The {@link Runnable} ran to completion and returned its result if any. */ + COMPLETE, + + /** The {@link Runnable} didn't run to completion in the specified time but got a timeout. */ + TIMED_OUT, + + /** The {@link Runnable} didn't run to completion because its thread was interrupted. */ + INTERRUPTED, + + /** The {@link Runnable} didn't run to completion because it threw an exception or error. */ + EXCEPTION; +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamedCallable.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamedCallable.java new file mode 100644 index 00000000000..4ae75955959 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamedCallable.java @@ -0,0 +1,30 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.Callable; + +/** + * A {@link Callable} with a name. + * + * @author Bernd Rinn + */ +public interface NamedCallable<T> extends Callable<T> +{ + /** Returns the name to be used for the thread name. */ + public String getCallableName(); +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamedFutureTask.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamedFutureTask.java new file mode 100644 index 00000000000..b84245d28e3 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamedFutureTask.java @@ -0,0 +1,48 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.FutureTask; + +/** + * A {@link FutureTask} with a name. + * + * @author Bernd Rinn + */ +class NamedFutureTask<V> extends FutureTask<V> implements NamedRunnable +{ + + private final String name; + + NamedFutureTask(NamedCallable<V> callable) + { + super(callable); + this.name = callable.getCallableName(); + } + + NamedFutureTask(NamedRunnable runnable, V result) + { + super(runnable, result); + this.name = runnable.getRunnableName(); + } + + public String getRunnableName() + { + return name; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamedRunnable.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamedRunnable.java new file mode 100644 index 00000000000..9ff5c8f0c96 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamedRunnable.java @@ -0,0 +1,28 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +/** + * A {@link Runnable} with a name. + * + * @author Bernd Rinn + */ +public interface NamedRunnable extends Runnable +{ + /** Returns the name to be used for the thread name. */ + public String getRunnableName(); +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java new file mode 100644 index 00000000000..06c1fb2e377 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} that gives (non-standard) names to new threads. If a name is a + * {@link NamedRunnable}, the name provided by {@link NamedRunnable#getRunnableName()} will be + * used, otherwise the <var>defaultName</var>. The thread count (number of already created threads + * in this factory) will always be appended. + * + * @author Bernd Rinn + */ +public class NamingThreadFactory implements ThreadFactory +{ + + private final String poolName; + + private int threadCount; + + public NamingThreadFactory(String poolName) + { + this.poolName = poolName; + this.threadCount = 1; + } + + public Thread newThread(Runnable r) + { + final String completePoolName = poolName + "-T" + threadCount++; + return new PoolNameThread(r, completePoolName); + } + + String getPoolName() + { + return poolName; + } +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java new file mode 100644 index 00000000000..de16227920c --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java @@ -0,0 +1,281 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A {@link ThreadPoolExecutor} that allows to attach names to the threads it manages. These names + * can come either from {@link NamedRunnable}s or {@link NamedCallable}s, or, if their standard + * counterparts are submitted, a default name is used. + * + * @author Bernd Rinn + */ +public class NamingThreadPoolExecutor extends ThreadPoolExecutor +{ + + /** + * The default time (in milli-seconds) to keep threads alive that are above the core pool size. + */ + public final static long DEFAULT_KEEP_ALIVE_TIME_MILLIS = 10000L; + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param poolName the default name for new threads + */ + public NamingThreadPoolExecutor(String poolName) + { + super(1, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param poolName the default name for new threads + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @throws IllegalArgumentException if corePoolSize less than zero. + */ + public NamingThreadPoolExecutor(String poolName, int corePoolSize) + { + super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param poolName the default name for new threads + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @param keepAliveTimeMillis when the number of threads is greater than the core, this is the + * maximum time in milliseconds that excess idle threads will wait for new tasks + * before terminating. + * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if + * maximumPoolSize less than or equal to zero, or if corePoolSize greater than + * maximumPoolSize. + */ + public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, + long keepAliveTimeMillis) + { + super(corePoolSize, maximumPoolSize, keepAliveTimeMillis, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param poolName the default name for new threads + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @throws IllegalArgumentException if corePoolSize less than zero, or if maximumPoolSize less + * than or equal to zero, or if corePoolSize greater than maximumPoolSize. + */ + public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize) + { + super(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param poolName the default name for new threads + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @param keepAliveTime when the number of threads is greater than the core, this is the maximum + * time that excess idle threads will wait for new tasks before terminating. + * @param unit the time unit for the keepAliveTime argument. + * @param workQueue the queue to use for holding tasks before they are executed. This queue will + * hold only the <tt>Runnable</tt> tasks submitted by the <tt>execute</tt> + * method. + * @param handler the handler to use when execution is blocked because the thread bounds and + * queue capacities are reached. + * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if + * maximumPoolSize less than or equal to zero, or if corePoolSize greater than + * maximumPoolSize. + * @throws NullPointerException if <tt>workQueue</tt> or <tt>threadFactory</tt> or + * <tt>handler</tt> are null. + */ + public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, + RejectedExecutionHandler handler) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + new NamingThreadFactory(poolName), handler); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param poolName the default name for new threads + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @param keepAliveTime when the number of threads is greater than the core, this is the maximum + * time that excess idle threads will wait for new tasks before terminating. + * @param unit the time unit for the keepAliveTime argument. + * @param workQueue the queue to use for holding tasks before they are executed. This queue will + * hold only the <tt>Runnable</tt> tasks submitted by the <tt>execute</tt> + * method. + * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if + * maximumPoolSize less than or equal to zero, or if corePoolSize greater than + * maximumPoolSize. + * @throws NullPointerException if <tt>workQueue</tt> or <tt>threadFactory</tt> are null. + */ + public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + new NamingThreadFactory(poolName)); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @param keepAliveTime when the number of threads is greater than the core, this is the maximum + * time that excess idle threads will wait for new tasks before terminating. + * @param unit the time unit for the keepAliveTime argument. + * @param workQueue the queue to use for holding tasks before they are executed. This queue will + * hold only the <tt>Runnable</tt> tasks submitted by the <tt>execute</tt> + * method. + * @param threadFactory the factory to use when the executor creates a new thread. + * @param handler the handler to use when execution is blocked because the thread bounds and + * queue capacities are reached. + * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if + * maximumPoolSize less than or equal to zero, or if corePoolSize greater than + * maximumPoolSize. + * @throws NullPointerException if <tt>workQueue</tt> or <tt>threadFactory</tt> or + * <tt>handler</tt> are null. + */ + public NamingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue, NamingThreadFactory threadFactory, + RejectedExecutionHandler handler) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + /** + * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * + * @param corePoolSize the number of threads to keep in the pool, even if they are idle. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @param keepAliveTime when the number of threads is greater than the core, this is the maximum + * time that excess idle threads will wait for new tasks before terminating. + * @param unit the time unit for the keepAliveTime argument. + * @param workQueue the queue to use for holding tasks before they are executed. This queue will + * hold only the <tt>Runnable</tt> tasks submitted by the <tt>execute</tt> + * method. + * @param threadFactory the factory to use when the executor creates a new thread. + * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if + * maximumPoolSize less than or equal to zero, or if corePoolSize greater than + * maximumPoolSize. + * @throws NullPointerException if <tt>workQueue</tt> or <tt>threadFactory</tt> are null. + */ + public NamingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue, NamingThreadFactory threadFactory) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) + { + if (r instanceof NamedRunnable == false) + { + return; + } + final String runnableName = ((NamedRunnable) r).getRunnableName(); + if (t instanceof PoolNameThread) + { + ((PoolNameThread) t).setRunnableName(runnableName); + } else + { + t.setName(runnableName); + } + super.beforeExecute(t, r); + } + + @Override + public Future<?> submit(Runnable task) + { + if (task == null) + { + throw new NullPointerException(); + } + + final FutureTask<Object> ftask; + if (task instanceof NamedRunnable) + { + ftask = new NamedFutureTask<Object>((NamedRunnable) task, null); + } else + { + ftask = new FutureTask<Object>(task, null); + } + execute(ftask); + return ftask; + } + + @Override + public <T> Future<T> submit(Runnable task, T result) + { + if (task == null) + { + throw new NullPointerException(); + } + + final FutureTask<T> ftask; + if (task instanceof NamedRunnable) + { + ftask = new NamedFutureTask<T>((NamedRunnable) task, result); + } else + { + ftask = new FutureTask<T>(task, result); + } + execute(ftask); + return ftask; + } + + @Override + public <T> Future<T> submit(Callable<T> task) + { + if (task == null) + { + throw new NullPointerException(); + } + final FutureTask<T> ftask; + if (task instanceof NamedCallable) + { + ftask = new NamedFutureTask<T>((NamedCallable<T>) task); + } else + { + ftask = new FutureTask<T>(task); + } + execute(ftask); + return ftask; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java b/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java new file mode 100644 index 00000000000..96ba890677a --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java @@ -0,0 +1,48 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +/** + * A {@link Thread} that knows its pool name. + * + * @author Bernd Rinn + */ +public class PoolNameThread extends Thread +{ + private final String poolName; + + public PoolNameThread(Runnable target, String poolName) + { + super(target, poolName); + this.poolName = poolName; + } + + /** + * Sets the thread's name to be a combination of the name of the thread in the pool and the name + * of the runnable, separated by '::'. + */ + public void setRunnableName(String runnableName) + { + setName(poolName + "::" + runnableName); + } + + /** Clears the name of the runnable, setting the name of the thread to the pool name. */ + public void clearRunnableName() + { + setName(poolName); + } +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/StopException.java b/common/source/java/ch/systemsx/cisd/common/concurrent/StopException.java new file mode 100644 index 00000000000..591715ab852 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/StopException.java @@ -0,0 +1,49 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +/** + * Exception that signals that whoever gets it should stop its current work. + * <p> + * This is usually triggered by interrupting the thread that the work package is processed in and + * regularly checking with {@link #check()}. + * + * @author Bernd Rinn + */ +public class StopException extends RuntimeException +{ + + private static final long serialVersionUID = 1L; + + /** + * Checks whether the current thread has been interrupted and, if it has, throw a + * {@link StopException}. + */ + public static void check() throws StopException + { + if (Thread.interrupted()) + { + throw new StopException(); + } + } + + public StopException() + { + super(); + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/TimerUtilities.java b/common/source/java/ch/systemsx/cisd/common/concurrent/TimerUtilities.java new file mode 100644 index 00000000000..8e3b5421e03 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/TimerUtilities.java @@ -0,0 +1,172 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.lang.reflect.Field; +import java.util.Timer; +import java.util.TimerTask; + +/** + * Utilities for {@link Timer}. + * + * @author Bernd Rinn + */ +public class TimerUtilities +{ + private static final Field timerThreadFieldOrNull = tryGetTimerThreadField(); + + private static Field tryGetTimerThreadField() + { + try + { + final Field field = Timer.class.getDeclaredField("thread"); + field.setAccessible(true); + if (Thread.class.isAssignableFrom(field.getType())) + { + return field; + } + } catch (Exception ex) + { + // Nothing to do here. + } + return null; + } + + private static Thread tryGetTimerThread(Timer timer) + { + try + { + if (timerThreadFieldOrNull != null) + { + return (Thread) timerThreadFieldOrNull.get(timer); + } + } catch (Exception ex) + { + // Nothing to do here. + } + return null; + } + + @SuppressWarnings("deprecation") + private static void stopTimerThread(Thread timerThread) + { + timerThread.stop(new StopException()); + } + + /** + * Tries to join the <var>thread</var> {@link Thread#join(long)}. + * + * @param thread The {@link Thread} to join. + * @param millis The time-out in milli-seconds to wait for the thread to die. + * @return <code>true</code>, if the thread died in due time and <code>false</code> + * otherwise. + */ + private static boolean tryJoinThread(Thread thread, long millis) + { + try + { + thread.join(millis); + return (thread.isAlive() == false); + } catch (Exception ex) + { + return false; + } + } + + /** + * Returns <code>true</code>, if these utilities are operational (i.e. can work with the + * {@link Timer} class of the JRE) and <code>false</code> otherwise. + */ + public static boolean isOperational() + { + return (timerThreadFieldOrNull != null); + } + + /** + * Tries to interrupt the thread that the given <var>timer</var> uses for processing + * {@link TimerTask}s. + * + * @return <code>true</code>, if the timer thread was successfully interrupted and + * <code>false</code> otherwise. + */ + public static boolean tryInterruptTimerThread(Timer timer) + { + final Thread timerThreadOrNull = tryGetTimerThread(timer); + if (timerThreadOrNull != null) + { + timerThreadOrNull.interrupt(); + return true; + } else + { + return false; + } + } + + /** + * Tries to join the thread that <var>timer</var> is running its {@link TimerTask}s in (see + * {@link Thread#join(long)}. + * + * @param timer The {@link Timer} to get the thread from. + * @param millis The time-out in milli-seconds to wait for the thread to die. + * @return <code>true</code>, if the thread died in due time and <code>false</code> + * otherwise. + */ + public static boolean tryJoinTimerThread(Timer timer, long millis) + { + final Thread timerThreadOrNull = tryGetTimerThread(timer); + if (timerThreadOrNull != null) + { + return tryJoinThread(timerThreadOrNull, millis); + } + return false; + } + + /** + * Tries to shutdown the given <var>timer</var> by calling {@link Timer#cancel()}, + * interrupting the thread that it running its tasks and then trying to join this thread. + * + * @param timer The timer to shutdown. + * @param millis The time-out in milli-seconds to wait for the thread to die. The total time-out + * of this method can be twice as high as the value specified. + * @return <code>true</code>, if the thread died in due time and <code>false</code> + * otherwise. + */ + public static boolean tryShutdownTimer(Timer timer, long millis) + { + timer.cancel(); + final Thread timerThread = tryGetTimerThread(timer); + if (timerThread == null) + { + return false; + } + timerThread.interrupt(); + final boolean joinOK = tryJoinThread(timerThread, millis); + if (joinOK) + { + return true; + } + // If we have been interrupting the thread successfully but the interrupted flag has not + // been set, then try stopping and again joining the thread. + if (timerThread.isInterrupted()) + { + stopTimerThread(timerThread); + return tryJoinThread(timerThread, millis); + } + return false; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java index 26e837ec28d..00e2967c847 100644 --- a/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java +++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java @@ -21,165 +21,248 @@ import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; -import org.apache.log4j.Level; +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; -import ch.systemsx.cisd.common.utilities.OSUtilities; +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.concurrent.ExecutionResult; +import ch.systemsx.cisd.common.concurrent.ExecutionStatus; +import ch.systemsx.cisd.common.concurrent.NamedCallable; +import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor; +import ch.systemsx.cisd.common.concurrent.StopException; /** * Utility to execute a command from a command line and log all events. * - * @author Tomasz Pylak * @author Bernd Rinn */ -public final class ProcessExecutionHelper +public class ProcessExecutionHelper { /** - * The value indicating that there is no exit value available for a process execution. + * Strategy on whether to read the process output or not. */ - public static final int NO_EXIT_VALUE = -1; + public enum OutputReadingStrategy + { + /** Never read the output. */ + NEVER, - /** - * The value indicating the process execution went OK. - */ - public static final int EXIT_VALUE_OK = 0; + /** Read the output if the process failed in some way. */ + ON_ERROR, - /** - * The exit value returned by {@link Process#waitFor()} if the process was terminated by - * {@link Process#destroy()} on a UNIX machine. - */ - private static final int EXIT_VALUE_FOR_TERMINATION_UNIX = 143; + /** Always read the output. */ + ALWAYS; + } /** - * The exit value returned by {@link Process#waitFor()} if the process was terminated by - * {@link Process#destroy()} on a MS Windows machine. + * The default strategy for when to read the process output. */ - private static final int EXIT_VALUE_FOR_TERMINATION_WINDOWS = 1; + public static final OutputReadingStrategy DEFAULT_OUTPUT_READING_STRATEGY = + OutputReadingStrategy.ON_ERROR; + + /** Corresponds to no timeout at all for the process execution. */ + public static final long NO_TIMEOUT = ConcurrencyUtilities.NO_TIMEOUT; + + /** Corresponds to a short timeout of 1/10 s. */ + private static final long SHORT_TIMEOUT = 100; + + /** Corresponds to an immediate timeout for the process execution. */ + private static final long IMMEDIATE_TIMEOUT = ConcurrencyUtilities.IMMEDIATE_TIMEOUT; + + /** The executor service handling the threads that OS processes are spawned in. */ + private static final ExecutorService executor = new NamingThreadPoolExecutor("osproc", 10); + + /** The counter to draw the <var>processNumber</var> from. */ + private static final AtomicInteger processCounter = new AtomicInteger(); private final Logger operationLog; private final Logger machineLog; - public static List<String> createSshCommand(String command, File sshExecutable, String host) - { - ArrayList<String> wrappedCmd = new ArrayList<String>(); - List<String> sshCommand = Arrays.asList(sshExecutable.getPath(), "-T", host); - wrappedCmd.addAll(sshCommand); - wrappedCmd.add(command); - return wrappedCmd; - } + /** Read-only! */ + private final List<String> commandLine; + + private final long millisToWaitForCompletion; + + private final OutputReadingStrategy outputReadingStrategy; + + /** The number used in thread names to distinguish the process. */ + private final int processNumber; + + // Use this reference to make sure the process is as dead as you can get it to be. + private final AtomicReference<Process> processWrapper; /** * Runs an Operating System process, specified by <var>cmd</var>. * - * @param commandLine The command line to run. + * @param cmd The command line to run. * @param operationLog The {@link Logger} to use for all message on the higher level. * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. * @return <code>true</code>, if the process did complete successfully, <code>false</code> * otherwise. + * @throws StopException If the thread got interrupted. */ - public static boolean runAndLog(final List<String> commandLine, final Logger operationLog, - final Logger machineLog) + public static boolean runAndLog(final List<String> cmd, final Logger operationLog, + final Logger machineLog) throws StopException { - return new ProcessExecutionHelper(operationLog, machineLog).runAndLog(commandLine, 0L); + return new ProcessExecutionHelper(cmd, NO_TIMEOUT, DEFAULT_OUTPUT_READING_STRATEGY, + operationLog, machineLog).runAndLog(); } /** * Runs an Operating System process, specified by <var>cmd</var>. * - * @param commandLine The command line to run. + * @param cmd The command line to run. * @param operationLog The {@link Logger} to use for all message on the higher level. * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. * @return The process result. + * @throws StopException If the thread got interrupted. */ - public static ProcessResult run(final List<String> commandLine, final Logger operationLog, - final Logger machineLog) + public static ProcessResult run(final List<String> cmd, final Logger operationLog, + final Logger machineLog) throws StopException + { + return new ProcessExecutionHelper(cmd, NO_TIMEOUT, DEFAULT_OUTPUT_READING_STRATEGY, + operationLog, machineLog).run(true); + } + + /** + * Runs an Operating System process, specified by <var>cmd</var>. + * + * @param cmd The command line to run. + * @param operationLog The {@link Logger} to use for all message on the higher level. + * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. + * @param millisToWaitForCompletion The time to wait for the process to complete in + * milli-seconds. If the process is not finished after that time, it will be + * terminated by a watch dog. + * @return <code>true</code>, if the process did complete successfully, <code>false</code> + * otherwise. + * @throws StopException If the thread got interrupted. + */ + public static boolean runAndLog(final List<String> cmd, final Logger operationLog, + final Logger machineLog, final long millisToWaitForCompletion) throws StopException { - return new ProcessExecutionHelper(operationLog, machineLog).runWithoutWatchdog(commandLine); + return new ProcessExecutionHelper(cmd, millisToWaitForCompletion, + DEFAULT_OUTPUT_READING_STRATEGY, operationLog, machineLog).runAndLog(); } /** * Runs an Operating System process, specified by <var>cmd</var>. * * @param cmd The command line to run. + * @param operationLog The {@link Logger} to use for all message on the higher level. + * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. * @param millisToWaitForCompletion The time to wait for the process to complete in milli * seconds. If the process is not finished after that time, it will be terminated by * a watch dog. + * @return The process result. + * @throws StopException If the thread got interrupted. + */ + public static ProcessResult run(final List<String> cmd, final Logger operationLog, + final Logger machineLog, final long millisToWaitForCompletion) throws StopException + { + return new ProcessExecutionHelper(cmd, millisToWaitForCompletion, + DEFAULT_OUTPUT_READING_STRATEGY, operationLog, machineLog).run(true); + } + + /** + * Runs an Operating System process, specified by <var>cmd</var>. + * + * @param cmd The command line to run. * @param operationLog The {@link Logger} to use for all message on the higher level. * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. + * @param millisToWaitForCompletion The time to wait for the process to complete in + * milli-seconds. If the process is not finished after that time, it will be + * terminated by a watch dog. + * @param outputReadingStrategy The strategy for when to read the output (both + * <code>stdout</code> and <code>sterr</code>) of the process. * @return <code>true</code>, if the process did complete successfully, <code>false</code> * otherwise. + * @throws StopException If the thread got interrupted. */ - public static boolean runAndLog(final List<String> cmd, final long millisToWaitForCompletion, - final Logger operationLog, final Logger machineLog) + public static boolean runAndLog(final List<String> cmd, final Logger operationLog, + final Logger machineLog, final long millisToWaitForCompletion, + final OutputReadingStrategy outputReadingStrategy) throws StopException { - return new ProcessExecutionHelper(operationLog, machineLog).runAndLog(cmd, - millisToWaitForCompletion); + return new ProcessExecutionHelper(cmd, millisToWaitForCompletion, outputReadingStrategy, + operationLog, machineLog).runAndLog(); } /** * Runs an Operating System process, specified by <var>cmd</var>. * * @param cmd The command line to run. + * @param operationLog The {@link Logger} to use for all message on the higher level. + * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. * @param millisToWaitForCompletion The time to wait for the process to complete in milli * seconds. If the process is not finished after that time, it will be terminated by * a watch dog. - * @param operationLog The {@link Logger} to use for all message on the higher level. - * @param machineLog The {@link Logger} to use for all message on the lower (machine) level. + * @param outputReadingStrategy The strategy for when to read the output (both + * <code>stdout</code> and <code>sterr</code>) of the process. + * @param stopOnInterrupt If <code>true</code>, throw a {@link StopException} if the thread + * gets interrupted while waiting on the future. * @return The process result. + * @throws StopException If the thread got interrupted and <var>stopOnInterrupt</var> is + * <code>true</code>. */ - public static ProcessResult run(final List<String> cmd, final long millisToWaitForCompletion, - final Logger operationLog, final Logger machineLog) + public static ProcessResult run(final List<String> cmd, final Logger operationLog, + final Logger machineLog, final long millisToWaitForCompletion, + final OutputReadingStrategy outputReadingStrategy, final boolean stopOnInterrupt) + throws StopException { - return new ProcessExecutionHelper(operationLog, machineLog).runWithWatchdog(cmd, - millisToWaitForCompletion); + return new ProcessExecutionHelper(cmd, millisToWaitForCompletion, outputReadingStrategy, + operationLog, machineLog).run(stopOnInterrupt); } /** - * Returns <code>true</code> if the <var>exitValue</var> indicates that the process has been - * terminated on the Operating System level. + * Returns the name of the command represented by <var>commandLine</var>. */ - public static boolean isProcessTerminated(final int exitValue) + static String getCommandName(final List<String> commandLine) { - if (OSUtilities.isWindows()) - { - return exitValue == EXIT_VALUE_FOR_TERMINATION_WINDOWS; - } else - { - return exitValue == EXIT_VALUE_FOR_TERMINATION_UNIX; - } + return new File(commandLine.get(0)).getName(); } /** - * Returns the stdout (and stderr if {@link ProcessBuilder#redirectErrorStream(boolean)} has - * been called with <code>true</code>). + * Returns the command represented by <var>commandLine</var>. */ - public static List<String> readProcessOutputLines(final Process processOrNull, - final Logger machineLog) + private static String getCommand(final List<String> commandLine) + { + return StringUtils.join(commandLine, ' '); + } + + /** + * Returns the <code>stdout</code> (and <code>stderr</code> of the <var>process</var>. + */ + private final static List<String> readProcessOutputLines(final Process process, + final Logger machineLog, final boolean wait) { + assert process != null; + assert machineLog != null; + final List<String> processOutput = new ArrayList<String>(); - if (processOrNull == null) - { - return processOutput; - } final BufferedReader reader = - new BufferedReader(new InputStreamReader(processOrNull.getInputStream())); + new BufferedReader(new InputStreamReader(process.getInputStream())); try { - String ln; - while ((ln = reader.readLine()) != null) + while ((wait || reader.ready())) { - processOutput.add(ln); + final String line = reader.readLine(); + if (line == null) + { + break; + } + processOutput.add(line); } } catch (final IOException e) { - machineLog.warn(String.format("IOException when reading stdout, msg='%s'.", e + machineLog.warn(String.format("IOException when reading stdout/stderr, msg='%s'.", e .getMessage())); } finally { @@ -192,203 +275,185 @@ public final class ProcessExecutionHelper // Implementation // - private ProcessExecutionHelper(final Logger operationLog, final Logger machineLog) - { - this.operationLog = operationLog; - this.machineLog = machineLog; - } - - private final ProcessResult runWithWatchdog(final List<String> commandLine, - final long millisoWaitForCompletion) + /** + * The class that performs the actual calling and interaction with the Operating System process. + * Since we observed hangs of several process-related methods we call all of this in a separate + * thread. + */ + private class ProcessRunner implements NamedCallable<ProcessResult> { - assert millisoWaitForCompletion > 0L : "Unspecified time out."; - final ProcessWatchdog processWatchdog = new ProcessWatchdog(millisoWaitForCompletion); - Process process = null; - try - { - process = launchProcess(commandLine); - processWatchdog.start(process); - try - { - process.waitFor(); - processWatchdog.stop(); - return createResult(commandLine, process, processWatchdog.isProcessKilled(), - readProcessOutputLines(process, machineLog)); - } catch (final InterruptedException e) - { - operationLog.warn(String.format("Execution of %s interrupted after timeout.", - commandLine)); - return createResult(commandLine, process, processWatchdog.isProcessKilled(), - Collections.<String> emptyList()); - } - } catch (final IOException ex) - { - return createNotStartedResult(commandLine, ex); - } finally + private Process launch() throws IOException { - closeStreams(process); - if (process != null) + final ProcessBuilder processBuilder = new ProcessBuilder(commandLine); + processBuilder.redirectErrorStream(true); + if (operationLog.isDebugEnabled()) { - process.destroy(); + operationLog.debug("Running command: " + getCommand(commandLine)); } + final Process process = processBuilder.start(); + return process; } - } - private final ProcessResult runWithoutWatchdog(final List<String> commandLine) - { - Process process = null; - try + public ProcessResult call() throws Exception { - process = launchProcess(commandLine); try { - process.waitFor(); - return createResult(commandLine, process, false, readProcessOutputLines(process, - machineLog)); - } catch (final InterruptedException e) - { - operationLog.warn(String.format("Execution of %s interrupted after timeout.", - commandLine)); - return createResult(commandLine, process, true, Collections.<String> emptyList()); - } - } catch (final IOException ex) - { - return createNotStartedResult(commandLine, ex); - } finally - { - closeStreams(process); - if (process != null) + final Process process = launch(); + try + { + processWrapper.set(process); + final int exitValue = process.waitFor(); + if (processWrapper.getAndSet(null) == null) + { + // Value is irrelevant, the ProcessKiller got us. + return null; + } + List<String> processOutput = null; + if (OutputReadingStrategy.ALWAYS.equals(outputReadingStrategy) + || (OutputReadingStrategy.ON_ERROR.equals(outputReadingStrategy) && ProcessResult + .isProcessOK(exitValue) == false)) + { + processOutput = readProcessOutputLines(process, machineLog, true); + } + return new ProcessResult(commandLine, processNumber, ExecutionStatus.COMPLETE, + "", exitValue, processOutput, operationLog, machineLog); + } finally + { + IOUtils.closeQuietly(process.getErrorStream()); + IOUtils.closeQuietly(process.getInputStream()); + IOUtils.closeQuietly(process.getOutputStream()); + } + } catch (final Exception ex) { - process.destroy(); + machineLog.error("Exception when launching: " + ex.getMessage()); + throw ex; } } - } - private final Process launchProcess(final List<String> commandLine) throws IOException - { - final ProcessBuilder processBuilder = new ProcessBuilder(commandLine); - processBuilder.redirectErrorStream(true); - if (operationLog.isDebugEnabled()) + public String getCallableName() { - operationLog.debug("Executing command: " + commandLine); + return "run-P" + processNumber + "-{" + getCommandName(commandLine) + "}"; } - return processBuilder.start(); - } - - private final ProcessResult createNotStartedResult(final List<String> commandLine, - final IOException ex) - { - machineLog.error(String.format("Cannot execute executable %s", commandLine), ex); - return ProcessResult.createNotStarted(commandLine, operationLog, machineLog); } - private final ProcessResult createResult(final List<String> commandLine, - final Process processOrNull, final boolean isInterrupted, final List<String> outputLines) + /** + * The class that performs the destruction of a process that has timed-out. We do this in a + * separate thread because we have observed that, depending on the operating system and Java + * version, processes can hang indefinitely on launching. + */ + private class ProcessKiller implements NamedCallable<ProcessResult> { - if (processOrNull == null) - { - return ProcessResult.createNotStarted(commandLine, operationLog, machineLog); - } else + public ProcessResult call() { - final List<String> lines; - if (outputLines == null) + final Process process = processWrapper.getAndSet(null); + if (process != null) { - lines = readProcessOutputLines(processOrNull, machineLog); + List<String> processOutput = null; + if (OutputReadingStrategy.NEVER.equals(outputReadingStrategy) == false) + { + processOutput = readProcessOutputLines(process, machineLog, false); + } + process.destroy(); // Note: this also closes the I/O streams. + if (machineLog.isInfoEnabled()) + { + machineLog.info(String.format("Killed '" + getCommand(commandLine)) + "'."); + } + final int exitValue = getExitValue(process); + return new ProcessResult(commandLine, processNumber, ExecutionStatus.TIMED_OUT, "", + exitValue, processOutput, operationLog, machineLog); } else { - lines = outputLines; + return null; // Value signals that the ProcessRunner got us. } - if (isInterrupted) + } + + private int getExitValue(final Process process) + { + try { - return ProcessResult.createWaitingInterrupted(processOrNull, commandLine, - operationLog, machineLog, lines); - } else + return process.exitValue(); + } catch (final IllegalThreadStateException ex) { - return ProcessResult.create(processOrNull, commandLine, operationLog, machineLog, - lines); + return ProcessResult.NO_EXIT_VALUE; } } - } - private final boolean runAndLog(final List<String> cmd, final long millisToWaitForCompletion) - { - final ProcessResult result; - if (millisToWaitForCompletion > 0L) + public String getCallableName() { - result = runWithWatchdog(cmd, millisToWaitForCompletion); - } else - { - result = runWithoutWatchdog(cmd); + return "kill-P" + processNumber + "-{" + getCommandName(commandLine) + "}"; } - result.log(); - return result.isOK(); } - public final static void logProcessExecution(final String commandName, final int exitValue, - final List<String> processOutput, final Logger operationLog, final Logger machineLog) + private ProcessExecutionHelper(final List<String> commandLine, + final long millisToWaitForCompletion, + final OutputReadingStrategy outputReadingStrategy, final Logger operationLog, + final Logger machineLog) { - if (exitValue != EXIT_VALUE_OK) + this.processNumber = processCounter.getAndIncrement(); + this.operationLog = operationLog; + this.machineLog = machineLog; + // Backward compatibility. + if (millisToWaitForCompletion == IMMEDIATE_TIMEOUT) { - logProcessExitValue(Level.WARN, operationLog, commandName, exitValue); - logProcessOutput(Level.WARN, machineLog, commandName, processOutput); - } else if (operationLog.isDebugEnabled()) + this.millisToWaitForCompletion = NO_TIMEOUT; + } else { - logProcessExitValue(Level.DEBUG, operationLog, commandName, exitValue); - logProcessOutput(Level.DEBUG, machineLog, commandName, processOutput); + this.millisToWaitForCompletion = millisToWaitForCompletion; } + this.outputReadingStrategy = outputReadingStrategy; + this.commandLine = Collections.unmodifiableList(commandLine); + this.processWrapper = new AtomicReference<Process>(); } - private final static void logProcessExitValue(final Level logLevel, final Logger operationLog, - final String commandName, final int exitValue) + private ProcessResult run(final boolean stopOnInterrupt) { - assert logLevel != null; - assert operationLog != null; - assert commandName != null; - - if (isProcessTerminated(exitValue)) + final Future<ProcessResult> runnerFuture = executor.submit(new ProcessRunner()); + ExecutionResult<ProcessResult> result = + ConcurrencyUtilities.getResult(runnerFuture, millisToWaitForCompletion, false, + null, null); + if (result.getStatus() == ExecutionStatus.TIMED_OUT) + { + final Future<ProcessResult> killerFuture = executor.submit(new ProcessKiller()); + result = ConcurrencyUtilities.getResult(killerFuture, SHORT_TIMEOUT); + if (result.tryGetResult() == null) + { + result = ConcurrencyUtilities.getResult(runnerFuture, IMMEDIATE_TIMEOUT); + } + } + if (result.tryGetResult() != null) + { + return result.tryGetResult(); + } else if (stopOnInterrupt && ExecutionStatus.INTERRUPTED.equals(result.getStatus())) { - operationLog.log(logLevel, String.format("[%s] process was destroyed.", commandName)); + throw new StopException(); } else { - operationLog.log(logLevel, String.format("[%s] process returned with exit value %d.", - commandName, exitValue)); + return new ProcessResult(commandLine, processNumber, result.getStatus(), + tryGetStartupFailureMessage(result.tryGetException()), + ProcessResult.NO_EXIT_VALUE, null, operationLog, machineLog); } } - private final static void logProcessOutput(final Level logLevel, final Logger machineLog, - final String commandName, final List<String> processOutputLines) + private static String tryGetStartupFailureMessage(final Throwable throwableOrNull) { - assert logLevel != null; - assert machineLog != null; - assert commandName != null; - assert processOutputLines != null; - - if (processOutputLines.size() == 0) + if (throwableOrNull != null && throwableOrNull instanceof IOException) { - return; - } - machineLog.log(logLevel, String.format("[%s] output:", commandName)); - for (final String ln : processOutputLines) + return throwableOrNull.getMessage(); + } else { - if (ln.trim().length() > 0) - { - machineLog.log(logLevel, String.format("\"%s\"", ln)); - } + return null; } } - /** - * Close the streams belonging to given <var>Process</var>. - */ - private final static void closeStreams(final Process processOrNull) + private boolean runAndLog() throws StopException { - if (processOrNull == null) + final ProcessResult result = run(false); + result.log(); + if (result.isInterruped()) { - return; + throw new StopException(); } - IOUtils.closeQuietly(processOrNull.getInputStream()); - IOUtils.closeQuietly(processOrNull.getOutputStream()); - IOUtils.closeQuietly(processOrNull.getErrorStream()); + return result.isOK(); } } diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java index 830bd4e5a2d..8a9a113efb3 100644 --- a/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java +++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessResult.java @@ -16,95 +16,115 @@ package ch.systemsx.cisd.common.process; -import java.io.File; import java.util.Collections; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import ch.systemsx.cisd.common.concurrent.ExecutionStatus; +import ch.systemsx.cisd.common.utilities.OSUtilities; + /** * Class that keeps around the result of running an Operating System process. * <p> * Since the process output can only ever be read once from a process, it need to be kept around if * it is needed more than once. This is what this class is good for. - * </p> */ public final class ProcessResult { - private final boolean hasBlocked; + /** + * The value indicating the process execution went OK. + */ + public static final int EXIT_VALUE_OK = 0; + + /** + * The value indicating that there is no exit value available for a process execution. + */ + public static final int NO_EXIT_VALUE = -1; + + /** + * The exit value returned by {@link Process#waitFor()} if the process was terminated by + * {@link Process#destroy()} on a MS Windows machine. + */ + private static final int EXIT_VALUE_FOR_TERMINATION_WINDOWS = 1; + + /** + * The exit value returned by {@link Process#waitFor()} if the process was terminated by + * {@link Process#destroy()} on a UNIX machine. + */ + private static final int EXIT_VALUE_FOR_TERMINATION_UNIX = 143; + + private final ExecutionStatus status; + + private final String startupFailureMessage; + + private final int exitValue; private final List<String> commandLine; private final String commandName; + private final int processNumber; + private final Logger operationLog; private final Logger machineLog; - private final int exitValue; - - private final List<String> outputLines; + private final boolean outputAvailable; - private final boolean run; + private final List<String> output; /** - * Creates a <code>ProcessResult</code> for a process which normally terminates. + * Returns <code>true</code> if the <var>exitValue</var> indicates that the process has been + * terminated on the Operating System level. */ - public final static ProcessResult create(final Process process, final List<String> commandLine, - final Logger operationLog, final Logger machineLog, final List<String> outputLines) + public static boolean isProcessTerminated(final int exitValue) { - return new ProcessResult(process, false, commandLine, operationLog, machineLog, outputLines); - } - - /** - * Creates a <code>ProcessResult</code> for a process which did not start at all. - */ - public static ProcessResult createNotStarted(final List<String> commandLine, - final Logger operationLog, final Logger machineLog) - { - return new ProcessResult(null, false, commandLine, operationLog, machineLog, Collections - .<String> emptyList()); + if (OSUtilities.isWindows()) + { + return exitValue == ProcessResult.EXIT_VALUE_FOR_TERMINATION_WINDOWS; + } else + { + return exitValue == ProcessResult.EXIT_VALUE_FOR_TERMINATION_UNIX; + } } - /** - * Creates a <code>ProcessResult</code> for a process which blocked and could not be - * terminated. So we stopped waiting for it. - */ - public static ProcessResult createWaitingInterrupted(final Process process, - final List<String> commandLine, final Logger operationLog, final Logger machineLog, - final List<String> outputLines) + public static boolean isProcessOK(final int exitValue) { - return new ProcessResult(process, true, commandLine, operationLog, machineLog, outputLines); + return (exitValue == EXIT_VALUE_OK); } - private ProcessResult(final Process processOrNull, final boolean hasBlocked, - final List<String> commandLine, final Logger operationLog, final Logger machineLog, - final List<String> outputLines) + ProcessResult(final List<String> commandLine, final int processNumber, final ExecutionStatus status, + final String startupFailureMessageOrNull, final int exitValue, final List<String> processOutputOrNull, + final Logger operationLog, final Logger machineLog) { this.commandLine = commandLine; - this.commandName = new File(commandLine.get(0)).getName(); - this.hasBlocked = hasBlocked; - this.operationLog = operationLog; - this.machineLog = machineLog; - this.exitValue = getExitValue(processOrNull, hasBlocked); - this.outputLines = outputLines; - this.run = processOrNull != null; - } + this.commandName = ProcessExecutionHelper.getCommandName(commandLine); + this.processNumber = processNumber; + this.status = status; + this.startupFailureMessage = + (startupFailureMessageOrNull == null) ? "" : startupFailureMessageOrNull; + this.exitValue = exitValue; + this.outputAvailable = (processOutputOrNull != null); + if (outputAvailable) + { + this.output = Collections.unmodifiableList(processOutputOrNull); - private final static int getExitValue(final Process processOrNull, final boolean hasBlocked) - { - if (processOrNull != null && hasBlocked == false) + } else { - return processOrNull.exitValue(); + this.output = Collections.emptyList(); + } - return ProcessExecutionHelper.NO_EXIT_VALUE; + this.operationLog = operationLog; + this.machineLog = machineLog; } /** * Returns the command line that belongs to this process. */ - public final List<String> getCommandLine() + public List<String> getCommandLine() { return commandLine; } @@ -112,57 +132,102 @@ public final class ProcessResult /** * Returns the name of the command that belongs to this process. */ - public final String getCommandName() + public String getCommandName() { return commandName; } /** - * Returns the lines of the process output. + * Returns a number identifying the process that this result is for. + */ + public int getProcessNumber() + { + return processNumber; + } + + /** + * Returns <code>true</code> if the output (<code>stdout</code> and <code>stderr</code>) + * is available (note that even if it available it may still be empty). */ - public final List<String> getProcessOutput() + public boolean isOutputAvailable() { - return outputLines; + return outputAvailable; } - public final int exitValue() + /** + * Returns the output of the process (<code>stdout</code> and <code>stderr</code>). If it + * not available (see {@link #isOutputAvailable()}, an empty list is returned. + */ + public List<String> getOutput() + { + return output; + } + + /** + * Returns the exit value of the process, or {@link #NO_EXIT_VALUE}, if the value is not + * available. + */ + public int getExitValue() { return exitValue; } - public final boolean isOK() + /** + * Returns the message that was given when the process failed to startup. If the process didn't + * fail on startup, an empty String is returned. + */ + public String getStartupFailureMessage() + { + return startupFailureMessage; + } + + /** + * Returns <code>true</code>, if the process has completed successfully. + */ + public boolean isOK() { - return exitValue() == ProcessExecutionHelper.EXIT_VALUE_OK; + return isProcessOK(exitValue); } /** * Returns <code>true</code> if the process has been run at all. */ - public final boolean isRun() + public boolean isRun() + { + return StringUtils.isBlank(startupFailureMessage); + } + + /** + * Returns <code>true</code> if the process has been terminated on the Operating System level. + */ + public boolean isTerminated() { - return run; + return ProcessResult.isProcessTerminated(getExitValue()); } /** - * Returns <code>true</code> if the process could not been terminated after the timeout and we - * stopped waiting for it. + * Returns <code>true</code>, if the process has timed out on the Java level. */ - public final boolean hasBlocked() + public boolean isTimedOut() { - return hasBlocked; + return ExecutionStatus.TIMED_OUT.equals(status); } /** - * Returns <code>true</code> if the process has been terminated on the <i>Operating System</i> - * level. + * Returns <code>true</code>, if the Java thread that the process was running in got + * interrupted. */ - public final boolean isTerminated() + public boolean isInterruped() { - return ProcessExecutionHelper.isProcessTerminated(exitValue()); + return ExecutionStatus.INTERRUPTED.equals(status); } - public final void log() + /** + * Logs the outcome of the process execution. + */ + public void log() { + if (isOK() == false) { logProcessExitValue(Level.WARN); @@ -174,31 +239,37 @@ public final class ProcessResult } } - private final void logProcessExitValue(final Level logLevel) + private void logProcessExitValue(final Level logLevel) { if (isRun() == false) { - operationLog - .log(logLevel, String.format("[%s] process could not be run.", commandName)); - } else if (isTerminated()) + operationLog.log(logLevel, String.format("P%d-{%s} process has not started up: '%s'.", + processNumber, commandName, startupFailureMessage)); + } else if (isTimedOut()) { - operationLog.log(logLevel, String.format("[%s] process was destroyed.", commandName)); - } else if (hasBlocked()) + operationLog.log(logLevel, String.format("P%d-{%s} process has timed out.", + processNumber, commandName)); + } else if (isInterruped()) { - operationLog.log(logLevel, String.format( - "[%s] process has blocked and could not be destroyed.", commandName)); + operationLog.log(logLevel, String.format("P%d-{%s} thread was interrupted.", + processNumber, commandName)); + } else if (isTerminated()) + { + operationLog.log(logLevel, String.format("P%d-{%s} process was terminated.", + processNumber, commandName)); } else { - operationLog.log(logLevel, String.format("[%s] process returned with exit value %d.", - commandName, exitValue())); + operationLog.log(logLevel, String.format( + "P%d-{%s} process returned with exit value %d.", processNumber, commandName, + getExitValue())); } } - private final void logProcessOutput(final Level logLevel) + private void logProcessOutput(final Level logLevel) { assert logLevel != null; - final List<String> processOutputLines = getProcessOutput(); + final List<String> processOutputLines = getOutput(); if (processOutputLines.size() == 0) { return; diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessRunner.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessRunner.java index fd930f64952..5e6575c03c5 100644 --- a/common/source/java/ch/systemsx/cisd/common/process/ProcessRunner.java +++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessRunner.java @@ -16,6 +16,7 @@ package ch.systemsx.cisd.common.process; +import ch.systemsx.cisd.common.concurrent.StopException; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; /** @@ -39,6 +40,7 @@ public final class ProcessRunner int counter = 0; do { + StopException.check(); process.run(); if (counter > 0 && millisToSleepOnFailure > 0) { diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessWatchdog.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessWatchdog.java index df5cacda050..acebe1ebe8e 100644 --- a/common/source/java/ch/systemsx/cisd/common/process/ProcessWatchdog.java +++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessWatchdog.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; * * @author Christian Ribeaud */ +@Deprecated public final class ProcessWatchdog implements Runnable { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/RecursiveHardLinkMaker.java b/common/source/java/ch/systemsx/cisd/common/utilities/RecursiveHardLinkMaker.java index 49f7e3b2886..e14f247bca6 100644 --- a/common/source/java/ch/systemsx/cisd/common/utilities/RecursiveHardLinkMaker.java +++ b/common/source/java/ch/systemsx/cisd/common/utilities/RecursiveHardLinkMaker.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.log4j.Logger; +import ch.systemsx.cisd.common.concurrent.StopException; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.process.IProcess; @@ -187,6 +188,7 @@ public final class RecursiveHardLinkMaker implements IPathImmutableCopier { for (final File file : files) { + StopException.check(); if (tryMakeCopy(file, dir, null) == null) { return null; @@ -244,8 +246,8 @@ public final class RecursiveHardLinkMaker implements IPathImmutableCopier public boolean run() { boolean result = - ProcessExecutionHelper.runAndLog(cmd, singleFileLinkTimeout - .getMillisToWaitForCompletion(), operationLog, machineLog); + ProcessExecutionHelper.runAndLog(cmd, operationLog, machineLog, + singleFileLinkTimeout.getMillisToWaitForCompletion()); // NOTE: we have noticed that sometimes the result is false although the file // have been copied if (result == false && destFile.exists() @@ -269,6 +271,7 @@ public final class RecursiveHardLinkMaker implements IPathImmutableCopier private static boolean checkIfIdenticalContent(final File file1, final File file2) { + StopException.check(); try { return FileUtils.contentEquals(file1, file2); diff --git a/common/source/java/org/apache/commons/io/DestroyableFileSystemUtils.java b/common/source/java/org/apache/commons/io/DestroyableFileSystemUtils.java index a1d992e4063..ae71fcbd6d6 100644 --- a/common/source/java/org/apache/commons/io/DestroyableFileSystemUtils.java +++ b/common/source/java/org/apache/commons/io/DestroyableFileSystemUtils.java @@ -34,6 +34,7 @@ import ch.systemsx.cisd.common.process.ProcessWatchdog; * * @author Christian Ribeaud */ +@Deprecated public final class DestroyableFileSystemUtils extends FileSystemUtils { diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java index bfa5c0bcce4..807ba598dd5 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java @@ -21,10 +21,8 @@ import static org.testng.AssertJUnit.*; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -42,39 +40,52 @@ public class ConcurrencyUtilitiesTest private final static String name = "This is the pool name"; - private ThreadPoolExecutor eservice; - @BeforeClass public void init() { LogInitializer.init(); - eservice = (ThreadPoolExecutor) ConcurrencyUtilities.newNamedPool(name, 1, 2); } @Test - public void testNewNamedPool() throws Throwable + public void testTryGetFutureOK() { - assertEquals(1, eservice.getCorePoolSize()); - assertEquals(2, eservice.getMaximumPoolSize()); - final Future<?> future = eservice.submit(new Runnable() + final String valueProvided = "This is the execution return value"; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Future<String> future = eservice.submit(new Callable<String>() { - public void run() + public String call() throws Exception { - assertEquals(name + " 1", Thread.currentThread().getName()); + return valueProvided; } }); - try - { - future.get(200L, TimeUnit.MILLISECONDS); - } catch (ExecutionException ex) - { - throw ex.getCause(); - } + final String valueObtained = ConcurrencyUtilities.tryGetResult(future, 200L); + assertEquals(valueProvided, valueObtained); + assertTrue(future.isDone()); } @Test + public void testGetExecutionResultOK() + { + final String valueProvided = "This is the execution return value"; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Future<String> future = eservice.submit(new Callable<String>() + { + public String call() throws Exception + { + return valueProvided; + } + }); + final ExecutionResult<String> result = ConcurrencyUtilities.getResult(future, 200L); + assertEquals(ExecutionStatus.COMPLETE, result.getStatus()); + assertNull(result.tryGetException()); + assertEquals(valueProvided, result.tryGetResult()); + assertTrue(future.isDone()); + } + + @Test(groups = "slow") public void testTryGetFutureTimeout() { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -94,9 +105,63 @@ public class ConcurrencyUtilitiesTest assertTrue(future.isDone()); } + @Test(groups = "slow") + public void testGetExecutionResultTimeout() + { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Future<String> future = eservice.submit(new Callable<String>() + { + public String call() throws Exception + { + try + { + Thread.sleep(200L); + } catch (InterruptedException ex) + { + throw new CheckedExceptionTunnel(ex); + } + return null; + } + }); + final ExecutionResult<String> result = ConcurrencyUtilities.getResult(future, 20L); + assertEquals(ExecutionStatus.TIMED_OUT, result.getStatus()); + assertNull(result.tryGetResult()); + assertNull(result.tryGetException()); + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + } + + @Test(groups = "slow") + public void testGetExecutionResultTimeoutWithoutCancelation() + { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Future<String> future = eservice.submit(new Callable<String>() + { + public String call() throws Exception + { + try + { + Thread.sleep(200L); + } catch (InterruptedException ex) + { + throw new CheckedExceptionTunnel(ex); + } + return null; + } + }); + final ExecutionResult<String> result = + ConcurrencyUtilities.getResult(future, 20L, false, null, null); + assertEquals(ExecutionStatus.TIMED_OUT, result.getStatus()); + assertNull(result.tryGetResult()); + assertNull(result.tryGetException()); + assertFalse(future.isDone()); + assertFalse(future.isCancelled()); + } + @Test public void testTryGetFutureInterrupted() { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); final Thread thread = Thread.currentThread(); final Future<String> future = eservice.submit(new Callable<String>() { @@ -121,21 +186,109 @@ public class ConcurrencyUtilitiesTest thread.interrupt(); } }, 20L); - final String shouldBeNull = ConcurrencyUtilities.tryGetResult(future, 200L); + final String shouldBeNull = ConcurrencyUtilities.tryGetResult(future, 200L, false); t.cancel(); assertNull(shouldBeNull); assertTrue(future.isCancelled()); assertFalse(Thread.interrupted()); } + @Test(expectedExceptions = { StopException.class }) + public void testTryGetFutureStop() + { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Thread thread = Thread.currentThread(); + final Future<String> future = eservice.submit(new Callable<String>() + { + public String call() throws Exception + { + try + { + Thread.sleep(200L); + } catch (InterruptedException ex) + { + throw new CheckedExceptionTunnel(ex); + } + return null; + } + }); + final Timer t = new Timer(); + t.schedule(new TimerTask() + { + @Override + public void run() + { + thread.interrupt(); + } + }, 20L); + // Supposed to throw a StopException + ConcurrencyUtilities.tryGetResult(future, 200L); + } + + @Test + public void testGetExecutionResultInterrupted() + { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Thread thread = Thread.currentThread(); + final Future<String> future = eservice.submit(new Callable<String>() + { + public String call() throws Exception + { + try + { + Thread.sleep(200L); + } catch (InterruptedException ex) + { + throw new CheckedExceptionTunnel(ex); + } + return null; + } + }); + final Timer t = new Timer(); + t.schedule(new TimerTask() + { + @Override + public void run() + { + thread.interrupt(); + } + }, 20L); + final ExecutionResult<String> result = ConcurrencyUtilities.getResult(future, 200L); + t.cancel(); + assertEquals(ExecutionStatus.INTERRUPTED, result.getStatus()); + assertNull(result.tryGetResult()); + assertNull(result.tryGetException()); + assertTrue(future.isCancelled()); + assertFalse(Thread.interrupted()); + } + private static class TaggedException extends RuntimeException { private static final long serialVersionUID = 1L; } + @Test + public void testGetExecutionResultException() + { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final Future<String> future = eservice.submit(new Callable<String>() + { + public String call() throws Exception + { + throw new TaggedException(); + } + }); + final ExecutionResult<String> result = ConcurrencyUtilities.getResult(future, 100L); + assertEquals(ExecutionStatus.EXCEPTION, result.getStatus()); + assertTrue(result.tryGetException() instanceof TaggedException); + assertNull(result.tryGetResult()); + assertTrue(future.isDone()); + } + @Test public void testTryGetFutureException() { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java new file mode 100644 index 00000000000..3aa75557ea1 --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java @@ -0,0 +1,250 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.common.logging.LogInitializer; + +/** + * Test cases for the {@link NamingThreadPoolExecutor}. + * + * @author Bernd Rinn + */ +public class NamingThreadPoolExecutorTest +{ + + private final static String name = "This is the pool name"; + + @BeforeClass + public void init() + { + LogInitializer.init(); + } + + @Test + public void testNamedPool() throws Throwable + { + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + assertEquals(1, eservice.getCorePoolSize()); + assertEquals(2, eservice.getMaximumPoolSize()); + final Future<?> future = eservice.submit(new Runnable() + { + public void run() + { + assertEquals(name + "-T1", Thread.currentThread().getName()); + } + }); + try + { + future.get(200L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } + } + + @Test(groups = "slow") + public void testThreadDefaultNames() throws Throwable + { + final int max = 10; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, max, max); + assertEquals(max, eservice.getCorePoolSize()); + assertEquals(max, eservice.getMaximumPoolSize()); + final Set<String> expectedThreadNameSet = new HashSet<String>(); + for (int i = 1; i <= max; ++i) + { + expectedThreadNameSet.add(name + "-T" + i); + } + final Set<String> threadNameSet = Collections.synchronizedSet(new HashSet<String>()); + final Set<Future<?>> futureSet = new HashSet<Future<?>>(); + for (int i = 0; i < max; ++i) + { + futureSet.add(eservice.submit(new Runnable() + { + public void run() + { + threadNameSet.add(Thread.currentThread().getName()); + try + { + Thread.sleep(20L); + } catch (InterruptedException ex) + { + fail("We got interrupted."); + } + } + })); + } + for (Future<?> future : futureSet) + { + try + { + future.get(400L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } + } + assertEquals(expectedThreadNameSet, threadNameSet); + } + + @Test(groups = "slow") + public void testSubmitNamedRunnable() throws Throwable + { + final String runnableName = "This is the special runnable name"; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + assertEquals(1, eservice.getCorePoolSize()); + assertEquals(1, eservice.getMaximumPoolSize()); + final Future<?> future = eservice.submit(new NamedRunnable() + { + public void run() + { + assertEquals(name + "-T1::" + runnableName, Thread.currentThread().getName()); + } + + public String getRunnableName() + { + return runnableName; + } + }); + try + { + future.get(200L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } + } + + @Test(groups = "slow") + public void testExecuteNamedRunnable() throws Throwable + { + final String runnableName = "This is the special runnable name"; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + assertEquals(1, eservice.getCorePoolSize()); + assertEquals(1, eservice.getMaximumPoolSize()); + final Semaphore sem = new Semaphore(0); + eservice.execute(new NamedRunnable() + { + public void run() + { + assertEquals(name + "-T1::" + runnableName, Thread.currentThread().getName()); + sem.release(); + } + + public String getRunnableName() + { + return runnableName; + } + }); + assertTrue(sem.tryAcquire(200L, TimeUnit.MILLISECONDS)); + } + + @Test(groups = "slow") + public void testSubmitNamedCallable() throws Throwable + { + final String callableName = "This is the special callable name"; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + assertEquals(1, eservice.getCorePoolSize()); + assertEquals(1, eservice.getMaximumPoolSize()); + final Future<?> future = eservice.submit(new NamedCallable<Object>() + { + public Object call() throws Exception + { + assertEquals(name + "-T1::" + callableName, Thread.currentThread().getName()); + return null; + } + + public String getCallableName() + { + return callableName; + } + }); + try + { + future.get(200L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } + } + + @Test(groups = "slow") + public void testSubmitNamedCallables() throws Throwable + { + final String callableName1 = "This is the first special callable name"; + final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + assertEquals(1, eservice.getCorePoolSize()); + assertEquals(1, eservice.getMaximumPoolSize()); + final Future<?> future1 = eservice.submit(new NamedCallable<Object>() + { + public Object call() throws Exception + { + assertEquals(name + "-T1::" + callableName1, Thread.currentThread().getName()); + return null; + } + + public String getCallableName() + { + return callableName1; + } + }); + try + { + future1.get(200L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } + final String callableName2 = "This is the second special callable name"; + final Future<?> future2 = eservice.submit(new NamedCallable<Object>() + { + public Object call() throws Exception + { + assertEquals(name + "-T1::" + callableName2, Thread.currentThread().getName()); + return null; + } + + public String getCallableName() + { + return callableName2; + } + }); + try + { + future2.get(200L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } + } + +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TimerUtilitiesTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TimerUtilitiesTest.java new file mode 100644 index 00000000000..42ff35d44d8 --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TimerUtilitiesTest.java @@ -0,0 +1,240 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import static org.testng.AssertJUnit.*; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.Test; + +/** + * Test cases for the {@link TimerUtilities}. + * <p> + * Note that the semaphores are dual-use as synchronization barrier (ensure the tasks are running + * when we try to interact with its thread) and as probes for success or failure. + * + * @author Bernd Rinn + */ +public class TimerUtilitiesTest +{ + @Test + public void testOperational() + { + assertTrue(TimerUtilities.isOperational()); + } + + @Test + public void testInterrupt() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + @Override + public void run() + { + sem.release(); + try + { + Thread.sleep(200L); + fail("should have been interrupted."); + } catch (InterruptedException ex) + { + // That is expected. + sem.release(); + } + } + }; + timer.schedule(task, 0L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + assertTrue(TimerUtilities.tryInterruptTimerThread(timer)); + assertTrue(sem.tryAcquire(100L, TimeUnit.MILLISECONDS)); + } + + @Test + public void testJoin() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + @Override + public void run() + { + sem.release(); + // We immediately return. + } + }; + timer.schedule(task, 50L, 1000L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + timer.cancel(); + assertTrue(TimerUtilities.tryJoinTimerThread(timer, 200L)); + } + + @Test + public void testJoinFailed() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + @Override + public void run() + { + sem.release(); + try + { + Thread.sleep(50L); + } catch (InterruptedException ex) + { + throw new AssertionError("Unexpected interrupt."); + } + } + }; + timer.schedule(task, 0L, 200L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + assertFalse(TimerUtilities.tryJoinTimerThread(timer, 100L)); + } + + @Test + public void testInterruptAndJoin() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + @Override + public void run() + { + sem.release(); + try + { + Thread.sleep(200L); + fail("should have been interrupted."); + } catch (InterruptedException ex) + { + // That is expected, signal success. + sem.release(); + } + } + }; + timer.schedule(task, 0L, 1000L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + timer.cancel(); + assertTrue(TimerUtilities.tryInterruptTimerThread(timer)); + assertTrue(TimerUtilities.tryJoinTimerThread(timer, 100L)); + assertTrue(sem.tryAcquire()); + } + + @Test + public void testInterruptAndJoinFailed() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + @Override + public void run() + { + sem.release(); + try + { + Thread.sleep(200L); + fail("should have been interrupted."); + } catch (InterruptedException ex) + { + // That is expected, signal success. + sem.release(); + } + } + }; + timer.schedule(task, 0L, 1000L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + // Here we would need a timer.cancel() to make the join succeed + assertTrue(TimerUtilities.tryInterruptTimerThread(timer)); + assertFalse(TimerUtilities.tryJoinTimerThread(timer, 100L)); + assertTrue(sem.tryAcquire()); + } + + @Test + public void testShutdown() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + @Override + public void run() + { + sem.release(); + try + { + Thread.sleep(200L); + fail("should have been interrupted."); + } catch (InterruptedException ex) + { + // That is expected, signal success. + sem.release(); + } + } + }; + timer.schedule(task, 0L, 1000L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + assertTrue(TimerUtilities.tryShutdownTimer(timer, 100L)); + assertTrue(sem.tryAcquire()); + } + + @Test + public void testShutdownWithStop() throws InterruptedException + { + final Semaphore sem = new Semaphore(0); + final Timer timer = new Timer(); + final TimerTask task = new TimerTask() + { + private void keepWheelsSpinning() + { + do + { + // Nothing. + } while (true); + } + + @Override + public void run() + { + sem.release(); + try + { + keepWheelsSpinning(); + fail("should have been stopped."); + } catch (StopException ex) + { + // That is expected, signal success. + sem.release(); + } + } + }; + timer.schedule(task, 0L, 1000L); + sem.acquire(); // Ensure we don't cancel() before the task is running. + assertTrue(TimerUtilities.tryShutdownTimer(timer, 50L)); + assertTrue(sem.tryAcquire()); + } +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/process/ProcessExecutionHelperTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/process/ProcessExecutionHelperTest.java index 30a9de40e0c..488fa56be23 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/process/ProcessExecutionHelperTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/process/ProcessExecutionHelperTest.java @@ -23,6 +23,8 @@ import static org.testng.AssertJUnit.assertTrue; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Timer; +import java.util.TimerTask; import org.apache.log4j.Logger; import org.testng.annotations.BeforeClass; @@ -30,9 +32,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import ch.systemsx.cisd.common.collections.CollectionIO; +import ch.systemsx.cisd.common.concurrent.StopException; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogInitializer; +import ch.systemsx.cisd.common.process.ProcessExecutionHelper.OutputReadingStrategy; /** * Test cases for the {@link ProcessExecutionHelper}. @@ -74,10 +78,13 @@ public class ProcessExecutionHelperTest return createExecutable(name, "#! /bin/sh", "exit " + exitValue); } + private final String sleepyMessage = "I am feeling sooo sleepy..."; + private File createSleepingExecutable(String name, long millisToSleep) throws IOException, InterruptedException { - return createExecutable(name, "#! /bin/sh", "sleep " + (millisToSleep / 1000.0f), "exit 0"); + return createExecutable(name, "#! /bin/sh", "echo " + sleepyMessage, "sleep " + + (millisToSleep / 1000.0f), "exit 0"); } @BeforeClass @@ -98,9 +105,9 @@ public class ProcessExecutionHelperTest @Test(groups = { "requires_unix" }) - public void testExecutionOKWithoutWatchDog() throws Exception + public void testExecutionOKWithoutTimeOut() throws Exception { - final File dummyExec = createExecutable("dummy.sh", 0); + final File dummyExec = createExecutable("dummyOKWithoutTimeOut.sh", 0); final boolean ok = ProcessExecutionHelper.runAndLog(Arrays.asList(dummyExec.getAbsolutePath()), operationLog, machineLog); @@ -109,9 +116,9 @@ public class ProcessExecutionHelperTest @Test(groups = { "requires_unix" }) - public void testExecutionFailedWithoutWatchDog() throws Exception + public void testExecutionFailedWithoutTimeOut() throws Exception { - final File dummyExec = createExecutable("dummy.sh", 1); + final File dummyExec = createExecutable("dummyFailingWithoutTimeOut.sh", 1); final boolean ok = ProcessExecutionHelper.runAndLog(Arrays.asList(dummyExec.getAbsolutePath()), operationLog, machineLog); @@ -120,58 +127,111 @@ public class ProcessExecutionHelperTest @Test(groups = { "requires_unix" }) - public void testExecutionOKWithWatchDog() throws Exception + public void testExecutionOKWithTimeOut() throws Exception { - final File dummyExec = createExecutable("dummy.sh", 0); + final File dummyExec = createExecutable("dummyOKWithTimeOut.sh", 0); final boolean ok = ProcessExecutionHelper.runAndLog(Arrays.asList(dummyExec.getAbsolutePath()), - WATCHDOG_WAIT_MILLIS, operationLog, machineLog); + operationLog, machineLog, WATCHDOG_WAIT_MILLIS); assertTrue(ok); } @Test(groups = { "requires_unix" }) - public void testExecutionFailedWithWatchDog() throws Exception + public void testExecutionFailedWithTimeOut() throws Exception { - final File dummyExec = createExecutable("dummy.sh", 1); + final File dummyExec = createExecutable("dummyFailingWithTimeOut.sh", 1); final boolean ok = ProcessExecutionHelper.runAndLog(Arrays.asList(dummyExec.getAbsolutePath()), - WATCHDOG_WAIT_MILLIS, operationLog, machineLog); + operationLog, machineLog, WATCHDOG_WAIT_MILLIS); assertFalse(ok); } @Test(groups = { "requires_unix", "slow" }) - public void testExecutionOKWithWatchDogWaiting() throws Exception + public void testSleepyExecutionOKWithTimeOut() throws Exception { - final File dummyExec = createSleepingExecutable("dummy.sh", WATCHDOG_WAIT_MILLIS / 2); - final boolean ok = - ProcessExecutionHelper.runAndLog(Arrays.asList(dummyExec.getAbsolutePath()), - WATCHDOG_WAIT_MILLIS, operationLog, machineLog); - assertTrue(ok); + final File dummyExec = + createSleepingExecutable("dummySleepyOKWithTimeOut.sh", WATCHDOG_WAIT_MILLIS / 2); + final ProcessResult result = + ProcessExecutionHelper.run(Arrays.asList(dummyExec.getAbsolutePath()), + operationLog, machineLog, WATCHDOG_WAIT_MILLIS); + assertTrue(result.isOK()); + assertEquals(0, result.getOutput().size()); + } + + @Test(groups = + { "requires_unix", "slow" }, expectedExceptions = + { StopException.class }) + public void testSleepyExecutionGetsStopped() throws Exception + { + final Thread thisThread = Thread.currentThread(); + final Timer timer = new Timer(); + try + { + timer.schedule(new TimerTask() + { + @Override + public void run() + { + thisThread.interrupt(); + } + }, WATCHDOG_WAIT_MILLIS / 10); + final File dummyExec = + createSleepingExecutable("dummySleepyFailedWithTimeOut.sh", + 2 * WATCHDOG_WAIT_MILLIS); + ProcessExecutionHelper.run(Arrays.asList(dummyExec.getAbsolutePath()), operationLog, + machineLog, WATCHDOG_WAIT_MILLIS); + } finally + { + timer.cancel(); + } } @Test(groups = { "requires_unix", "slow" }) - public void testExecutionFailedWithWatchDogHitting() throws Exception + public void testSleepyExecutionGetsInterrupted() throws Exception { - final File dummyExec = createSleepingExecutable("dummy.sh", 2 * WATCHDOG_WAIT_MILLIS); - final boolean ok = - ProcessExecutionHelper.runAndLog(Arrays.asList(dummyExec.getAbsolutePath()), - WATCHDOG_WAIT_MILLIS, operationLog, machineLog); - assertFalse(ok); + final Thread thisThread = Thread.currentThread(); + final Timer timer = new Timer(); + try + { + timer.schedule(new TimerTask() + { + @Override + public void run() + { + thisThread.interrupt(); + } + }, WATCHDOG_WAIT_MILLIS / 10); + final File dummyExec = + createSleepingExecutable("dummySleepyFailedWithTimeOut.sh", + 2 * WATCHDOG_WAIT_MILLIS); + final ProcessResult result = + ProcessExecutionHelper.run(Arrays.asList(dummyExec.getAbsolutePath()), + operationLog, machineLog, WATCHDOG_WAIT_MILLIS, + ProcessExecutionHelper.DEFAULT_OUTPUT_READING_STRATEGY, false); + assertTrue(result.isInterruped()); + } finally + { + timer.cancel(); + } } @Test(groups = { "requires_unix", "slow" }) - public void testTryExecutionFailedWithWatchDogHitting() throws Exception + public void testSleepyExecutionFailedWithTimeOut() throws Exception { - final File dummyExec = createSleepingExecutable("dummy.sh", 2 * WATCHDOG_WAIT_MILLIS); + final File dummyExec = + createSleepingExecutable("dummySleepyFailedWithTimeOut.sh", + 2 * WATCHDOG_WAIT_MILLIS); final ProcessResult result = ProcessExecutionHelper.run(Arrays.asList(dummyExec.getAbsolutePath()), - WATCHDOG_WAIT_MILLIS, operationLog, machineLog); - assertTrue(result.hasBlocked() - || ProcessExecutionHelper.isProcessTerminated(result.exitValue())); + operationLog, machineLog, WATCHDOG_WAIT_MILLIS); + assertTrue(result.isTimedOut()); + assertFalse(result.isOK()); + assertEquals(1, result.getOutput().size()); + assertEquals(sleepyMessage, result.getOutput().get(0)); } @Test(groups = @@ -187,15 +247,30 @@ public class ProcessExecutionHelperTest + stdout2, "echo " + stderr2); final ProcessResult result = ProcessExecutionHelper.run(Arrays.asList(dummyExec.getAbsolutePath()), - operationLog, machineLog); - final int exitValue = result.exitValue(); + operationLog, machineLog, ProcessExecutionHelper.NO_TIMEOUT, + OutputReadingStrategy.ALWAYS, false); + final int exitValue = result.getExitValue(); assertEquals(0, exitValue); result.log(); - assertEquals(4, result.getProcessOutput().size()); - assertEquals(stdout1, result.getProcessOutput().get(0)); - assertEquals(stderr1, result.getProcessOutput().get(1)); - assertEquals(stdout2, result.getProcessOutput().get(2)); - assertEquals(stderr2, result.getProcessOutput().get(3)); + assertTrue(result.isOutputAvailable()); + assertEquals(4, result.getOutput().size()); + assertEquals(stdout1, result.getOutput().get(0)); + assertEquals(stderr1, result.getOutput().get(1)); + assertEquals(stdout2, result.getOutput().get(2)); + assertEquals(stderr2, result.getOutput().get(3)); + } + + @Test(groups = "requires_unix") + public void testStartupFailed() + { + final ProcessResult result = + ProcessExecutionHelper.run(Arrays.asList("some_non_existent_executable"), + operationLog, machineLog); + result.log(); + assertFalse(result.isRun()); + assertTrue(result.getStartupFailureMessage().indexOf( + "some_non_existent_executable: not found") >= 0); + System.out.println("Startup failure: " + result.getStartupFailureMessage()); } } -- GitLab