From c661c0a3adde078412b8c6420b32457b7f15401c Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Wed, 18 Jun 2008 05:44:17 +0000 Subject: [PATCH] change: improve exception handling in final phase add: CancelException thrown when running a callable that had been successfully canceled SVN: 6660 --- .../common/concurrent/TerminableCallable.java | 25 +++++++-- .../cisd/common/concurrent/ThreadGuard.java | 51 ++++++++++--------- .../concurrent/TerminableCallableTest.java | 32 +++++++++++- 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableCallable.java b/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableCallable.java index cb14208fd83..a857ecde506 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableCallable.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/TerminableCallable.java @@ -139,6 +139,16 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable EXCEPTION, } + /** + * An exception thrown when the callable is called if it got canceled before. + * + * @author Bernd Rinn + */ + public static class CanceledException extends RuntimeException + { + private static final long serialVersionUID = 1L; + } + /** * A role that executes {@link Runnable}s and {@link Callable}s immediately in the current * thread and that marks the code it runs as suitable for <code>Thread.stop()</code>. @@ -173,6 +183,10 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable * Note that this method is <i>always</i> called, no matter what the cause is. If you want * to perform clean up only for some causes, check <var>cause</var> first. * <p> + * Note that the current Thread may be in an interrupted state when this method is called or + * may get interrupted during the method runs. It is advised not to use methods that throw + * an {@link InterruptedException} or equivalent in this method. + * <p> * <strong>Don't perform any time consuming operations in this method and avoid any * operations that can fail with an exception.</strong> * @@ -329,13 +343,16 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable return (causeOrNull != null) ? causeOrNull : new InterruptedException(); } - public V call() throws InterruptedException + public V call() throws InterruptedException, CanceledException { + if (threadGuard.startGuard() == false) + { + throw new CanceledException(); + } Throwable throwableOrNull = null; try { final V result; - threadGuard.startGuardOrInterrupt(); try { result = delegate.call(new IStoppableExecutor<V>() @@ -367,10 +384,10 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable } catch (Throwable th) { throwableOrNull = th; - threadGuard.markFinishingOrInterrupt(); + threadGuard.shutdownGuard(); throw CheckedExceptionTunnel.wrapIfNecessary(th); } - threadGuard.markFinishingOrInterrupt(); + threadGuard.shutdownGuard(); return result; } catch (StopException ex) { diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ThreadGuard.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ThreadGuard.java index 943411a6ca5..df3414b417f 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/ThreadGuard.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ThreadGuard.java @@ -25,7 +25,7 @@ import ch.systemsx.cisd.common.exceptions.HardStopException; /** * A class that provides the framework for guarding a {@link Thread} such that it can be stopped - * safely. It will guard the current thread of whoever runs {@link #startGuardOrInterrupt()}. + * safely. It will guard the current thread of whoever runs {@link #startGuard()}. * <p> * A {@link ThreadGuard} instance can only guard a thread once, it can not be reset. * @@ -35,7 +35,7 @@ final class ThreadGuard { private enum State { - INITIAL, CANCELLED, TERMINATING, RUNNING, FINISHING + INITIAL, CANCELED, TERMINATING, RUNNING, FINISHING } /** The lock that guards stopping the thread. */ @@ -82,8 +82,10 @@ final class ThreadGuard { if (state == State.RUNNING) { + final Thread t = thread; + thread = null; state = State.TERMINATING; - return thread; + return t; } else { return null; @@ -132,36 +134,29 @@ final class ThreadGuard // Whatever manipulates state or thread needs to be synchronized. /** - * Mark the guard as being started. The current thread when running this method will be guarded - * from now on. Implies {@link #preventStopping()}. + * Start up the guard. The current thread when running this method will be guarded from now on. + * Implies {@link #preventStopping()}. * - * @throws InterruptedException If the guard is not in initial state (e.g. because it got - * cancelled). + * @return <code>true</code>, if the guard was successfully started and <code>false</code>, + * if the guard had been canceled before. */ - synchronized void startGuardOrInterrupt() throws InterruptedException + synchronized boolean startGuard() { if (state != State.INITIAL) { - Thread.interrupted(); // Clear interrupt flag - throw new InterruptedException(); + return false; } stopLock.lock(); state = State.RUNNING; thread = Thread.currentThread(); + return true; } /** - * Mark the guard as being in state finishing. - * - * @throws InterruptedException If this thread is not in state running. + * Shut down the guard. Does not yet set it to finished. */ - synchronized void markFinishingOrInterrupt() throws InterruptedException + synchronized void shutdownGuard() { - if (state != State.RUNNING) - { - Thread.interrupted(); // Clear interrupt flag - throw new InterruptedException(); - } state = State.FINISHING; thread = null; } @@ -171,7 +166,7 @@ final class ThreadGuard */ synchronized boolean hasStarted() { - return (state != State.INITIAL && state != State.CANCELLED); + return (state != State.INITIAL && state != State.CANCELED); } /** @@ -183,15 +178,18 @@ final class ThreadGuard } /** - * Tries to cancel the guard, i.e. prevent it from running if it doesn't run yet. + * Tries to cancel the guard, i.e. prevent it from running if it doesn't run yet. If canceling + * is successful, it implies marking the guard as finished. * - * @return <code>true</code>, if the guard has been cancelled successfully. + * @return <code>true</code>, if the guard has been canceled successfully. */ synchronized boolean cancel() { if (state == State.INITIAL) { - state = State.CANCELLED; + state = State.CANCELED; + // Do not call markFinished() as the stopLock is not yet initialized. + finishedLatch.countDown(); return true; } else { @@ -208,7 +206,7 @@ final class ThreadGuard * <p> * The following steps are performed: * <ol> - * <li> If the guard got cancelled, return with cod<code>true</code>. </li> + * <li> If the guard got canceled, return with cod<code>true</code>. </li> * <li> If the guard is already in state finishing, wait for the guard to be set to finished and * return <code>true</code>, if that happens in due time and <code>false</code> otherwise. * </li> @@ -243,7 +241,10 @@ final class ThreadGuard if (t != null) { t.interrupt(); - if (waitForFinished(waitInterruptMillis) == false) + if (waitForFinished(waitInterruptMillis)) + { + return true; + } else { if (stop(t, timeoutMillis - (System.currentTimeMillis() - start)) == false) { diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TerminableCallableTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TerminableCallableTest.java index 6395ba9b004..33093846ac7 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TerminableCallableTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/TerminableCallableTest.java @@ -23,9 +23,11 @@ import static org.testng.AssertJUnit.assertTrue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.testng.annotations.Test; +import ch.systemsx.cisd.common.concurrent.TerminableCallable.CanceledException; import ch.systemsx.cisd.common.concurrent.TerminableCallable.FinishCause; import ch.systemsx.cisd.common.concurrent.TerminableCallable.ICallableCleaner; import ch.systemsx.cisd.common.concurrent.TerminableCallable.IStoppableExecutor; @@ -142,12 +144,40 @@ public class TerminableCallableTest finishLatch); final TerminableCallable<Object> callableUnderTest = TerminableCallable.create(sensor); new Thread(callableUnderTest.asRunnable(), "complete").start(); - finishLatch.await(500L, TimeUnit.MILLISECONDS); + finishLatch.await(200L, TimeUnit.MILLISECONDS); assertTrue(milestoneLatch.await(0, TimeUnit.MILLISECONDS)); assertTrue(describe(sensor.cause), FinishCause.COMPLETED.equals(sensor.cause)); assertEquals(1, sensor.cleanUpCount); } + @Test + public void testCancel() throws Exception + { + final CountDownLatch launchLatch = new CountDownLatch(1); + final CountDownLatch milestoneLatch = new CountDownLatch(1); + final CountDownLatch finishLatch = new CountDownLatch(1); + final TestRunnable sensor = + new TestRunnable(launchLatch, milestoneLatch, Strategy.COMPLETE_IMMEDIATELY, + finishLatch); + final TerminableCallable<Object> callableUnderTest = TerminableCallable.create(sensor); + callableUnderTest.cancel(); + final Thread t = new Thread(callableUnderTest.asRunnable(), "cancel"); + final AtomicReference<Throwable> uncaughtException = new AtomicReference<Throwable>(null); + t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() + { + public void uncaughtException(Thread t2, Throwable e) + { + uncaughtException.set(e); + } + }); + t.start(); + finishLatch.await(200L, TimeUnit.MILLISECONDS); + assertFalse(milestoneLatch.await(0, TimeUnit.MILLISECONDS)); + assertNull(sensor.cause); + assertEquals(0, sensor.cleanUpCount); + assertEquals(CanceledException.class, uncaughtException.get().getClass()); + } + @Test public void testInterrupt() throws Exception { -- GitLab