diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java index 2716d13f62392fc0823caf4b8d0db8e8f9288028..27c514926a3f663cd7da5f5c6f070e9b99a843a1 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java @@ -21,6 +21,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -120,6 +121,8 @@ public class MonitoringProxy<T> private IActivitySensor sensorOrNull; + private Set<Thread> currentThreads = Collections.synchronizedSet(new HashSet<Thread>()); + private static class DelegatingInvocationHandler<T> implements InvocationHandler { private final T objectToProxyFor; @@ -265,62 +268,69 @@ public class MonitoringProxy<T> final Object[] args) { final String callingThreadName = Thread.currentThread().getName(); - final Future<Object> future = executor.submit(new NamedCallable<Object>() - { - public Object call() throws Exception + currentThreads.add(Thread.currentThread()); + try + { + final Future<Object> future = executor.submit(new NamedCallable<Object>() { - try - { - return delegate.invoke(myProxy, method, args); - } catch (Throwable th) + public Object call() throws Exception { - if (th instanceof Error) + try { - throw (Error) th; - } else + return delegate.invoke(myProxy, method, args); + } catch (Throwable th) { - throw (Exception) th; + if (th instanceof Error) + { + throw (Error) th; + } else + { + throw (Exception) th; + } } } - } - public String getCallableName() - { - if (nameOrNull != null) - { - return callingThreadName + "::" + nameOrNull; - } else - { - return callingThreadName + "::" + describe(method); - } - } - }); - final ILogSettings logSettingsOrNull = - (loggerOrNull == null) ? null : new ILogSettings() + public String getCallableName() { - public LogLevel getLogLevelForError() + if (nameOrNull != null) { - return LogLevel.ERROR; - } - - public ISimpleLogger getLogger() + return callingThreadName + "::" + nameOrNull; + } else { - return loggerOrNull; + return callingThreadName + "::" + describe(method); } - - public String getOperationName() + } + }); + final ILogSettings logSettingsOrNull = + (loggerOrNull == null) ? null : new ILogSettings() { - if (nameOrNull != null) + public LogLevel getLogLevelForError() { - return describe(method) + "[" + nameOrNull + "]"; - } else + return LogLevel.ERROR; + } + + public ISimpleLogger getLogger() { - return describe(method); + return loggerOrNull; } - } - }; - return ConcurrencyUtilities.getResult(future, timingParameters.getTimeoutMillis(), - true, logSettingsOrNull, sensorOrNull); + + public String getOperationName() + { + if (nameOrNull != null) + { + return describe(method) + "[" + nameOrNull + "]"; + } else + { + return describe(method); + } + } + }; + return ConcurrencyUtilities.getResult(future, timingParameters.getTimeoutMillis(), + true, logSettingsOrNull, sensorOrNull); + } finally + { + currentThreads.remove(Thread.currentThread()); + } } private Object getErrorValue(final Method method) @@ -504,4 +514,19 @@ public class MonitoringProxy<T> return proxy; } + /** + * Cancel all currently running operations. + */ + public void cancelCurrentOperations() + { + synchronized (currentThreads) + { + for (Thread t : currentThreads) + { + t.interrupt(); + + } + } + } + }