Skip to content
Snippets Groups Projects
Commit c661c0a3 authored by brinn's avatar brinn
Browse files

change: improve exception handling in final phase

add: CancelException thrown when running a callable that had been successfully canceled

SVN: 6660
parent 4598ad3b
No related branches found
No related tags found
No related merge requests found
......@@ -139,6 +139,16 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable
EXCEPTION,
}
/**
* An exception thrown when the callable is called if it got canceled before.
*
* @author Bernd Rinn
*/
public static class CanceledException extends RuntimeException
{
private static final long serialVersionUID = 1L;
}
/**
* A role that executes {@link Runnable}s and {@link Callable}s immediately in the current
* thread and that marks the code it runs as suitable for <code>Thread.stop()</code>.
......@@ -173,6 +183,10 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable
* Note that this method is <i>always</i> called, no matter what the cause is. If you want
* to perform clean up only for some causes, check <var>cause</var> first.
* <p>
* Note that the current Thread may be in an interrupted state when this method is called or
* may get interrupted during the method runs. It is advised not to use methods that throw
* an {@link InterruptedException} or equivalent in this method.
* <p>
* <strong>Don't perform any time consuming operations in this method and avoid any
* operations that can fail with an exception.</strong>
*
......@@ -329,13 +343,16 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable
return (causeOrNull != null) ? causeOrNull : new InterruptedException();
}
public V call() throws InterruptedException
public V call() throws InterruptedException, CanceledException
{
if (threadGuard.startGuard() == false)
{
throw new CanceledException();
}
Throwable throwableOrNull = null;
try
{
final V result;
threadGuard.startGuardOrInterrupt();
try
{
result = delegate.call(new IStoppableExecutor<V>()
......@@ -367,10 +384,10 @@ public final class TerminableCallable<V> implements Callable<V>, ITerminable
} catch (Throwable th)
{
throwableOrNull = th;
threadGuard.markFinishingOrInterrupt();
threadGuard.shutdownGuard();
throw CheckedExceptionTunnel.wrapIfNecessary(th);
}
threadGuard.markFinishingOrInterrupt();
threadGuard.shutdownGuard();
return result;
} catch (StopException ex)
{
......
......@@ -25,7 +25,7 @@ import ch.systemsx.cisd.common.exceptions.HardStopException;
/**
* A class that provides the framework for guarding a {@link Thread} such that it can be stopped
* safely. It will guard the current thread of whoever runs {@link #startGuardOrInterrupt()}.
* safely. It will guard the current thread of whoever runs {@link #startGuard()}.
* <p>
* A {@link ThreadGuard} instance can only guard a thread once, it can not be reset.
*
......@@ -35,7 +35,7 @@ final class ThreadGuard
{
private enum State
{
INITIAL, CANCELLED, TERMINATING, RUNNING, FINISHING
INITIAL, CANCELED, TERMINATING, RUNNING, FINISHING
}
/** The lock that guards stopping the thread. */
......@@ -82,8 +82,10 @@ final class ThreadGuard
{
if (state == State.RUNNING)
{
final Thread t = thread;
thread = null;
state = State.TERMINATING;
return thread;
return t;
} else
{
return null;
......@@ -132,36 +134,29 @@ final class ThreadGuard
// Whatever manipulates state or thread needs to be synchronized.
/**
* Mark the guard as being started. The current thread when running this method will be guarded
* from now on. Implies {@link #preventStopping()}.
* Start up the guard. The current thread when running this method will be guarded from now on.
* Implies {@link #preventStopping()}.
*
* @throws InterruptedException If the guard is not in initial state (e.g. because it got
* cancelled).
* @return <code>true</code>, if the guard was successfully started and <code>false</code>,
* if the guard had been canceled before.
*/
synchronized void startGuardOrInterrupt() throws InterruptedException
synchronized boolean startGuard()
{
if (state != State.INITIAL)
{
Thread.interrupted(); // Clear interrupt flag
throw new InterruptedException();
return false;
}
stopLock.lock();
state = State.RUNNING;
thread = Thread.currentThread();
return true;
}
/**
* Mark the guard as being in state finishing.
*
* @throws InterruptedException If this thread is not in state running.
* Shut down the guard. Does not yet set it to finished.
*/
synchronized void markFinishingOrInterrupt() throws InterruptedException
synchronized void shutdownGuard()
{
if (state != State.RUNNING)
{
Thread.interrupted(); // Clear interrupt flag
throw new InterruptedException();
}
state = State.FINISHING;
thread = null;
}
......@@ -171,7 +166,7 @@ final class ThreadGuard
*/
synchronized boolean hasStarted()
{
return (state != State.INITIAL && state != State.CANCELLED);
return (state != State.INITIAL && state != State.CANCELED);
}
/**
......@@ -183,15 +178,18 @@ final class ThreadGuard
}
/**
* Tries to cancel the guard, i.e. prevent it from running if it doesn't run yet.
* Tries to cancel the guard, i.e. prevent it from running if it doesn't run yet. If canceling
* is successful, it implies marking the guard as finished.
*
* @return <code>true</code>, if the guard has been cancelled successfully.
* @return <code>true</code>, if the guard has been canceled successfully.
*/
synchronized boolean cancel()
{
if (state == State.INITIAL)
{
state = State.CANCELLED;
state = State.CANCELED;
// Do not call markFinished() as the stopLock is not yet initialized.
finishedLatch.countDown();
return true;
} else
{
......@@ -208,7 +206,7 @@ final class ThreadGuard
* <p>
* The following steps are performed:
* <ol>
* <li> If the guard got cancelled, return with cod<code>true</code>. </li>
* <li> If the guard got canceled, return with cod<code>true</code>. </li>
* <li> If the guard is already in state finishing, wait for the guard to be set to finished and
* return <code>true</code>, if that happens in due time and <code>false</code> otherwise.
* </li>
......@@ -243,7 +241,10 @@ final class ThreadGuard
if (t != null)
{
t.interrupt();
if (waitForFinished(waitInterruptMillis) == false)
if (waitForFinished(waitInterruptMillis))
{
return true;
} else
{
if (stop(t, timeoutMillis - (System.currentTimeMillis() - start)) == false)
{
......
......@@ -23,9 +23,11 @@ import static org.testng.AssertJUnit.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.annotations.Test;
import ch.systemsx.cisd.common.concurrent.TerminableCallable.CanceledException;
import ch.systemsx.cisd.common.concurrent.TerminableCallable.FinishCause;
import ch.systemsx.cisd.common.concurrent.TerminableCallable.ICallableCleaner;
import ch.systemsx.cisd.common.concurrent.TerminableCallable.IStoppableExecutor;
......@@ -142,12 +144,40 @@ public class TerminableCallableTest
finishLatch);
final TerminableCallable<Object> callableUnderTest = TerminableCallable.create(sensor);
new Thread(callableUnderTest.asRunnable(), "complete").start();
finishLatch.await(500L, TimeUnit.MILLISECONDS);
finishLatch.await(200L, TimeUnit.MILLISECONDS);
assertTrue(milestoneLatch.await(0, TimeUnit.MILLISECONDS));
assertTrue(describe(sensor.cause), FinishCause.COMPLETED.equals(sensor.cause));
assertEquals(1, sensor.cleanUpCount);
}
@Test
public void testCancel() throws Exception
{
final CountDownLatch launchLatch = new CountDownLatch(1);
final CountDownLatch milestoneLatch = new CountDownLatch(1);
final CountDownLatch finishLatch = new CountDownLatch(1);
final TestRunnable sensor =
new TestRunnable(launchLatch, milestoneLatch, Strategy.COMPLETE_IMMEDIATELY,
finishLatch);
final TerminableCallable<Object> callableUnderTest = TerminableCallable.create(sensor);
callableUnderTest.cancel();
final Thread t = new Thread(callableUnderTest.asRunnable(), "cancel");
final AtomicReference<Throwable> uncaughtException = new AtomicReference<Throwable>(null);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
public void uncaughtException(Thread t2, Throwable e)
{
uncaughtException.set(e);
}
});
t.start();
finishLatch.await(200L, TimeUnit.MILLISECONDS);
assertFalse(milestoneLatch.await(0, TimeUnit.MILLISECONDS));
assertNull(sensor.cause);
assertEquals(0, sensor.cleanUpCount);
assertEquals(CanceledException.class, uncaughtException.get().getClass());
}
@Test
public void testInterrupt() throws Exception
{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment