Skip to content
Snippets Groups Projects
Commit 9b8d4186 authored by brinn's avatar brinn
Browse files

Add support for calling methods asynchronously to MonitoringProxy.

SVN: 27123
parent c215f62b
No related branches found
No related tags found
No related merge requests found
......@@ -26,11 +26,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
import ch.systemsx.cisd.base.namedthread.NamedCallable;
......@@ -106,6 +108,12 @@ public class MonitoringProxy<T>
private final Map<Method, Object> errorMethodValueMap;
private final Map<Class<?>, Object> asyncOkTypeValueMap;
private final Map<Method, Object> asyncOkMethodValueMap;
private final Set<Method> asyncMethodSet;
private final Set<Class<? extends Exception>> exceptionClassesSuitableForRetrying;
private final MonitoringInvocationHandler handler;
......@@ -122,10 +130,12 @@ public class MonitoringProxy<T>
private ISimpleLogger loggerOrNull;
private LogLevel logLevelForSuccessfulCalls;
private IMonitoringProxyLogger invocationLoggerOrNull;
private IActivitySensor sensorOrNull;
private ExecutorService executorService = defaultExecutorService;
private Set<MonitorCommunicator> currentOperations =
......@@ -192,6 +202,22 @@ public class MonitoringProxy<T>
result.put(Short.TYPE, (short) 0);
result.put(Integer.TYPE, 0);
result.put(Long.TYPE, 0L);
result.put(Float.TYPE, 0f);
result.put(Double.TYPE, 0.0);
return result;
}
private static Map<Class<?>, Object> createDefaultOKTypeValueMap()
{
final Map<Class<?>, Object> result = new HashMap<Class<?>, Object>();
result.put(Void.TYPE, Void.TYPE.cast(null));
result.put(Boolean.TYPE, true);
result.put(Byte.TYPE, (byte) 1);
result.put(Short.TYPE, (short) 1);
result.put(Integer.TYPE, 1);
result.put(Long.TYPE, 1L);
result.put(Float.TYPE, 1f);
result.put(Double.TYPE, 1.0);
return result;
}
......@@ -227,42 +253,72 @@ public class MonitoringProxy<T>
private ExecutionResult<Object> retryingExecuteInThread(final Object myProxy,
final Method method, final Object[] args)
{
int counter = 0;
ExecutionResult<Object> result = null;
boolean willRetry;
do
final Callable<ExecutionResult<Object>> callable =
new Callable<ExecutionResult<Object>>()
{
@Override
public ExecutionResult<Object> call() throws Exception
{
int counter = 0;
ExecutionResult<Object> result = null;
boolean willRetry;
do
{
result = executeInThread(myProxy, method, args);
if (result.getStatus() == ExecutionStatus.COMPLETE
|| result.getStatus() == ExecutionStatus.INTERRUPTED
|| exceptionStatusUnsuitableForRetry(result))
{
willRetry = false;
} else
{
willRetry =
(counter++ < timingParameters
.getMaxRetriesOnFailure());
}
if (invocationLoggerOrNull != null)
{
invocationLoggerOrNull.log(method, result, willRetry);
}
if (willRetry
&& timingParameters
.getIntervalToWaitAfterFailureMillis() > 0)
{
try
{
Thread.sleep(timingParameters
.getIntervalToWaitAfterFailureMillis());
} catch (InterruptedException ex)
{
result = ExecutionResult.createInterrupted();
if (invocationLoggerOrNull != null)
{
invocationLoggerOrNull.log(method, result, false);
}
return result;
}
}
} while (willRetry);
return result;
}
};
try
{
result = executeInThread(myProxy, method, args);
if (result.getStatus() == ExecutionStatus.COMPLETE
|| result.getStatus() == ExecutionStatus.INTERRUPTED
|| exceptionStatusUnsuitableForRetry(result))
if (asyncMethodSet.contains(method))
{
willRetry = false;
final Object returnValue =
asyncOkMethodValueMap.containsKey(method) ? asyncOkMethodValueMap
.get(method) : asyncOkTypeValueMap.get(method.getReturnType());
executorService.submit(callable);
return ExecutionResult.create(returnValue);
} else
{
willRetry = (counter++ < timingParameters.getMaxRetriesOnFailure());
}
if (invocationLoggerOrNull != null)
{
invocationLoggerOrNull.log(method, result, willRetry);
}
if (willRetry && timingParameters.getIntervalToWaitAfterFailureMillis() > 0)
{
try
{
Thread.sleep(timingParameters.getIntervalToWaitAfterFailureMillis());
} catch (InterruptedException ex)
{
result = ExecutionResult.createInterrupted();
if (invocationLoggerOrNull != null)
{
invocationLoggerOrNull.log(method, result, false);
}
return result;
}
return callable.call();
}
} while (willRetry);
return result;
} catch (Exception ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
}
private boolean exceptionStatusUnsuitableForRetry(ExecutionResult<Object> result)
......@@ -349,6 +405,12 @@ public class MonitoringProxy<T>
return describe(method);
}
}
@Override
public LogLevel getLogLevelForSuccess()
{
return logLevelForSuccessfulCalls;
}
};
final ExecutionResult<Object> result =
ConcurrencyUtilities.getResult(future, timingParameters.getTimeoutMillis(),
......@@ -387,7 +449,11 @@ public class MonitoringProxy<T>
assert interfaceClass.isInterface();
this.errorTypeValueMap = createDefaultErrorTypeValueMap();
this.asyncOkTypeValueMap = createDefaultOKTypeValueMap();
this.errorMethodValueMap = new HashMap<Method, Object>();
this.asyncOkMethodValueMap = new HashMap<Method, Object>();
this.asyncMethodSet = new HashSet<Method>();
this.logLevelForSuccessfulCalls = LogLevel.OFF;
this.exceptionClassesSuitableForRetrying = new HashSet<Class<? extends Exception>>();
this.timingParameters = TimingParameters.getNoTimeoutNoRetriesParameters();
this.delegate = new DelegatingInvocationHandler<T>(objectToProxyFor);
......@@ -610,7 +676,7 @@ public class MonitoringProxy<T>
this.loggerOrNull = newLogger;
return this;
}
/**
* Set the <var>newExecutorService</var> to use calls to this monitoring proxy.
* <p>
......@@ -658,6 +724,45 @@ public class MonitoringProxy<T>
return this;
}
/**
* The monitoring proxy can call some methods in a simple asynchronous way. This method is to
* state that the given <var>method</var> should be called asynchronously.
* <p>
* <i>Note that asynchronously called methods cannot be monitored for success but will return a
* value signing "OK" to the caller.<i>
*/
public MonitoringProxy<T> callAsynchronously(Method method)
{
asyncMethodSet.add(method);
return this;
}
/**
* Sets a return <var>value</var> that signals "OK" for the type <var>clazz</var>.
* <p>
* <i>A value set by this method is only relevant if the proxy is configured to run a method
* asynchronously.</i>
*/
public <V> MonitoringProxy<T> asyncOkTypeValueMapping(Class<V> clazz, V value)
{
asyncOkTypeValueMap.put(clazz, value);
return this;
}
/**
* Sets a return <var>value</var> that signals "OK" for the given <var>method</var>. This
* <var>value</var> takes precedence over the ok type mapping for methods with the same return
* type.
* <p>
* <i>A value set by this method is only relevant if the proxy is configured to run a method
* asynchronously.</i>
*/
public MonitoringProxy<T> asyncOkMethodValueMapping(Method method, Object value)
{
asyncOkMethodValueMap.put(method, value);
return this;
}
/**
* Add an {@link Exception} class that is suitable for retrying the operation.
*/
......@@ -677,6 +782,17 @@ public class MonitoringProxy<T>
return this;
}
/**
* Sets a log level for successful calls to the proxy.
* <p>
* Default: {@link LogLevel#OFF}.
*/
public MonitoringProxy<T> logLevelForSuccessfulCalls(LogLevel newLogLevelForSuccessfulCalls)
{
this.logLevelForSuccessfulCalls = newLogLevelForSuccessfulCalls;
return this;
}
/**
* Sets a sensor for detecting activity during a monitored method call. Activity on this sensor
* can prevent the monitored method invocation from timing out.
......
......@@ -20,6 +20,7 @@ import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
......@@ -44,6 +45,11 @@ public class AssertingLogger implements ISimpleLogger
records.add(new LogRecord(level, message, throwableOrNull));
}
public void reset()
{
records.clear();
}
public void assertNumberOfMessage(final int expectedNumberOfMessages)
{
assertEquals(expectedNumberOfMessages, records.size());
......@@ -79,6 +85,19 @@ public class AssertingLogger implements ISimpleLogger
assertTrue(assertError, message.matches(throwableMessagePattern));
}
public int getNumberOfRecords()
{
return records.size();
}
public void print(PrintStream out)
{
for (LogRecord record : records)
{
out.println(record);
}
}
private static class LogRecord
{
final LogLevel level;
......@@ -93,5 +112,19 @@ public class AssertingLogger implements ISimpleLogger
this.message = message;
this.throwableOrNull = throwableOrNull;
}
@Override
public String toString()
{
if (throwableOrNull != null)
{
return "LogRecord [level=" + level + ", message=" + message + ", throwable="
+ throwableOrNull + "]";
} else
{
return "LogRecord [level=" + level + ", message=" + message + "]";
}
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment