From f11b6f604f6beffc085d9c077144bc89b6c7660f Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Mon, 9 Jun 2008 06:26:19 +0000
Subject: [PATCH] add: new class MonitoringProxy and unit tests

SVN: 6500
---
 .../common/concurrent/MonitoringProxy.java    | 333 ++++++++++++++++++
 .../concurrent/MonitoringProxyTest.java       | 312 ++++++++++++++++
 2 files changed, 645 insertions(+)
 create mode 100644 common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java
 create mode 100644 common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java

diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java
new file mode 100644
index 00000000000..1492fbbfd74
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/MonitoringProxy.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2008 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.concurrent;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import ch.systemsx.cisd.common.exceptions.StopException;
+import ch.systemsx.cisd.common.exceptions.TimeoutException;
+
+/**
+ * A class that can provide a dynamic {@link Proxy} for an interface that delegates the method
+ * invocations to an implementation of this interface, monitoring for calls to
+ * {@link Thread#interrupt()} and for timeouts. (Note that by default no timeout is set.)
+ * <p>
+ * On calls to {@link Thread#interrupt()} the proxy will throw a {@link StopException}, on timeouts
+ * a {@link TimeoutException}.
+ * <p>
+ * The proxy can be configured by chaining. If all options have been set, the actual proxy can be
+ * obtained by calling {@link #get()}. In order to e.g. set the timeout to 10s, the following call
+ * chain can be used:
+ * 
+ * <pre>
+ * If proxy = MonitoringProxy.create(If.class, someInstance).timeoutMillis(10000L).get();
+ * </pre>
+ * 
+ * Instead of throwing a exception, the proxy can also be configured to provide special error values
+ * on error conditions. This configuration is done by {@link #errorValueOnInterrupt()} for thread
+ * interrupts and {@link #errorValueOnTimeout()} for timeouts.
+ * <p>
+ * The error return values can be set, either for the type of the return value of a method, or by
+ * the method itself. If present, the specific method-value mapping will take precedence of the
+ * generic return-type-value mapping. In order to set a value "ERROR" for String return types, use a
+ * chaining call like:
+ * 
+ * <pre>
+ * If proxy =
+ *         MonitoringProxy.create(If.class, someInstance).errorValueOnInterrupt()
+ *                 .errorTypeValueMapping(String.class, &quot;ERROR&quot;).get();
+ * </pre>
+ * 
+ * @author Bernd Rinn
+ */
+public class MonitoringProxy<T>
+{
+
+    private final static ExecutorService executor =
+            new NamingThreadPoolExecutor("Monitoring Proxy").daemonize();
+
+    private final DelegatingInvocationHandler<T> delegate;
+
+    private final Map<Class, Object> errorTypeValueMap;
+
+    private final Map<Method, Object> errorMethodValueMap;
+
+    private final MonitoringInvocationHandler handler;
+
+    private final T proxy;
+
+    private long timeoutMillis;
+
+    private boolean errorValueOnTimeout;
+
+    private boolean errorValueOnInterrupt;
+
+    private String nameOrNull;
+
+    private static class DelegatingInvocationHandler<T> implements InvocationHandler
+    {
+
+        private final T objectToProxyFor;
+
+        private DelegatingInvocationHandler(T objectToProxyFor)
+        {
+            this.objectToProxyFor = objectToProxyFor;
+        }
+
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+        {
+            try
+            {
+                return method.invoke(objectToProxyFor, args);
+            } catch (InvocationTargetException ex)
+            {
+                throw ex.getTargetException();
+            }
+        }
+
+    }
+
+    private static String describe(Method method)
+    {
+        final StringBuilder builder = new StringBuilder(100);
+        builder.append("Call to method '");
+        builder.append(method.getDeclaringClass().getSimpleName());
+        builder.append('.');
+        builder.append(method.getName());
+        builder.append('(');
+        boolean addComma = false;
+        for (Class<?> clazz : method.getParameterTypes())
+        {
+            if (addComma)
+            {
+                builder.append(',');
+            }
+            builder.append(clazz.getSimpleName());
+            addComma = true;
+        }
+        builder.append(")'");
+        return builder.toString();
+    }
+
+    private static Map<Class, Object> createDefaultErrorTypeValueMap()
+    {
+        final Map<Class, Object> result = new HashMap<Class, Object>();
+        result.put(Void.TYPE, Void.TYPE.cast(null));
+        result.put(Boolean.TYPE, false);
+        result.put(Byte.TYPE, (byte) 0);
+        result.put(Short.TYPE, (short) 0);
+        result.put(Integer.TYPE, 0);
+        result.put(Long.TYPE, 0L);
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T cast(Class<T> interfaceClass, Object objectToProxyFor)
+    {
+        return (T) objectToProxyFor;
+    }
+
+    private class MonitoringInvocationHandler implements InvocationHandler
+    {
+
+        public Object invoke(final Object myProxy, final Method method, final Object[] args)
+                throws Throwable
+        {
+            final Future<Object> future = executor.submit(new NamedCallable<Object>()
+                {
+                    public Object call() throws Exception
+                    {
+                        try
+                        {
+                            return delegate.invoke(myProxy, method, args);
+                        } catch (Throwable th)
+                        {
+                            if (th instanceof Error)
+                            {
+                                throw (Error) th;
+                            } else
+                            {
+                                throw (Exception) th;
+                            }
+                        }
+                    }
+
+                    public String getCallableName()
+                    {
+                        if (nameOrNull != null)
+                        {
+                            return nameOrNull;
+                        } else
+                        {
+                            return describe(method);
+                        }
+                    }
+                });
+            final ExecutionResult<Object> result =
+                    ConcurrencyUtilities.getResult(future, timeoutMillis);
+            if (result.getStatus() == ExecutionStatus.TIMED_OUT)
+            {
+                if (errorValueOnTimeout == false)
+                {
+                    throw new TimeoutException(describe(method) + " timed out (timeout="
+                            + timeoutMillis + "ms).");
+                }
+                return getErrorValue(method);
+            }
+            if (result.getStatus() == ExecutionStatus.INTERRUPTED && errorValueOnInterrupt)
+            {
+                return getErrorValue(method);
+            }
+            return ConcurrencyUtilities.tryDealWithResult(result);
+        }
+
+        private Object getErrorValue(final Method method)
+        {
+            if (errorMethodValueMap.containsKey(method))
+            {
+                return errorMethodValueMap.get(method);
+            }
+            if (errorTypeValueMap.containsKey(method.getReturnType()))
+            {
+                return errorTypeValueMap.get(method.getReturnType());
+            }
+            return null;
+        }
+    }
+
+    private MonitoringProxy(Class<T> interfaceClass, T objectToProxyFor)
+    {
+        assert interfaceClass.isInterface();
+
+        this.errorTypeValueMap = createDefaultErrorTypeValueMap();
+        this.errorMethodValueMap = new HashMap<Method, Object>();
+        this.timeoutMillis = ConcurrencyUtilities.NO_TIMEOUT;
+        this.delegate = new DelegatingInvocationHandler<T>(objectToProxyFor);
+        this.handler = new MonitoringInvocationHandler();
+        this.proxy = createProxy(interfaceClass, objectToProxyFor);
+    }
+
+    private T createProxy(Class<T> interfaceClass, T objectToProxyFor)
+    {
+        final Class<?>[] interfaceClasses = new Class<?>[]
+            { interfaceClass };
+        final Object proxyInstance =
+                Proxy.newProxyInstance(interfaceClass.getClassLoader(), interfaceClasses, handler);
+        return cast(interfaceClass, proxyInstance);
+
+    }
+
+    /**
+     * Creates a {@link MonitoringProxy} of type <var>interfaceClass</var> for the
+     * <var>objectToProxyFor</var>.
+     * 
+     * @param interfaceClass The type of the interface to proxy for. It is a programming error to
+     *            provide a class that does not represent an interface.
+     * @param objectToProxyFor The object to proxy for.
+     * @return A monitoring proxy that can be configured as required and that finally provides the
+     *         actual proxy with {@link #get()}.
+     */
+    public static <T> MonitoringProxy<T> create(Class<T> interfaceClass, T objectToProxyFor)
+    {
+        return new MonitoringProxy<T>(interfaceClass, objectToProxyFor);
+    }
+
+    /**
+     * Sets the timeout to <var>newTimeoutMillis</var>.
+     * 
+     * @return This object (for chaining).
+     */
+    public MonitoringProxy<T> timeoutMillis(long newTimeoutMillis)
+    {
+        this.timeoutMillis = newTimeoutMillis;
+        return this;
+    }
+
+    /**
+     * Sets the mode where a special error value is provided on timeout.
+     * 
+     * @return This object (for chaining).
+     */
+    public MonitoringProxy<T> errorValueOnTimeout()
+    {
+        this.errorValueOnTimeout = true;
+        return this;
+    }
+
+    /**
+     * Sets the mode where a special error value is provided on thread interruption.
+     * 
+     * @return This object (for chaining).
+     */
+    public MonitoringProxy<T> errorValueOnInterrupt()
+    {
+        this.errorValueOnInterrupt = true;
+        return this;
+    }
+
+    /**
+     * Sets the name of this proxy (for setting the thread name) to <var>newName</var>.
+     * 
+     * @return This object (for chaining).
+     */
+    public MonitoringProxy<T> name(String newName)
+    {
+        this.nameOrNull = newName;
+        return this;
+    }
+
+    /**
+     * Sets an error return <var>value</var> for the type <var>clazz</var>.
+     * <p>
+     * <i>A value set by this method is only relevant if the proxy is configured to return error
+     * values rather than to throw exceptions.</i>
+     */
+    public <V> MonitoringProxy<T> errorTypeValueMapping(Class<V> clazz, V value)
+    {
+        errorTypeValueMap.put(clazz, value);
+        return this;
+    }
+
+    /**
+     * Sets an error return <var>value</var> for the given <var>method</var>. This <var>value</var>
+     * takes precedence over the error type mapping for methods with the same return type.
+     * <p>
+     * <i>A value set by this method is only relevant if the proxy is configured to return error
+     * values rather than to throw exceptions.</i>
+     */
+    public MonitoringProxy<T> errorMethodValueMapping(Method method, Object value)
+    {
+        errorMethodValueMap.put(method, value);
+        return this;
+    }
+
+    /**
+     * Returns the actual proxy.
+     */
+    public T get()
+    {
+        return proxy;
+    }
+
+}
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java
new file mode 100644
index 00000000000..b73544be74e
--- /dev/null
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/MonitoringProxyTest.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2008 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.concurrent;
+
+import static org.testng.AssertJUnit.*;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.regex.Pattern;
+
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import ch.systemsx.cisd.common.exceptions.StopException;
+import ch.systemsx.cisd.common.exceptions.TimeoutException;
+
+/**
+ * Test cases for the {@link MonitoringProxy}.
+ * 
+ * @author Bernd Rinn
+ */
+public class MonitoringProxyTest
+{
+
+    private static final String THREAD_NAME = "Some Flaky Stuff";
+
+    private static final String THE_STRING = "some string";
+
+    private static final boolean THE_BOOLEAN = true;
+
+    private static final int THE_INTEGER = 17;
+
+    private static final Status THE_STATUS = Status.TWO;
+
+    private static final long TIMEOUT_MILLIS = 50L;
+
+    private ITest defaultReturningProxy;
+
+    private ITest exceptionThrowingProxy;
+
+    private static class SignalException extends RuntimeException
+    {
+        private static final long serialVersionUID = 1L;
+    }
+
+    enum Status
+    {
+        ONE, TWO, THREE, UUUPS, SPECIAL_UUUPS
+    }
+
+    interface ITest
+    {
+        void idle(boolean hang);
+
+        String getString(boolean hang);
+
+        boolean getBoolean(boolean hang);
+
+        int getInteger(boolean hang);
+
+        Status getStatus(boolean hang);
+
+        Status getSpecialStatus(boolean hang);
+
+        void throwSignalException() throws SignalException;
+    }
+
+    private static class TestImpl implements ITest
+    {
+        private final static Pattern THREAD_NAME_PATTERN =
+                Pattern.compile("Monitoring Proxy-T[0-9]+::" + THREAD_NAME);
+
+        private void hang(boolean hang)
+        {
+            if (hang)
+            {
+                while (true)
+                {
+                }
+            }
+        }
+
+        private void checkThreadName()
+        {
+            final String name = Thread.currentThread().getName();
+            assertTrue(name, THREAD_NAME_PATTERN.matcher(name).matches());
+        }
+
+        public void idle(boolean hang)
+        {
+            checkThreadName();
+            hang(hang);
+        }
+
+        public boolean getBoolean(boolean hang)
+        {
+            checkThreadName();
+            hang(hang);
+            return THE_BOOLEAN;
+        }
+
+        public int getInteger(boolean hang)
+        {
+            checkThreadName();
+            hang(hang);
+            return THE_INTEGER;
+        }
+
+        public String getString(boolean hang)
+        {
+            checkThreadName();
+            hang(hang);
+            return THE_STRING;
+        }
+
+        public Status getStatus(boolean hang)
+        {
+            checkThreadName();
+            hang(hang);
+            return THE_STATUS;
+        }
+
+        public Status getSpecialStatus(boolean hang)
+        {
+            checkThreadName();
+            hang(hang);
+            return THE_STATUS;
+        }
+
+        public void throwSignalException() throws SignalException
+        {
+            checkThreadName();
+            throw new SignalException();
+        }
+    }
+
+    @BeforeTest
+    public void testCreateMonitoringProxy() throws NoSuchMethodException
+    {
+        defaultReturningProxy =
+                MonitoringProxy.create(ITest.class, new TestImpl()).timeoutMillis(TIMEOUT_MILLIS)
+                        .errorValueOnTimeout().name(THREAD_NAME).errorTypeValueMapping(
+                                Status.class, Status.UUUPS).errorMethodValueMapping(
+                                ITest.class.getMethod("getSpecialStatus", new Class<?>[]
+                                    { Boolean.TYPE }), Status.SPECIAL_UUUPS).get();
+        exceptionThrowingProxy =
+                MonitoringProxy.create(ITest.class, new TestImpl()).timeoutMillis(TIMEOUT_MILLIS)
+                        .name(THREAD_NAME).get();
+    }
+
+    @Test
+    public void testVoid()
+    {
+        defaultReturningProxy.idle(false);
+    }
+
+    @Test
+    public void testVoidTimeoutNoException()
+    {
+        defaultReturningProxy.idle(true);
+    }
+
+    @Test(expectedExceptions = SignalException.class)
+    public void testThrowExceptionNullReturningPolicy()
+    {
+        defaultReturningProxy.throwSignalException();
+    }
+
+    @Test(expectedExceptions = SignalException.class)
+    public void testThrowExceptionExceptionThrowsPolicy()
+    {
+        exceptionThrowingProxy.throwSignalException();
+    }
+
+    @Test(expectedExceptions = TimeoutException.class)
+    public void testVoidTimeoutWithException()
+    {
+        exceptionThrowingProxy.idle(true);
+    }
+
+    @Test
+    public void testGetStringNullReturningPolicy()
+    {
+        assertEquals(THE_STRING, defaultReturningProxy.getString(false));
+    }
+
+    @Test
+    public void testGetStringExceptionThrowingPolicy()
+    {
+        assertEquals(THE_STRING, exceptionThrowingProxy.getString(false));
+    }
+
+    @Test
+    public void testGetStringTimeoutNoException()
+    {
+        assertNull(defaultReturningProxy.getString(true));
+    }
+
+    @Test(expectedExceptions = TimeoutException.class)
+    public void testGetStringTimeoutWithException()
+    {
+        exceptionThrowingProxy.getString(true);
+    }
+
+    @Test
+    public void testGetIntNullReturningPolicy()
+    {
+        assertEquals(THE_INTEGER, defaultReturningProxy.getInteger(false));
+    }
+
+    @Test
+    public void testGetIntExceptionThrowingPolicy()
+    {
+        assertEquals(THE_INTEGER, exceptionThrowingProxy.getInteger(false));
+    }
+
+    @Test
+    public void testGetBoolTimeoutReturnsDefault()
+    {
+        assertEquals(false, defaultReturningProxy.getBoolean(true));
+    }
+
+    @Test
+    public void testGetStatus()
+    {
+        assertEquals(THE_STATUS, defaultReturningProxy.getStatus(false));
+    }
+
+    @Test
+    public void testGetStatusTimeoutReturnsDefault()
+    {
+        assertEquals(Status.UUUPS, defaultReturningProxy.getStatus(true));
+    }
+
+    @Test
+    public void testGetSpecialStatusTimeoutReturnsMethodDefault()
+    {
+        assertEquals(Status.SPECIAL_UUUPS, defaultReturningProxy.getSpecialStatus(true));
+    }
+
+    @Test
+    public void testGetIntTimeoutReturnsDefault()
+    {
+        assertEquals(0, defaultReturningProxy.getInteger(true));
+    }
+
+    @Test(expectedExceptions = TimeoutException.class)
+    public void testGetIntTimeoutWithException()
+    {
+        exceptionThrowingProxy.getInteger(true);
+    }
+
+    @Test(expectedExceptions = StopException.class)
+    public void testInterruptTheUninterruptableThrowsException()
+    {
+        final ITest proxy =
+                MonitoringProxy.create(ITest.class, new TestImpl()).timeoutMillis(1000L).name(
+                        THREAD_NAME).get();
+        final Thread currentThread = Thread.currentThread();
+        final Timer timer = new Timer();
+        timer.schedule(new TimerTask()
+            {
+                @Override
+                public void run()
+                {
+                    currentThread.interrupt();
+                }
+            }, 50L);
+        // This call would not be interruptable if it wasn't proxied, but we get a StopException
+        // from the proxy.
+        proxy.idle(true);
+        timer.cancel();
+    }
+
+    @Test
+    public void testInterruptTheUninterruptableReturnsDefaultValue()
+    {
+        final String defaultReturnValue = "That's the default return value.";
+        final ITest proxy =
+                MonitoringProxy.create(ITest.class, new TestImpl()).timeoutMillis(1000L).name(
+                        THREAD_NAME).errorValueOnInterrupt().errorTypeValueMapping(String.class,
+                        defaultReturnValue).get();
+        final Thread currentThread = Thread.currentThread();
+        final Timer timer = new Timer();
+        timer.schedule(new TimerTask()
+            {
+                @Override
+                public void run()
+                {
+                    currentThread.interrupt();
+                }
+            }, 50L);
+        // This call would not be interruptable if it wasn't proxied, but we get the default return
+        // value for Strings here.
+        assertEquals(defaultReturnValue, proxy.getString(true));
+        timer.cancel();
+    }
+}
-- 
GitLab