diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/InactivityMonitor.java b/common/source/java/ch/systemsx/cisd/common/concurrent/InactivityMonitor.java new file mode 100644 index 0000000000000000000000000000000000000000..cab2161405cf38add8996916452446249d61d300 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/InactivityMonitor.java @@ -0,0 +1,210 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; + +/** + * An <code>InactivityMonitor</code> monitors some form of activity of a write activity on a + * <var>destinationStore</var> and triggers an alarm if there was a period of inactivity that + * exceeds a given inactivity period. + * + * @author Bernd Rinn + */ +public class InactivityMonitor +{ + + private static final Logger operationLog = + LogFactory.getLogger(LogCategory.OPERATION, InactivityMonitor.class); + + /** + * The sensor to get activity information from. + */ + public interface IActivitySensor + { + /** + * Returns the time of last activity in milli-seconds since start of the epoch. + * <p> + * The monitor assumes that a call of this method may be expensive and thus only calls it + * when necessary. However, the information returned is expected to be up-to-date. + * + * @param thresholdMillis The time threshold for activity (in milli-seconds) that qualifies + * as "recent enough" to terminate the search for even more recent activity. An + * implementation can safely ignore this, as it is just meant for the sensor to + * optimize its search. + */ + long getTimeOfLastActivityMoreRecentThan(long thresholdMillis); + + /** + * Returns a string that describes the kind (and possibly reason) of recent inactivity. + * <p> + * Used for log messages. It can generally be assumed that this method is called after + * {@link #getTimeOfLastActivityMoreRecentThan(long)}. + * + * @param now The current time as it should be used in the description. + */ + String describeInactivity(long now); + } + + /** + * The observer that gets updated when the activity monitor has exceeded the inactivity + * threshold. + */ + public interface IInactivityObserver + { + /** + * Method which is called to inform the observer of a period of inactivity above a + * threshold. + * + * @param inactiveSinceMillis The period of inactivity. + * @param descriptionOfInactivity A description of inactivity, supposed to be used for + * logging. + */ + void update(long inactiveSinceMillis, String descriptionOfInactivity); + } + + private final IActivitySensor sensor; + + private final IInactivityObserver observer; + + private final Timer activityMonitoringTimer; + + private final long inactivityThresholdMillis; + + private final boolean stopAfterFirstEvent; + + /** + * Creates an inactivity monitor. + * + * @param sensor The sensor to get the activity information from. Note that this store needs to + * detect and signal time out conditions itself. <i>If an operation on this sensor + * hangs infinitely, then the InactivityMonitor hangs, too!</i> + * @param observer The observer to inform when the inactivity threshold has been exceeded. + * @param checkIntervallMillis The interval the monitor should use for checking activity status. + * @param inactivityThresholdMillis The threshold of a period of inactivity that needs to be + * exceeded before the inactivity observer gets informed. + * @param stopAfterFirstEvent If <code>true</code>, the monitor will stop itself after the + * first event of exceeded inactivity threshold has happened, otherwise, the monitor + * will continue to look for such events. + */ + public InactivityMonitor(IActivitySensor sensor, IInactivityObserver observer, + long checkIntervallMillis, long inactivityThresholdMillis, boolean stopAfterFirstEvent) + { + assert sensor != null; + assert observer != null; + assert checkIntervallMillis > 0; + assert inactivityThresholdMillis > 0; + + this.sensor = sensor; + this.observer = observer; + this.inactivityThresholdMillis = inactivityThresholdMillis; + this.stopAfterFirstEvent = stopAfterFirstEvent; + + final String currentThreadName = Thread.currentThread().getName(); + final String threadNamePrefix; + if ("main".equals(currentThreadName)) + { + threadNamePrefix = ""; + } else + { + threadNamePrefix = currentThreadName + " - "; + } + activityMonitoringTimer = new Timer(threadNamePrefix + "Activity Monitor", true); + final InactivityMonitoringTimerTask inactivityMonitoringTimerTask = + new InactivityMonitoringTimerTask(); + activityMonitoringTimer.schedule(inactivityMonitoringTimerTask, 0L, checkIntervallMillis); + } + + /** + * Stops the activity monitoring. The activity monitor must not be used after calling this + * method. + */ + public void stop() + { + activityMonitoringTimer.cancel(); + } + + /** + * A {@link TimerTask} that monitors inactivity by means of some {@link IActivitySensor}. + */ + private final class InactivityMonitoringTimerTask extends TimerTask + { + private long timeOfLastActivity = System.currentTimeMillis(); + + private long computePeriodOfInactivity(final long now) + { + return now - timeOfLastActivity; + } + + /** + * Potentially time consuming as the sensor might need some time to determine the time of + * last activity. + */ + private void updateTimeOfActivity() + { + timeOfLastActivity = + sensor.getTimeOfLastActivityMoreRecentThan(inactivityThresholdMillis); + } + + private boolean isInactivityThresholdExceeded(final long now) + { + return computePeriodOfInactivity(now) > inactivityThresholdMillis; + } + + @Override + public void run() + { + if (operationLog.isTraceEnabled()) + { + operationLog.trace("Start activity monitoring run."); + } + try + { + final long now = System.currentTimeMillis(); + if (isInactivityThresholdExceeded(now) == false) + { + return; + } + updateTimeOfActivity(); + if (isInactivityThresholdExceeded(now)) + { + observer.update(computePeriodOfInactivity(now), sensor.describeInactivity(now)); + if (stopAfterFirstEvent) + { + stop(); + } + } + } catch (Exception ex) + { + operationLog.error("Exception when monitoring for activity.", ex); + } finally + { + if (operationLog.isTraceEnabled()) + { + operationLog.trace("Finished activity monitoring run."); + } + } + } + } + +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/InactivityMonitorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/InactivityMonitorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a6e7449633aec3f36a12098559d5748c8ea3bf90 --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/InactivityMonitorTest.java @@ -0,0 +1,221 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.concurrent; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.jmock.api.Invocation; +import org.jmock.lib.action.CustomAction; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IActivitySensor; +import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IInactivityObserver; +import ch.systemsx.cisd.common.logging.LogInitializer; +import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler; + +/** + * Test cases for the inactivity monitor. + * + * @author Bernd Rinn + */ +public class InactivityMonitorTest +{ + private final static long CHECK_INTERVAL_MILLIS = 10L; + + private final static long INACTIVITY_THRESHOLD_MILLIS = 20L; + + private static final long TIME_TO_WAIT_MILLIS = 4 * INACTIVITY_THRESHOLD_MILLIS; + + private final static long DELTA = 10L; + + private Mockery context; + + private IActivitySensor sensor; + + private IInactivityObserver observer; + + private InactivityMonitor monitorUnderTest; + + private StoringUncaughtExceptionHandler exceptionHandler; + + private final class NowMatcher extends BaseMatcher<Long> + { + final long delta; + + NowMatcher() + { + this.delta = DELTA; + } + + public boolean matches(Object item) + { + final long actual = (Long) item; + return System.currentTimeMillis() - actual < delta; + } + + public void describeTo(Description description) + { + description.appendValue("now"); + } + } + + private final class CloseEnoughMatcher extends BaseMatcher<Long> + { + final long value; + + final long delta; + + CloseEnoughMatcher(long value) + { + this.value = value; + this.delta = DELTA; + } + + public boolean matches(Object item) + { + final long actual = (Long) item; + return Math.abs(value - actual) < delta; + } + + public void describeTo(Description description) + { + description.appendValue("Close enough to " + value); + } + } + + private final class ReturnNowMinus extends CustomAction + { + final long lagTimeMillis; + + ReturnNowMinus(long lagTimeMillis) + { + super("returns now - " + lagTimeMillis); + this.lagTimeMillis = lagTimeMillis; + } + + public Object invoke(Invocation invocation) throws Throwable + { + return (System.currentTimeMillis() - lagTimeMillis); + } + + } + + @BeforeClass + public void init() + { + LogInitializer.init(); + exceptionHandler = new StoringUncaughtExceptionHandler(); + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler); + } + + @BeforeMethod + public final void beforeMethod() + { + exceptionHandler.reset(); + context = new Mockery(); + sensor = context.mock(IActivitySensor.class); + observer = context.mock(IInactivityObserver.class); + } + + @AfterMethod + public final void afterMethod() throws Throwable + { + if (monitorUnderTest != null) + { + monitorUnderTest.stop(); + } + // To following lines of code should also be called at the end of each test method. + // Otherwise one do not known which test failed. + exceptionHandler.checkAndRethrowException(); + context.assertIsSatisfied(); + } + + @Test + public void testHappyCase() throws Throwable + { + context.checking(new Expectations() + { + { + atLeast(1).of(sensor).getTimeOfLastActivityMoreRecentThan( + INACTIVITY_THRESHOLD_MILLIS); + will(new ReturnNowMinus(0L)); + } + }); + monitorUnderTest = + new InactivityMonitor(sensor, observer, CHECK_INTERVAL_MILLIS, + INACTIVITY_THRESHOLD_MILLIS, true); + ConcurrencyUtilities.sleep(TIME_TO_WAIT_MILLIS); + monitorUnderTest.stop(); + exceptionHandler.checkAndRethrowException(); + context.assertIsSatisfied(); + } + + @Test + public void testInactivity() throws Throwable + { + final String descriptionOfInactivity = "DESCRIPTION"; + context.checking(new Expectations() + { + { + one(sensor).getTimeOfLastActivityMoreRecentThan(INACTIVITY_THRESHOLD_MILLIS); + will(new ReturnNowMinus(2 * INACTIVITY_THRESHOLD_MILLIS)); + one(sensor).describeInactivity(with(new NowMatcher())); + will(returnValue(descriptionOfInactivity)); + one(observer).update( + with(new CloseEnoughMatcher(INACTIVITY_THRESHOLD_MILLIS * 2)), + with(equal(descriptionOfInactivity))); + } + }); + monitorUnderTest = + new InactivityMonitor(sensor, observer, CHECK_INTERVAL_MILLIS, + INACTIVITY_THRESHOLD_MILLIS, true); + ConcurrencyUtilities.sleep(TIME_TO_WAIT_MILLIS); + monitorUnderTest.stop(); + exceptionHandler.checkAndRethrowException(); + context.assertIsSatisfied(); + } + + @Test + public void testInactivityMultipleTimes() throws Throwable + { + final String descriptionOfInactivity = "DESCRIPTION"; + context.checking(new Expectations() + { + { + atLeast(3).of(sensor).getTimeOfLastActivityMoreRecentThan(INACTIVITY_THRESHOLD_MILLIS); + will(new ReturnNowMinus(2 * INACTIVITY_THRESHOLD_MILLIS)); + atLeast(3).of(sensor).describeInactivity(with(new NowMatcher())); + will(returnValue(descriptionOfInactivity)); + atLeast(3).of(observer).update( + with(new CloseEnoughMatcher(INACTIVITY_THRESHOLD_MILLIS * 2)), + with(equal(descriptionOfInactivity))); + } + }); + monitorUnderTest = + new InactivityMonitor(sensor, observer, CHECK_INTERVAL_MILLIS, + INACTIVITY_THRESHOLD_MILLIS, false); + ConcurrencyUtilities.sleep(TIME_TO_WAIT_MILLIS); + monitorUnderTest.stop(); + exceptionHandler.checkAndRethrowException(); + context.assertIsSatisfied(); + } +}