From 2a846df61a34d60be6f8fc893e2a6f6a8dd1eb17 Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Sat, 30 Jan 2010 13:04:04 +0000
Subject: [PATCH] add: improved support for communication between the
 MonitoringProxy and a monitored method call by using an IMonitorCommunication
 instance

SVN: 14570
---
 .../concurrent/ConcurrencyUtilities.java      | 139 ++++++++-----
 .../common/concurrent/MonitoringProxy.java    | 192 ++++++++++++++++--
 .../concurrent/ConcurrencyUtilitiesTest.java  |   2 +-
 .../concurrent/MonitoringProxyTest.java       |  80 +++++++-
 4 files changed, 349 insertions(+), 64 deletions(-)

diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java
index f63d056e1a2..40df5ac8f6c 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilities.java
@@ -64,6 +64,18 @@ public final class ConcurrencyUtilities
         LogLevel getLogLevelForError();
     }
 
+    /**
+     * A role for notifying the caller that a future will be cancelled <em>before</em> it is
+     * actually cancelled.
+     */
+    public interface ICancellationNotifier
+    {
+        /**
+         * Called immediately before the future is cancelled.
+         */
+        void willCancel();
+    }
+
     /**
      * Tries to get the result of a <var>future</var>, maximally waiting <var>timeoutMillis</var>
      * for the result to become available. Any {@link ExecutionException} that might occur in the
@@ -76,7 +88,8 @@ public final class ConcurrencyUtilities
      *         available within <var>timeoutMillis</var> ms.
      * @throws InterruptedExceptionUnchecked If the thread got interrupted.
      */
-    public static <T> T tryGetResult(Future<T> future, long timeoutMillis) throws InterruptedExceptionUnchecked
+    public static <T> T tryGetResult(Future<T> future, long timeoutMillis)
+            throws InterruptedExceptionUnchecked
     {
         return tryGetResult(future, timeoutMillis, null, true);
     }
@@ -91,8 +104,8 @@ public final class ConcurrencyUtilities
      *            it is smaller than 0, no time-out will apply.
      * @return The result of the future, or <code>null</code>, if the result does not become
      *         available within <var>timeoutMillis</var> ms.
-     * @throws InterruptedExceptionUnchecked If the thread got interrupted and <var>stopOnInterrupt</var> is
-     *             <code>true</code>.
+     * @throws InterruptedExceptionUnchecked If the thread got interrupted and
+     *             <var>stopOnInterrupt</var> is <code>true</code>.
      */
     public static <T> T tryGetResult(Future<T> future, long timeoutMillis, boolean stopOnInterrupt)
             throws InterruptedExceptionUnchecked
@@ -111,16 +124,17 @@ public final class ConcurrencyUtilities
      *            it is smaller than 0, no time-out will apply.
      * @param logSettingsOrNull The settings for error logging, or <code>null</code>, if error
      *            conditions should not be logged.
-     * @param stopOnInterrupt If <code>true</code>, throw a {@link InterruptedExceptionUnchecked} if the thread gets
-     *            interrupted while waiting on the future.
+     * @param stopOnInterrupt If <code>true</code>, throw a {@link InterruptedExceptionUnchecked} if
+     *            the thread gets interrupted while waiting on the future.
      * @return The result of the future, or <code>null</code>, if the result does not become
      *         available within <var>timeoutMillis</var> ms or if the waiting thread gets
      *         interrupted.
-     * @throws InterruptedExceptionUnchecked If the thread got interrupted and <var>stopOnInterrupt</var> is
-     *             <code>true</code>.
+     * @throws InterruptedExceptionUnchecked If the thread got interrupted and
+     *             <var>stopOnInterrupt</var> is <code>true</code>.
      */
     public static <T> T tryGetResult(Future<T> future, long timeoutMillis,
-            ILogSettings logSettingsOrNull, boolean stopOnInterrupt) throws InterruptedExceptionUnchecked
+            ILogSettings logSettingsOrNull, boolean stopOnInterrupt)
+            throws InterruptedExceptionUnchecked
     {
         final ExecutionResult<T> result = getResult(future, timeoutMillis, logSettingsOrNull);
         return tryDealWithResult(result, stopOnInterrupt);
@@ -146,13 +160,13 @@ public final class ConcurrencyUtilities
      * with the deviant cases yourself, then call this method to deal with the rest.
      * 
      * @param result A
-     * @param stopOnInterrupt If <code>true</code>, throw a {@link InterruptedExceptionUnchecked} if the thread gets
-     *            interrupted while waiting on the future.
+     * @param stopOnInterrupt If <code>true</code>, throw a {@link InterruptedExceptionUnchecked} if
+     *            the thread gets interrupted while waiting on the future.
      * @return The value of the <var>result</var> of the future, or <code>null</code>, if the result
      *         status is {@link ExecutionStatus#TIMED_OUT} or {@link ExecutionStatus#INTERRUPTED}
      *         and <var>stopOnInterrupt</var> is <code>false</code>.
-     * @throws InterruptedExceptionUnchecked If the thread got interrupted and <var>stopOnInterrupt</var> is
-     *             <code>true</code>.
+     * @throws InterruptedExceptionUnchecked If the thread got interrupted and
+     *             <var>stopOnInterrupt</var> is <code>true</code>.
      * @throws RuntimeException If the result status is {@link ExecutionStatus#EXCEPTION} and the
      *             exception is derived from {@link RuntimeException}.
      * @throws CheckedExceptionTunnel If the result status is {@link ExecutionStatus#EXCEPTION} and
@@ -195,11 +209,12 @@ public final class ConcurrencyUtilities
      * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes
      * are:
      * <ul>
-     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a
-     * result is available, if provided.</li> <li> {@link ExecutionStatus#EXCEPTION}: The execution
-     * has been terminated by an exception.</li> <li> {@link ExecutionStatus#TIMED_OUT}: The
-     * execution timed out.</li> <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the
-     * execution was interrupted (see {@link Thread#interrupt()}).</li>
+     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a result
+     * is available, if provided.</li>
+     * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li>
+     * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li>
+     * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see
+     * {@link Thread#interrupt()}).</li>
      * </ul>
      * 
      * @param future The future representing the execution to wait for.
@@ -219,11 +234,12 @@ public final class ConcurrencyUtilities
      * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes
      * are:
      * <ul>
-     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a
-     * result is available, if provided.</li> <li> {@link ExecutionStatus#EXCEPTION}: The execution
-     * has been terminated by an exception.</li> <li> {@link ExecutionStatus#TIMED_OUT}: The
-     * execution timed out.</li> <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the
-     * execution was interrupted (see {@link Thread#interrupt()}).</li>
+     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a result
+     * is available, if provided.</li>
+     * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li>
+     * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li>
+     * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see
+     * {@link Thread#interrupt()}).</li>
      * </ul>
      * 
      * @param future The future representing the execution to wait for.
@@ -246,11 +262,12 @@ public final class ConcurrencyUtilities
      * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes
      * are:
      * <ul>
-     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a
-     * result is available, if provided.</li> <li> {@link ExecutionStatus#EXCEPTION}: The execution
-     * has been terminated by an exception.</li> <li> {@link ExecutionStatus#TIMED_OUT}: The
-     * execution timed out.</li> <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the
-     * execution was interrupted (see {@link Thread#interrupt()}).</li>
+     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a result
+     * is available, if provided.</li>
+     * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li>
+     * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li>
+     * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see
+     * {@link Thread#interrupt()}).</li>
      * </ul>
      * 
      * @param future The future representing the execution to wait for.
@@ -266,7 +283,7 @@ public final class ConcurrencyUtilities
     public static <T> ExecutionResult<T> getResult(Future<T> future, long timeoutMillis,
             boolean cancelOnTimeout, ILogSettings logSettingsOrNull)
     {
-        return getResult(future, timeoutMillis, cancelOnTimeout, logSettingsOrNull, null);
+        return getResult(future, timeoutMillis, cancelOnTimeout, logSettingsOrNull, null, null);
     }
 
     private static boolean isActive(IActivitySensor sensorOrNull, long timeoutMillis)
@@ -280,11 +297,12 @@ public final class ConcurrencyUtilities
      * {@link ExecutionResult} that describes the outcome of the execution. The possible outcomes
      * are:
      * <ul>
-     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a
-     * result is available, if provided.</li> <li> {@link ExecutionStatus#EXCEPTION}: The execution
-     * has been terminated by an exception.</li> <li> {@link ExecutionStatus#TIMED_OUT}: The
-     * execution timed out.</li> <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the
-     * execution was interrupted (see {@link Thread#interrupt()}).</li>
+     * <li> {@link ExecutionStatus#COMPLETE}: The execution has been performed correctly and a result
+     * is available, if provided.</li>
+     * <li> {@link ExecutionStatus#EXCEPTION}: The execution has been terminated by an exception.</li>
+     * <li> {@link ExecutionStatus#TIMED_OUT}: The execution timed out.</li>
+     * <li> {@link ExecutionStatus#INTERRUPTED}: The thread of the execution was interrupted (see
+     * {@link Thread#interrupt()}).</li>
      * </ul>
      * 
      * @param future The future representing the execution to wait for.
@@ -294,12 +312,15 @@ public final class ConcurrencyUtilities
      *            time-out.
      * @param logSettingsOrNull The settings for error logging, or <code>null</code>, if error
      *            conditions should not be logged.
+     * @param cancellationNotifierOrNull The notifier for cancellation of the future or
+     *            <code>null</code>
      * @param sensorOrNull A sensor that can prevent the method from timing out by showing activity.
      * @return The {@link ExecutionResult} of the <var>future</var>. May correspond to each one of
      *         the {@link ExecutionStatus} values.
      */
     public static <T> ExecutionResult<T> getResult(Future<T> future, long timeoutMillis,
-            boolean cancelOnTimeout, ILogSettings logSettingsOrNull, IActivitySensor sensorOrNull)
+            boolean cancelOnTimeout, ILogSettings logSettingsOrNull,
+            ICancellationNotifier cancellationNotifierOrNull, IActivitySensor sensorOrNull)
     {
         try
         {
@@ -308,7 +329,8 @@ public final class ConcurrencyUtilities
             {
                 try
                 {
-                    result = ExecutionResult.create(future.get(transform(timeoutMillis),
+                    result =
+                            ExecutionResult.create(future.get(transform(timeoutMillis),
                                     TimeUnit.MILLISECONDS));
                 } catch (TimeoutException ex)
                 {
@@ -319,6 +341,10 @@ public final class ConcurrencyUtilities
             {
                 if (cancelOnTimeout)
                 {
+                    if (cancellationNotifierOrNull != null)
+                    {
+                        cancellationNotifierOrNull.willCancel();
+                    }
                     future.cancel(true);
                 }
                 if (logSettingsOrNull != null)
@@ -336,6 +362,10 @@ public final class ConcurrencyUtilities
             }
         } catch (InterruptedException ex)
         {
+            if (cancellationNotifierOrNull != null)
+            {
+                cancellationNotifierOrNull.willCancel();
+            }
             future.cancel(true);
             if (logSettingsOrNull != null)
             {
@@ -346,6 +376,10 @@ public final class ConcurrencyUtilities
         } catch (InterruptedExceptionUnchecked ex)
         {
             // Happens when Thread.stop(new StopException()) is called.
+            if (cancellationNotifierOrNull != null)
+            {
+                cancellationNotifierOrNull.willCancel();
+            }
             future.cancel(true);
             if (logSettingsOrNull != null)
             {
@@ -355,6 +389,10 @@ public final class ConcurrencyUtilities
             return ExecutionResult.createInterrupted();
         } catch (ThreadDeath ex)
         {
+            if (cancellationNotifierOrNull != null)
+            {
+                cancellationNotifierOrNull.willCancel();
+            }
             future.cancel(true);
             if (logSettingsOrNull != null)
             {
@@ -375,11 +413,18 @@ public final class ConcurrencyUtilities
             final Throwable cause = ex.getCause();
             if (cause instanceof InterruptedExceptionUnchecked)
             {
+                if (cancellationNotifierOrNull != null)
+                {
+                    cancellationNotifierOrNull.willCancel();
+                }
                 future.cancel(true);
                 if (logSettingsOrNull != null)
                 {
-                    logSettingsOrNull.getLogger().log(logSettingsOrNull.getLogLevelForError(),
-                            String.format("%s: interrupted.", logSettingsOrNull.getOperationName()));
+                    logSettingsOrNull.getLogger()
+                            .log(
+                                    logSettingsOrNull.getLogLevelForError(),
+                                    String.format("%s: interrupted.", logSettingsOrNull
+                                            .getOperationName()));
                 }
                 return ExecutionResult.createInterrupted();
             }
@@ -456,8 +501,8 @@ public final class ConcurrencyUtilities
     }
 
     /**
-     * The same as {@link Thread#sleep(long)} but throws a {@link InterruptedExceptionUnchecked} on interruption
-     * rather than a {@link InterruptedException}.
+     * The same as {@link Thread#sleep(long)} but throws a {@link InterruptedExceptionUnchecked} on
+     * interruption rather than a {@link InterruptedException}.
      */
     public static void sleep(long millis) throws InterruptedExceptionUnchecked
     {
@@ -471,8 +516,8 @@ public final class ConcurrencyUtilities
     }
 
     /**
-     * The same as {@link Thread#join()} but throws a {@link InterruptedExceptionUnchecked} on interruption rather
-     * than a {@link InterruptedException}.
+     * The same as {@link Thread#join()} but throws a {@link InterruptedExceptionUnchecked} on
+     * interruption rather than a {@link InterruptedException}.
      */
     public static void join(Thread thread) throws InterruptedExceptionUnchecked
     {
@@ -486,8 +531,8 @@ public final class ConcurrencyUtilities
     }
 
     /**
-     * The same as {@link Thread#join(long)} but throws a {@link InterruptedExceptionUnchecked} on interruption
-     * rather than a {@link InterruptedException}.
+     * The same as {@link Thread#join(long)} but throws a {@link InterruptedExceptionUnchecked} on
+     * interruption rather than a {@link InterruptedException}.
      */
     public static void join(Thread thread, long millis) throws InterruptedExceptionUnchecked
     {
@@ -501,8 +546,8 @@ public final class ConcurrencyUtilities
     }
 
     /**
-     * The same as {@link Object#wait()} but throws a {@link InterruptedExceptionUnchecked} on interruption rather
-     * than a {@link InterruptedException}.
+     * The same as {@link Object#wait()} but throws a {@link InterruptedExceptionUnchecked} on
+     * interruption rather than a {@link InterruptedException}.
      */
     public static void wait(Object obj) throws InterruptedExceptionUnchecked
     {
@@ -516,8 +561,8 @@ public final class ConcurrencyUtilities
     }
 
     /**
-     * The same as {@link Object#wait(long)} but throws a {@link InterruptedExceptionUnchecked} on interruption
-     * rather than a {@link InterruptedException}.
+     * The same as {@link Object#wait(long)} but throws a {@link InterruptedExceptionUnchecked} on
+     * interruption rather than a {@link InterruptedException}.
      */
     public static void wait(Object obj, long millis) throws InterruptedExceptionUnchecked
     {
diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java
index 27c514926a3..e2a4bd2056c 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
 import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
@@ -78,13 +80,16 @@ import ch.systemsx.cisd.common.logging.LogLevel;
  *                 .errorTypeValueMapping(String.class, &quot;ERROR&quot;).get();
  * </pre>
  * <p>
- * <i>Note:</i> A MonitoringProxy object can only be used safely from more than one thread if
+ * <strong>Note:</strong> A MonitoringProxy object can only be used safely from more than one thread
+ * if
  * <ol>
  * <li>The proxied object is thread-safe</li>
- * <li>No observer / sensor pattern is used to detect activity (can produce "false negatives" on
- * hanging method calls)</li>
+ * <li>No proxy-wide observer / sensor pattern is used to detect activity (can produce
+ * "false negatives" on hanging method calls), however using a {@link IMonitorCommunicator} is safe.
+ * </li>
  * </ol>
  * 
+ * @see IMonitorCommunicator
  * @author Bernd Rinn
  */
 public class MonitoringProxy<T>
@@ -121,7 +126,8 @@ public class MonitoringProxy<T>
 
     private IActivitySensor sensorOrNull;
 
-    private Set<Thread> currentThreads = Collections.synchronizedSet(new HashSet<Thread>());
+    private Set<MonitorCommunicator> currentOperations =
+            Collections.synchronizedSet(new HashSet<MonitorCommunicator>());
 
     private static class DelegatingInvocationHandler<T> implements InvocationHandler
     {
@@ -268,13 +274,22 @@ public class MonitoringProxy<T>
                 final Object[] args)
         {
             final String callingThreadName = Thread.currentThread().getName();
-            currentThreads.add(Thread.currentThread());
+            final MonitorCommunicator communicator = new MonitorCommunicator();
+            currentOperations.add(communicator);
             try
             {
+                final Class<?>[] types = method.getParameterTypes();
+                if (types.length > 0 && types[types.length - 1] == IMonitorCommunicator.class)
+                {
+                    // Inject communicator into actual arguments
+                    args[args.length - 1] = communicator;
+                }
+
                 final Future<Object> future = executor.submit(new NamedCallable<Object>()
                     {
                         public Object call() throws Exception
                         {
+                            communicator.setMonitoredThread();
                             try
                             {
                                 return delegate.invoke(myProxy, method, args);
@@ -287,6 +302,9 @@ public class MonitoringProxy<T>
                                 {
                                     throw (Exception) th;
                                 }
+                            } finally
+                            {
+                                communicator.clearMonitoredThread();
                             }
                         }
 
@@ -325,11 +343,20 @@ public class MonitoringProxy<T>
                                     }
                                 }
                             };
-                return ConcurrencyUtilities.getResult(future, timingParameters.getTimeoutMillis(),
-                        true, logSettingsOrNull, sensorOrNull);
+                final ExecutionResult<Object> result =
+                        ConcurrencyUtilities.getResult(future, timingParameters.getTimeoutMillis(),
+                                true, logSettingsOrNull,
+                                new ConcurrencyUtilities.ICancellationNotifier()
+                                    {
+                                        public void willCancel()
+                                        {
+                                            communicator.cancel(false);
+                                        }
+                                    }, communicator);
+                return result;
             } finally
             {
-                currentThreads.remove(Thread.currentThread());
+                currentOperations.remove(communicator);
             }
         }
 
@@ -370,6 +397,135 @@ public class MonitoringProxy<T>
 
     }
 
+    /**
+     * A role for communication between the monitor and a monitored method call. The monitored
+     * method will signal activity to the monitor by calling {@link #update()} and learn whether it
+     * has been cancelled by the monitor by calling {@link #isCancelled()}. A
+     * <code>IMonitorCommunicator</code> object is injected into the actual parameters of a method
+     * call by the monitor if and only if the last formal parameter of a proxied method call is of
+     * type <code>IMonitorCommunicator</code>. The actual parameter of the
+     * <code>IMonitorCommunicator</code> provided by the caller is meaningless in this case. The
+     * constant {@link MonitoringProxy#MONITOR_COMMUNICATOR} can be used as a placeholder.
+     * <p>
+     * <em>Example:</em> The interface
+     * 
+     * <pre>
+     * interface IOperationsToBeMonitored
+     * {
+     *     void possiblyHangingAndRetriedOperation(int someArgument, IMonitorCommunicator communicator);
+     * }
+     * </pre>
+     * 
+     * may be implemented as
+     * 
+     * <pre>
+     * public void possiblyHangingAndRetriedOperation(int someArgument, IMonitorCommunicator communicator)
+     * {
+     *     ...
+     *     while (&lt;some condition&gt;)
+     *     {
+     *         if (communicator.isCancelled())
+     *         {
+     *             return;
+     *         }
+     *         communicator.update();
+     *     }
+     *     ...
+     * }
+     * </pre>
+     * 
+     * and called as
+     * 
+     * <pre>
+     * monitoredObject.possiblyHangingAndRetriedOperation(42, MonitoringProxy.MONITOR_COMMUNICATOR);
+     * </pre>
+     * <p>
+     * <strong>Note:</strong> The object is unique to the method call, not even a retried method
+     * call (e.g. in case of timeout) will get the same <code>IMonitorCommunicator</code> object
+     * instance. Thus the injected <code>IMonitorCommunicator</code> is a safe way of communicating
+     * between the {@link MonitoringProxy} and the exact method call.
+     */
+    public interface IMonitorCommunicator extends IActivityObserver
+    {
+        /**
+         * Returns <code>true</code> if the monitor has cancelled execution of the method.
+         */
+        public boolean isCancelled();
+    }
+
+    /**
+     * A placeholder for an {@link IMonitorCommunicator}. To be used in the actual parameter list of
+     * a monitored proxy call as an indication that a {@link IMonitorCommunicator} instance is used
+     * in this method call.
+     */
+    public static final IMonitorCommunicator MONITOR_COMMUNICATOR = new IMonitorCommunicator()
+        {
+            public void update()
+            {
+            }
+
+            public boolean isCancelled()
+            {
+                return false;
+            }
+        };
+
+    private class MonitorCommunicator implements IMonitorCommunicator, IActivitySensor
+    {
+        private final AtomicBoolean cancelled = new AtomicBoolean();
+
+        private final AtomicLong lastActivityAt = new AtomicLong(System.currentTimeMillis());
+
+        private Thread monitoredThreadOrNull;
+
+        synchronized void cancel(boolean interruptThread)
+        {
+            cancelled.set(true);
+            if (monitoredThreadOrNull != null && interruptThread)
+            {
+                monitoredThreadOrNull.interrupt();
+            }
+        }
+
+        synchronized void setMonitoredThread()
+        {
+            monitoredThreadOrNull = Thread.currentThread();
+        }
+
+        synchronized void clearMonitoredThread()
+        {
+            monitoredThreadOrNull = null;
+        }
+
+        public boolean isCancelled()
+        {
+            return cancelled.get();
+        }
+
+        public void update()
+        {
+            lastActivityAt.set(System.currentTimeMillis());
+        }
+
+        public long getLastActivityMillisMoreRecentThan(long thresholdMillis)
+        {
+            return sensorOrNull != null ? Math.max(lastActivityAt.get(), sensorOrNull
+                    .getLastActivityMillisMoreRecentThan(thresholdMillis)) : lastActivityAt.get();
+        }
+
+        public boolean hasActivityMoreRecentThan(long thresholdMillis)
+        {
+            return sensorOrNull != null ? sensorOrNull.hasActivityMoreRecentThan(thresholdMillis)
+                    || primHasActivityMoreRecentThan(thresholdMillis)
+                    : primHasActivityMoreRecentThan(thresholdMillis);
+        }
+
+        private boolean primHasActivityMoreRecentThan(long thresholdMillis)
+        {
+            return (System.currentTimeMillis() - lastActivityAt.get() < thresholdMillis);
+        }
+    }
+
     /**
      * Creates a {@link MonitoringProxy} of type <var>interfaceClass</var> for the
      * <var>objectToProxyFor</var>.
@@ -497,8 +653,17 @@ public class MonitoringProxy<T>
     }
 
     /**
-     * Sets an sensor of fine-grained activity. Activity on this sensor can prevent a method
-     * invocation from timing out.
+     * Sets a sensor for detecting activity during a monitored method call. Activity on this sensor
+     * can prevent the monitored method invocation from timing out.
+     * <p>
+     * <strong>Note:</strong> This sensor is only meant to be used for activity that has to be
+     * detected in the program's execution environment, e.g. recent disk or network activity. If the
+     * activity can be sensed in the monitored method call itself, use the
+     * {@link IMonitorCommunicator} instead. Using the <var>sensor</var> set with this method to
+     * sense activity in the method invocation itself when the invocation may be retried is
+     * inherently unsafe!
+     * 
+     * @see IMonitorCommunicator
      */
     public MonitoringProxy<T> sensor(IActivitySensor sensor)
     {
@@ -519,12 +684,11 @@ public class MonitoringProxy<T>
      */
     public void cancelCurrentOperations()
     {
-        synchronized (currentThreads)
+        synchronized (currentOperations)
         {
-            for (Thread t : currentThreads)
+            for (MonitorCommunicator op : currentOperations)
             {
-                t.interrupt();
-
+                op.cancel(true);
             }
         }
     }
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java
index 7053cab8c5a..149098fb9b0 100644
--- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java
@@ -285,7 +285,7 @@ public class ConcurrencyUtilitiesTest
                     }
                 });
             final ExecutionResult<String> result =
-                    ConcurrencyUtilities.getResult(future, 50L, true, logSettings, sensor);
+                    ConcurrencyUtilities.getResult(future, 50L, true, logSettings, null, sensor);
             assertEquals(ExecutionStatus.COMPLETE, result.getStatus());
             assertEquals(msg, result.tryGetResult());
             assertNull(result.tryGetException());
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java
index d85e2a8d8ad..569b998d495 100644
--- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import org.testng.annotations.AfterClass;
@@ -37,6 +38,7 @@ import org.testng.annotations.Test;
 import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
 import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
 import ch.systemsx.cisd.common.TimingParameters;
+import ch.systemsx.cisd.common.concurrent.MonitoringProxy.IMonitorCommunicator;
 import ch.systemsx.cisd.common.logging.ConsoleLogger;
 import ch.systemsx.cisd.common.logging.ISimpleLogger;
 import ch.systemsx.cisd.common.test.Retry10;
@@ -94,6 +96,8 @@ public class MonitoringProxyTest
 
         void busyUpdatingActivity();
 
+        void busyUpdatingActivity(IMonitorCommunicator communicator);
+
         String getString(boolean hang);
 
         boolean getBoolean(boolean hang);
@@ -108,7 +112,14 @@ public class MonitoringProxyTest
 
         void worksOnSecondInvocation() throws RetryItException;
 
+        void resetInvocationsCancelled();
+
+        int getInvocationsCancelled();
+
+        void worksOnSecondInvocation(IMonitorCommunicator communicator) throws RetryItException;
+
         void worksOnThirdInvocation() throws RetryItException;
+
     }
 
     private final static Pattern THREAD_NAME_PATTERN =
@@ -158,6 +169,22 @@ public class MonitoringProxyTest
             }
         }
 
+        public void busyUpdatingActivity(IMonitorCommunicator communicator)
+        {
+            checkThreadName();
+            threadToStop = Thread.currentThread();
+            final long timeToHangMillis = (long) (TIMEOUT_MILLIS * 1.5);
+            final long start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < timeToHangMillis)
+            {
+                if (communicator.isCancelled())
+                {
+                    return;
+                }
+                communicator.update();
+            }
+        }
+
         public boolean getBoolean(boolean hang)
         {
             checkThreadName();
@@ -212,10 +239,43 @@ public class MonitoringProxyTest
 
         int invocationCount2 = 0;
 
+        AtomicInteger invocationsCancelled = new AtomicInteger();
+
+        public int getInvocationsCancelled()
+        {
+            return invocationsCancelled.get();
+        }
+
+        public void resetInvocationsCancelled()
+        {
+            invocationsCancelled.set(0);
+        }
+
+        public void worksOnSecondInvocation(IMonitorCommunicator communicator)
+                throws RetryItException
+        {
+            checkThreadName();
+            if (++invocationCount2 < 2)
+            {
+                try
+                {
+                    ConcurrencyUtilities.sleep(TIMEOUT_MILLIS * 3);
+                } finally
+                {
+                    if (communicator.isCancelled())
+                    {
+                        invocationsCancelled.incrementAndGet();
+                    }
+                }
+            }
+        }
+
+        int invocationCount3 = 0;
+
         public void worksOnThirdInvocation() throws RetryItException
         {
             checkThreadName();
-            if (++invocationCount2 < 3)
+            if (++invocationCount3 < 3)
             {
                 throw new RetryItException();
             }
@@ -224,7 +284,7 @@ public class MonitoringProxyTest
     }
 
     @BeforeClass
-    public void createMonitoringProxy() throws NoSuchMethodException
+    public void createMonitoringProxies() throws NoSuchMethodException
     {
         final ISimpleLogger logger = new ConsoleLogger();
         observerSensor = new RecordingActivityObserverSensor();
@@ -306,6 +366,12 @@ public class MonitoringProxyTest
         exceptionThrowingProxy.busyUpdatingActivity();
     }
 
+    @Test(groups = "slow")
+    public void testNoTimeoutDueToCommunicatorUpdate()
+    {
+        exceptionThrowingProxy.busyUpdatingActivity(null);
+    }
+
     @Test
     public void testGetStringNullReturningPolicy()
     {
@@ -436,6 +502,16 @@ public class MonitoringProxyTest
         retryingOnceExceptionThrowingProxy.worksOnSecondInvocation();
     }
 
+    @Test(groups = "slow")
+    public void testRetryOnceFailOnceWithCommunicator()
+    {
+        retryingOnceExceptionThrowingProxy.resetInvocationsCancelled();
+        retryingOnceExceptionThrowingProxy
+                .worksOnSecondInvocation(MonitoringProxy.MONITOR_COMMUNICATOR);
+        ConcurrencyUtilities.sleep(TIMEOUT_MILLIS);
+        assertEquals(1, retryingOnceExceptionThrowingProxy.getInvocationsCancelled());
+    }
+
     @Test(expectedExceptions = RetryItException.class, retryAnalyzer = Retry10.class)
     public void testRetryOnceFailTwice()
     {
-- 
GitLab