Skip to content
Snippets Groups Projects
Commit b59c5025 authored by brinn's avatar brinn
Browse files

Support using proxy-specific executor services.

SVN: 27121
parent 564bcf4c
No related branches found
No related tags found
No related merge requests found
...@@ -96,7 +96,7 @@ public class MonitoringProxy<T> ...@@ -96,7 +96,7 @@ public class MonitoringProxy<T>
{ {
private final static int NUMBER_OF_CORE_THREADS = 10; private final static int NUMBER_OF_CORE_THREADS = 10;
private final static ExecutorService executor = private final static ExecutorService defaultExecutorService =
new NamingThreadPoolExecutor("Monitoring Proxy").corePoolSize(NUMBER_OF_CORE_THREADS) new NamingThreadPoolExecutor("Monitoring Proxy").corePoolSize(NUMBER_OF_CORE_THREADS)
.daemonize(); .daemonize();
...@@ -125,6 +125,8 @@ public class MonitoringProxy<T> ...@@ -125,6 +125,8 @@ public class MonitoringProxy<T>
private IMonitoringProxyLogger invocationLoggerOrNull; private IMonitoringProxyLogger invocationLoggerOrNull;
private IActivitySensor sensorOrNull; private IActivitySensor sensorOrNull;
private ExecutorService executorService = defaultExecutorService;
private Set<MonitorCommunicator> currentOperations = private Set<MonitorCommunicator> currentOperations =
Collections.synchronizedSet(new HashSet<MonitorCommunicator>()); Collections.synchronizedSet(new HashSet<MonitorCommunicator>());
...@@ -140,7 +142,6 @@ public class MonitoringProxy<T> ...@@ -140,7 +142,6 @@ public class MonitoringProxy<T>
@Override @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{ {
try try
{ {
...@@ -202,7 +203,6 @@ public class MonitoringProxy<T> ...@@ -202,7 +203,6 @@ public class MonitoringProxy<T>
private class MonitoringInvocationHandler implements InvocationHandler private class MonitoringInvocationHandler implements InvocationHandler
{ {
@Override @Override
public Object invoke(final Object myProxy, final Method method, final Object[] args) public Object invoke(final Object myProxy, final Method method, final Object[] args)
throws Throwable throws Throwable
...@@ -287,7 +287,7 @@ public class MonitoringProxy<T> ...@@ -287,7 +287,7 @@ public class MonitoringProxy<T>
args[args.length - 1] = communicator; args[args.length - 1] = communicator;
} }
final Future<Object> future = executor.submit(new NamedCallable<Object>() final Future<Object> future = executorService.submit(new NamedCallable<Object>()
{ {
@Override @Override
public Object call() throws Exception public Object call() throws Exception
...@@ -610,6 +610,17 @@ public class MonitoringProxy<T> ...@@ -610,6 +610,17 @@ public class MonitoringProxy<T>
this.loggerOrNull = newLogger; this.loggerOrNull = newLogger;
return this; return this;
} }
/**
* Set the <var>newExecutorService</var> to use calls to this monitoring proxy.
* <p>
* If not set, a default executor service will be used.
*/
public MonitoringProxy<T> executorService(ExecutorService newExecutorService)
{
this.executorService = newExecutorService;
return this;
}
/** /**
* Sets the logger to be used for all invocations of methods of this proxy. * Sets the logger to be used for all invocations of methods of this proxy.
......
...@@ -26,6 +26,7 @@ import java.util.Arrays; ...@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
...@@ -37,6 +38,7 @@ import org.testng.annotations.Test; ...@@ -37,6 +38,7 @@ import org.testng.annotations.Test;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor;
import ch.systemsx.cisd.base.tests.Retry50; import ch.systemsx.cisd.base.tests.Retry50;
import ch.systemsx.cisd.common.TimingParameters; import ch.systemsx.cisd.common.TimingParameters;
import ch.systemsx.cisd.common.concurrent.MonitoringProxy.IMonitorCommunicator; import ch.systemsx.cisd.common.concurrent.MonitoringProxy.IMonitorCommunicator;
...@@ -75,6 +77,8 @@ public class MonitoringProxyTest ...@@ -75,6 +77,8 @@ public class MonitoringProxyTest
private ITest retryingOnceExceptionThrowingProxy; private ITest retryingOnceExceptionThrowingProxy;
private ITest retryingTwiceExceptionThrowingProxy; private ITest retryingTwiceExceptionThrowingProxy;
private ITest nonDefaultExecutorServiceProxy;
private static class SignalException extends RuntimeException private static class SignalException extends RuntimeException
{ {
...@@ -100,6 +104,8 @@ public class MonitoringProxyTest ...@@ -100,6 +104,8 @@ public class MonitoringProxyTest
void busyUpdatingActivity(IMonitorCommunicator communicator); void busyUpdatingActivity(IMonitorCommunicator communicator);
String getString(boolean hang); String getString(boolean hang);
String getThreadName();
boolean getBoolean(boolean hang); boolean getBoolean(boolean hang);
...@@ -126,6 +132,9 @@ public class MonitoringProxyTest ...@@ -126,6 +132,9 @@ public class MonitoringProxyTest
private final static Pattern THREAD_NAME_PATTERN = Pattern private final static Pattern THREAD_NAME_PATTERN = Pattern
.compile("Monitoring Proxy-T[0-9]+::main::" + THREAD_NAME); .compile("Monitoring Proxy-T[0-9]+::main::" + THREAD_NAME);
private final static Pattern MY_SPECIAL_THREAD_NAME_PATTERN = Pattern
.compile("My Special Monitoring Proxy-T[0-9]+::main::" + THREAD_NAME);
private class TestImpl implements ITest private class TestImpl implements ITest
{ {
private final IActivityObserver observer; private final IActivityObserver observer;
...@@ -296,6 +305,12 @@ public class MonitoringProxyTest ...@@ -296,6 +305,12 @@ public class MonitoringProxyTest
} }
} }
@Override
public String getThreadName()
{
return Thread.currentThread().getName();
}
} }
@BeforeClass @BeforeClass
...@@ -314,6 +329,23 @@ public class MonitoringProxyTest ...@@ -314,6 +329,23 @@ public class MonitoringProxyTest
ITest.class.getMethod("getSpecialStatus", new Class<?>[] ITest.class.getMethod("getSpecialStatus", new Class<?>[]
{ Boolean.TYPE }), Status.SPECIAL_UUUPS).sensor(observerSensor) { Boolean.TYPE }), Status.SPECIAL_UUUPS).sensor(observerSensor)
.errorLog(logger).get(); .errorLog(logger).get();
final ExecutorService executorService =
new NamingThreadPoolExecutor("My Special Monitoring Proxy").corePoolSize(1)
.daemonize();
nonDefaultExecutorServiceProxy =
MonitoringProxy
.create(ITest.class, new TestImpl(observerSensor))
.timing(TimingParameters.createNoRetries(TIMEOUT_MILLIS))
.errorValueOnTimeout()
.name(THREAD_NAME)
.errorTypeValueMapping(Status.class, Status.UUUPS)
.errorMethodValueMapping(
ITest.class.getMethod("getSpecialStatus", new Class<?>[]
{ Boolean.TYPE }), Status.SPECIAL_UUUPS).sensor(observerSensor)
.errorLog(logger)
.executorService(executorService)
.get();
exceptionThrowingProxy = exceptionThrowingProxy =
MonitoringProxy.create(ITest.class, new TestImpl(observerSensor)) MonitoringProxy.create(ITest.class, new TestImpl(observerSensor))
.timing(TimingParameters.createNoRetries(TIMEOUT_MILLIS)).name(THREAD_NAME) .timing(TimingParameters.createNoRetries(TIMEOUT_MILLIS)).name(THREAD_NAME)
...@@ -362,6 +394,13 @@ public class MonitoringProxyTest ...@@ -362,6 +394,13 @@ public class MonitoringProxyTest
defaultReturningProxy.idle(true); defaultReturningProxy.idle(true);
} }
@Test
public void testNonDefaultExecutorService()
{
final String threadName = nonDefaultExecutorServiceProxy.getThreadName();
assertTrue(threadName, MY_SPECIAL_THREAD_NAME_PATTERN.matcher(threadName).matches());
}
@Test(expectedExceptions = SignalException.class, retryAnalyzer = Retry50.class) @Test(expectedExceptions = SignalException.class, retryAnalyzer = Retry50.class)
public void testThrowExceptionNullReturningPolicy() public void testThrowExceptionNullReturningPolicy()
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment