Skip to content
Snippets Groups Projects
Commit 7ac0eacf authored by brinn's avatar brinn
Browse files

add: methods ConcurrencyUtilities.submit() that provide some additional...

add: methods ConcurrencyUtilities.submit() that provide some additional control over the task by returning an ITerminableFuture which delegates to a TerminableCallable where appropriate

SVN: 6631
parent 4ac07c64
No related branches found
No related tags found
No related merge requests found
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
package ch.systemsx.cisd.common.concurrent; package ch.systemsx.cisd.common.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
...@@ -305,6 +307,62 @@ public final class ConcurrencyUtilities ...@@ -305,6 +307,62 @@ public final class ConcurrencyUtilities
} }
} }
/**
* Submits the <var>callable</var> to the <var>executor</var>.
*
* @return A future which allows to terminate the task even when running.
* @see ExecutorService#submit(Callable)
*/
public static <V> ITerminableFuture<V> submit(ExecutorService executor,
TerminableCallable<V> callable)
{
final Future<V> future = executor.submit(callable);
return new TerminableFuture<V>(future, callable);
}
/**
* Submits the <var>callableWithCleaner</var> to the <var>executor</var>.
*
* @return A future which allows to terminate the task even when running.
* @see ExecutorService#submit(Callable)
*/
public static <V> ITerminableFuture<V> submit(ExecutorService executor,
TerminableCallable.ICallableCleaner<V> callableWithCleaner)
{
return submit(executor, TerminableCallable.create(callableWithCleaner));
}
/**
* Submits the <var>callable</var> to the <var>executor</var>.
*
* @return A future which allows to terminate the task even when running.
* @see ExecutorService#submit(Callable)
*/
public static <V> ITerminableFuture<V> submit(ExecutorService executor,
TerminableCallable.ICallable<V> callable)
{
return submit(executor, TerminableCallable.create(callable));
}
/**
* Submits the <var>stoppableCallable</var> to the <var>executor</var>.
* <p>
* <strong>Note: Code executed in the <var>stoppableCallable</var> must <i>not</i> change
* variables or data structures used by several threads or else the problems described in <a
* href="http://java.sun.com/j2se/1.5.0/docs/guide/misc/threadPrimitiveDeprecation.html">"Why is
* <code>Thread.stop()</code> deprecated?"</a> apply to your code! Watch out for static
* thread-safe variables like e.g. the ones of type {@link ThreadLocal}!</strong>
*
* @return A future which allows to terminate the task even when running.
* @see TerminableCallable#createStoppable(Callable)
* @see ExecutorService#submit(Callable)
*/
public static <V> ITerminableFuture<V> submitAsStoppable(ExecutorService executor,
Callable<V> stoppableCallable)
{
return submit(executor, TerminableCallable.createStoppable(stoppableCallable));
}
/** /**
* The same as {@link Thread#sleep(long)} but throws a {@link StopException} on interruption * The same as {@link Thread#sleep(long)} but throws a {@link StopException} on interruption
* rather than a {@link InterruptedException}. * rather than a {@link InterruptedException}.
...@@ -321,8 +379,8 @@ public final class ConcurrencyUtilities ...@@ -321,8 +379,8 @@ public final class ConcurrencyUtilities
} }
/** /**
* The same as {@link Thread#join()} but throws a {@link StopException} on interruption * The same as {@link Thread#join()} but throws a {@link StopException} on interruption rather
* rather than a {@link InterruptedException}. * than a {@link InterruptedException}.
*/ */
public static void join(Thread thread) throws StopException public static void join(Thread thread) throws StopException
{ {
...@@ -351,8 +409,8 @@ public final class ConcurrencyUtilities ...@@ -351,8 +409,8 @@ public final class ConcurrencyUtilities
} }
/** /**
* The same as {@link Object#wait()} but throws a {@link StopException} on interruption * The same as {@link Object#wait()} but throws a {@link StopException} on interruption rather
* rather than a {@link InterruptedException}. * than a {@link InterruptedException}.
*/ */
public static void wait(Object obj) throws StopException public static void wait(Object obj) throws StopException
{ {
......
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.concurrent;
import java.util.concurrent.Future;
import ch.systemsx.cisd.common.concurrent.TerminableCallable.ICleaner;
import ch.systemsx.cisd.common.exceptions.StopException;
import ch.systemsx.cisd.common.utilities.ITerminable;
/**
* An interface that combines {@link Future} and {@link ITerminable} and that can be used to control
* a {@link TerminableCallable}.
*
* @author Bernd Rinn
*/
public interface ITerminableFuture<V> extends Future<V>, ITerminable
{
/**
* Returns <code>true</code>, if the task of the future is currently running and
* <code>false</code> otherwise.
*/
public boolean isRunning();
/**
* Waits for the task of the future to start running. The method waits at most
* <var>timeoutMillis</var> milli-seconds.
*
* @return <code>true</code>, if the task of the future has started running when the method
* returns.
*/
public boolean waitForStarted(long timeoutMillis) throws StopException;
/**
* Returns <code>true</code>, if the task of the future has already started running.
*/
public boolean hasStarted();
/**
* Waits for the task of the future to finish running. The method waits at most
* <var>timeoutMillis</var> milli-seconds.
*
* @return <code>true</code>, if the task of the future has finished running when the method
* returns.
*/
public boolean waitForFinished(long timeoutMillis) throws StopException;
/**
* Returns <code>true</code>, if the task of the future has already finished running.
*/
public boolean hasFinished();
/**
* Waits for the task of the future to finish cleaning up. The method waits at most
* <var>timeoutMillis</var> milli-seconds.
*
* @return <code>true</code>, if the task of the future has finished cleaning up when the
* method returns.
*/
public boolean waitForCleanedUp(long timeoutMillis) throws StopException;
/**
* Returns <code>true</code>, if the task of the future has already called the
* {@link ICleaner#cleanUp(ch.systemsx.cisd.common.concurrent.TerminableCallable.FinishCause)}
* method, if any.
*/
public boolean hasCleanedUp();
/**
* Terminates the task of the future if it has already started running. If it has not yet
* started running, this method cancels it using {@link Future#cancel(boolean)}. Blocks until
* the task of the future has terminated if it already has started running.
* <p>
* Note that there is a semantic difference in the return value of this method to the return
* value of {@link Future#cancel(boolean)} in that it returns <code>true</code> if the task is
* no longer running, regardless of whether it was <i>this</i> call that terminated it or not.
*
* @return <code>true</code> if and only if the task of the future has terminated successfully
* or never been started.
*/
public boolean terminate() throws StopException;
/**
* Terminates the task of the future if it has already started running. If it has not yet
* started running, this method cancels it using {@link Future#cancel(boolean)}. Blocks until
* the task of the future has terminated or a timeout occurs if it already has started running.
* <p>
* Note that there is a semantic difference in the return value of this method to the return
* value of {@link Future#cancel(boolean)} in that it returns <code>true</code> if the task is
* no longer running, regardless of whether it was <i>this</i> call that terminated it or not.
*
* @param timeoutMillis The time (in milli-seconds) to wait at the most for the future to
* terminate.
* @return <code>true</code> if and only if the task of the future has terminated successfully
* or never been started.
*/
public boolean terminate(long timeoutMillis) throws StopException;
}
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import ch.systemsx.cisd.common.exceptions.StopException;
/**
* Implementation of a {@link ITerminableFuture} that delegates to appropriate classes. Note that
* there is some additional logic in the {@link #terminate()} and {@link #terminate(long)} methods
* to make them equivalent to {@link Future#cancel(boolean)} if the task did not yet start running.
*
* @author Bernd Rinn
*/
final class TerminableFuture<V> implements ITerminableFuture<V>
{
private static final long TINY_PERIOD_MILLIS = 20L;
private final Future<V> delegateFuture;
private final TerminableCallable<V> delegateTerminableCallable;
public TerminableFuture(Future<V> delegateFuture,
TerminableCallable<V> delegateTerminableCallable)
{
this.delegateFuture = delegateFuture;
this.delegateTerminableCallable = delegateTerminableCallable;
}
public boolean cancel(boolean mayInterruptIfRunning)
{
return delegateFuture.cancel(mayInterruptIfRunning);
}
public V get() throws InterruptedException, ExecutionException
{
return delegateFuture.get();
}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException
{
return delegateFuture.get(timeout, unit);
}
public boolean isCancelled()
{
return delegateFuture.isCancelled();
}
public boolean isDone()
{
return delegateFuture.isDone();
}
public boolean isRunning()
{
return delegateTerminableCallable.isRunning();
}
public boolean waitForStarted(long timeoutMillis) throws StopException
{
return delegateTerminableCallable.waitForStarted(timeoutMillis);
}
public boolean hasStarted()
{
return delegateTerminableCallable.hasStarted();
}
public boolean waitForFinished(long timeoutMillis) throws StopException
{
return delegateTerminableCallable.waitForFinished(timeoutMillis);
}
public boolean hasFinished()
{
return delegateTerminableCallable.hasFinished();
}
public boolean waitForCleanedUp(long timeoutMillis) throws StopException
{
return delegateTerminableCallable.waitForCleanedUp(timeoutMillis);
}
public boolean hasCleanedUp()
{
return delegateTerminableCallable.hasCleanedUp();
}
public boolean terminate()
{
cancel(false);
// Wait for a very short period of time to ensure that the callable didn't just start
// running at the very moment when we canceled.
if (waitForStarted(TINY_PERIOD_MILLIS))
{
return delegateTerminableCallable.terminate();
} else
{
return true;
}
}
public final boolean terminate(long timeoutMillis) throws StopException
{
cancel(false);
// Wait for a very short period of time to ensure that the callable didn't just start
// running at the very moment when we canceled.
if (waitForStarted(TINY_PERIOD_MILLIS))
{
return delegateTerminableCallable.terminate(timeoutMillis);
} else
{
return true;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment