diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java index 4e862c0aa310704cbd4232906f2c130125ae2885..59bfaad224882b6f3dfd940e2eb11d3f2b4d09c6 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java @@ -16,7 +16,9 @@ package ch.systemsx.cisd.common.concurrent; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -305,6 +307,62 @@ public final class ConcurrencyUtilities } } + /** + * Submits the <var>callable</var> to the <var>executor</var>. + * + * @return A future which allows to terminate the task even when running. + * @see ExecutorService#submit(Callable) + */ + public static <V> ITerminableFuture<V> submit(ExecutorService executor, + TerminableCallable<V> callable) + { + final Future<V> future = executor.submit(callable); + return new TerminableFuture<V>(future, callable); + } + + /** + * Submits the <var>callableWithCleaner</var> to the <var>executor</var>. + * + * @return A future which allows to terminate the task even when running. + * @see ExecutorService#submit(Callable) + */ + public static <V> ITerminableFuture<V> submit(ExecutorService executor, + TerminableCallable.ICallableCleaner<V> callableWithCleaner) + { + return submit(executor, TerminableCallable.create(callableWithCleaner)); + } + + /** + * Submits the <var>callable</var> to the <var>executor</var>. + * + * @return A future which allows to terminate the task even when running. + * @see ExecutorService#submit(Callable) + */ + public static <V> ITerminableFuture<V> submit(ExecutorService executor, + TerminableCallable.ICallable<V> callable) + { + return submit(executor, TerminableCallable.create(callable)); + } + + /** + * Submits the <var>stoppableCallable</var> to the <var>executor</var>. + * <p> + * <strong>Note: Code executed in the <var>stoppableCallable</var> must <i>not</i> change + * variables or data structures used by several threads or else the problems described in <a + * href="http://java.sun.com/j2se/1.5.0/docs/guide/misc/threadPrimitiveDeprecation.html">"Why is + * <code>Thread.stop()</code> deprecated?"</a> apply to your code! Watch out for static + * thread-safe variables like e.g. the ones of type {@link ThreadLocal}!</strong> + * + * @return A future which allows to terminate the task even when running. + * @see TerminableCallable#createStoppable(Callable) + * @see ExecutorService#submit(Callable) + */ + public static <V> ITerminableFuture<V> submitAsStoppable(ExecutorService executor, + Callable<V> stoppableCallable) + { + return submit(executor, TerminableCallable.createStoppable(stoppableCallable)); + } + /** * The same as {@link Thread#sleep(long)} but throws a {@link StopException} on interruption * rather than a {@link InterruptedException}. @@ -321,8 +379,8 @@ public final class ConcurrencyUtilities } /** - * The same as {@link Thread#join()} but throws a {@link StopException} on interruption - * rather than a {@link InterruptedException}. + * The same as {@link Thread#join()} but throws a {@link StopException} on interruption rather + * than a {@link InterruptedException}. */ public static void join(Thread thread) throws StopException { @@ -351,8 +409,8 @@ public final class ConcurrencyUtilities } /** - * The same as {@link Object#wait()} but throws a {@link StopException} on interruption - * rather than a {@link InterruptedException}. + * The same as {@link Object#wait()} but throws a {@link StopException} on interruption rather + * than a {@link InterruptedException}. */ public static void wait(Object obj) throws StopException { diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ITerminableFuture.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ITerminableFuture.java new file mode 100644 index 0000000000000000000000000000000000000000..9f3325653c8767f39066648d3ca2824c0082e9df --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ITerminableFuture.java @@ -0,0 +1,112 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.Future; + +import ch.systemsx.cisd.common.concurrent.TerminableCallable.ICleaner; +import ch.systemsx.cisd.common.exceptions.StopException; +import ch.systemsx.cisd.common.utilities.ITerminable; + +/** + * An interface that combines {@link Future} and {@link ITerminable} and that can be used to control + * a {@link TerminableCallable}. + * + * @author Bernd Rinn + */ +public interface ITerminableFuture<V> extends Future<V>, ITerminable +{ + /** + * Returns <code>true</code>, if the task of the future is currently running and + * <code>false</code> otherwise. + */ + public boolean isRunning(); + + /** + * Waits for the task of the future to start running. The method waits at most + * <var>timeoutMillis</var> milli-seconds. + * + * @return <code>true</code>, if the task of the future has started running when the method + * returns. + */ + public boolean waitForStarted(long timeoutMillis) throws StopException; + + /** + * Returns <code>true</code>, if the task of the future has already started running. + */ + public boolean hasStarted(); + + /** + * Waits for the task of the future to finish running. The method waits at most + * <var>timeoutMillis</var> milli-seconds. + * + * @return <code>true</code>, if the task of the future has finished running when the method + * returns. + */ + public boolean waitForFinished(long timeoutMillis) throws StopException; + + /** + * Returns <code>true</code>, if the task of the future has already finished running. + */ + public boolean hasFinished(); + + /** + * Waits for the task of the future to finish cleaning up. The method waits at most + * <var>timeoutMillis</var> milli-seconds. + * + * @return <code>true</code>, if the task of the future has finished cleaning up when the + * method returns. + */ + public boolean waitForCleanedUp(long timeoutMillis) throws StopException; + + /** + * Returns <code>true</code>, if the task of the future has already called the + * {@link ICleaner#cleanUp(ch.systemsx.cisd.common.concurrent.TerminableCallable.FinishCause)} + * method, if any. + */ + public boolean hasCleanedUp(); + + /** + * Terminates the task of the future if it has already started running. If it has not yet + * started running, this method cancels it using {@link Future#cancel(boolean)}. Blocks until + * the task of the future has terminated if it already has started running. + * <p> + * Note that there is a semantic difference in the return value of this method to the return + * value of {@link Future#cancel(boolean)} in that it returns <code>true</code> if the task is + * no longer running, regardless of whether it was <i>this</i> call that terminated it or not. + * + * @return <code>true</code> if and only if the task of the future has terminated successfully + * or never been started. + */ + public boolean terminate() throws StopException; + + /** + * Terminates the task of the future if it has already started running. If it has not yet + * started running, this method cancels it using {@link Future#cancel(boolean)}. Blocks until + * the task of the future has terminated or a timeout occurs if it already has started running. + * <p> + * Note that there is a semantic difference in the return value of this method to the return + * value of {@link Future#cancel(boolean)} in that it returns <code>true</code> if the task is + * no longer running, regardless of whether it was <i>this</i> call that terminated it or not. + * + * @param timeoutMillis The time (in milli-seconds) to wait at the most for the future to + * terminate. + * @return <code>true</code> if and only if the task of the future has terminated successfully + * or never been started. + */ + public boolean terminate(long timeoutMillis) throws StopException; +} diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableFuture.java b/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableFuture.java new file mode 100644 index 0000000000000000000000000000000000000000..3e633555c4f677651323a62d6fbff2333786f676 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableFuture.java @@ -0,0 +1,137 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import ch.systemsx.cisd.common.exceptions.StopException; + +/** + * Implementation of a {@link ITerminableFuture} that delegates to appropriate classes. Note that + * there is some additional logic in the {@link #terminate()} and {@link #terminate(long)} methods + * to make them equivalent to {@link Future#cancel(boolean)} if the task did not yet start running. + * + * @author Bernd Rinn + */ +final class TerminableFuture<V> implements ITerminableFuture<V> +{ + private static final long TINY_PERIOD_MILLIS = 20L; + + private final Future<V> delegateFuture; + + private final TerminableCallable<V> delegateTerminableCallable; + + public TerminableFuture(Future<V> delegateFuture, + TerminableCallable<V> delegateTerminableCallable) + { + this.delegateFuture = delegateFuture; + this.delegateTerminableCallable = delegateTerminableCallable; + } + + public boolean cancel(boolean mayInterruptIfRunning) + { + return delegateFuture.cancel(mayInterruptIfRunning); + } + + public V get() throws InterruptedException, ExecutionException + { + return delegateFuture.get(); + } + + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException + { + return delegateFuture.get(timeout, unit); + } + + public boolean isCancelled() + { + return delegateFuture.isCancelled(); + } + + public boolean isDone() + { + return delegateFuture.isDone(); + } + + public boolean isRunning() + { + return delegateTerminableCallable.isRunning(); + } + + public boolean waitForStarted(long timeoutMillis) throws StopException + { + return delegateTerminableCallable.waitForStarted(timeoutMillis); + } + + public boolean hasStarted() + { + return delegateTerminableCallable.hasStarted(); + } + + public boolean waitForFinished(long timeoutMillis) throws StopException + { + return delegateTerminableCallable.waitForFinished(timeoutMillis); + } + + public boolean hasFinished() + { + return delegateTerminableCallable.hasFinished(); + } + + public boolean waitForCleanedUp(long timeoutMillis) throws StopException + { + return delegateTerminableCallable.waitForCleanedUp(timeoutMillis); + } + + public boolean hasCleanedUp() + { + return delegateTerminableCallable.hasCleanedUp(); + } + + public boolean terminate() + { + cancel(false); + // Wait for a very short period of time to ensure that the callable didn't just start + // running at the very moment when we canceled. + if (waitForStarted(TINY_PERIOD_MILLIS)) + { + return delegateTerminableCallable.terminate(); + } else + { + return true; + } + } + + public final boolean terminate(long timeoutMillis) throws StopException + { + cancel(false); + // Wait for a very short period of time to ensure that the callable didn't just start + // running at the very moment when we canceled. + if (waitForStarted(TINY_PERIOD_MILLIS)) + { + return delegateTerminableCallable.terminate(timeoutMillis); + } else + { + return true; + } + } + +}