diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java index abfac9d301773f0f477103e067e49b6101dcebc7..da17857d9df94540df612511bfa29cc8a9f1e6fa 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java @@ -96,7 +96,7 @@ public class MonitoringProxy<T> { private final static int NUMBER_OF_CORE_THREADS = 10; - private final static ExecutorService executor = + private final static ExecutorService defaultExecutorService = new NamingThreadPoolExecutor("Monitoring Proxy").corePoolSize(NUMBER_OF_CORE_THREADS) .daemonize(); @@ -125,6 +125,8 @@ public class MonitoringProxy<T> private IMonitoringProxyLogger invocationLoggerOrNull; private IActivitySensor sensorOrNull; + + private ExecutorService executorService = defaultExecutorService; private Set<MonitorCommunicator> currentOperations = Collections.synchronizedSet(new HashSet<MonitorCommunicator>()); @@ -140,7 +142,6 @@ public class MonitoringProxy<T> @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable - { try { @@ -202,7 +203,6 @@ public class MonitoringProxy<T> private class MonitoringInvocationHandler implements InvocationHandler { - @Override public Object invoke(final Object myProxy, final Method method, final Object[] args) throws Throwable @@ -287,7 +287,7 @@ public class MonitoringProxy<T> args[args.length - 1] = communicator; } - final Future<Object> future = executor.submit(new NamedCallable<Object>() + final Future<Object> future = executorService.submit(new NamedCallable<Object>() { @Override public Object call() throws Exception @@ -610,6 +610,17 @@ public class MonitoringProxy<T> this.loggerOrNull = newLogger; return this; } + + /** + * Set the <var>newExecutorService</var> to use calls to this monitoring proxy. + * <p> + * If not set, a default executor service will be used. + */ + public MonitoringProxy<T> executorService(ExecutorService newExecutorService) + { + this.executorService = newExecutorService; + return this; + } /** * Sets the logger to be used for all invocations of methods of this proxy. diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java index 6c088b2c2b24ab3cbac135bb3fa184414a755ee2..931b3987886c536bb2a76cbac1435160f2767ddd 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -37,6 +38,7 @@ import org.testng.annotations.Test; import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; +import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor; import ch.systemsx.cisd.base.tests.Retry50; import ch.systemsx.cisd.common.TimingParameters; import ch.systemsx.cisd.common.concurrent.MonitoringProxy.IMonitorCommunicator; @@ -75,6 +77,8 @@ public class MonitoringProxyTest private ITest retryingOnceExceptionThrowingProxy; private ITest retryingTwiceExceptionThrowingProxy; + + private ITest nonDefaultExecutorServiceProxy; private static class SignalException extends RuntimeException { @@ -100,6 +104,8 @@ public class MonitoringProxyTest void busyUpdatingActivity(IMonitorCommunicator communicator); String getString(boolean hang); + + String getThreadName(); boolean getBoolean(boolean hang); @@ -126,6 +132,9 @@ public class MonitoringProxyTest private final static Pattern THREAD_NAME_PATTERN = Pattern .compile("Monitoring Proxy-T[0-9]+::main::" + THREAD_NAME); + private final static Pattern MY_SPECIAL_THREAD_NAME_PATTERN = Pattern + .compile("My Special Monitoring Proxy-T[0-9]+::main::" + THREAD_NAME); + private class TestImpl implements ITest { private final IActivityObserver observer; @@ -296,6 +305,12 @@ public class MonitoringProxyTest } } + @Override + public String getThreadName() + { + return Thread.currentThread().getName(); + } + } @BeforeClass @@ -314,6 +329,23 @@ public class MonitoringProxyTest ITest.class.getMethod("getSpecialStatus", new Class<?>[] { Boolean.TYPE }), Status.SPECIAL_UUUPS).sensor(observerSensor) .errorLog(logger).get(); + final ExecutorService executorService = + new NamingThreadPoolExecutor("My Special Monitoring Proxy").corePoolSize(1) + .daemonize(); + + nonDefaultExecutorServiceProxy = + MonitoringProxy + .create(ITest.class, new TestImpl(observerSensor)) + .timing(TimingParameters.createNoRetries(TIMEOUT_MILLIS)) + .errorValueOnTimeout() + .name(THREAD_NAME) + .errorTypeValueMapping(Status.class, Status.UUUPS) + .errorMethodValueMapping( + ITest.class.getMethod("getSpecialStatus", new Class<?>[] + { Boolean.TYPE }), Status.SPECIAL_UUUPS).sensor(observerSensor) + .errorLog(logger) + .executorService(executorService) + .get(); exceptionThrowingProxy = MonitoringProxy.create(ITest.class, new TestImpl(observerSensor)) .timing(TimingParameters.createNoRetries(TIMEOUT_MILLIS)).name(THREAD_NAME) @@ -362,6 +394,13 @@ public class MonitoringProxyTest defaultReturningProxy.idle(true); } + @Test + public void testNonDefaultExecutorService() + { + final String threadName = nonDefaultExecutorServiceProxy.getThreadName(); + assertTrue(threadName, MY_SPECIAL_THREAD_NAME_PATTERN.matcher(threadName).matches()); + } + @Test(expectedExceptions = SignalException.class, retryAnalyzer = Retry50.class) public void testThrowExceptionNullReturningPolicy() {