Skip to content
Snippets Groups Projects
Commit 9c50540b authored by brinn's avatar brinn
Browse files

add: generic InactivityMonitor and unit tests

SVN: 7046
parent 1c487a5c
No related branches found
No related tags found
No related merge requests found
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.concurrent;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Logger;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
/**
* An <code>InactivityMonitor</code> monitors some form of activity of a write activity on a
* <var>destinationStore</var> and triggers an alarm if there was a period of inactivity that
* exceeds a given inactivity period.
*
* @author Bernd Rinn
*/
public class InactivityMonitor
{
private static final Logger operationLog =
LogFactory.getLogger(LogCategory.OPERATION, InactivityMonitor.class);
/**
* The sensor to get activity information from.
*/
public interface IActivitySensor
{
/**
* Returns the time of last activity in milli-seconds since start of the epoch.
* <p>
* The monitor assumes that a call of this method may be expensive and thus only calls it
* when necessary. However, the information returned is expected to be up-to-date.
*
* @param thresholdMillis The time threshold for activity (in milli-seconds) that qualifies
* as "recent enough" to terminate the search for even more recent activity. An
* implementation can safely ignore this, as it is just meant for the sensor to
* optimize its search.
*/
long getTimeOfLastActivityMoreRecentThan(long thresholdMillis);
/**
* Returns a string that describes the kind (and possibly reason) of recent inactivity.
* <p>
* Used for log messages. It can generally be assumed that this method is called after
* {@link #getTimeOfLastActivityMoreRecentThan(long)}.
*
* @param now The current time as it should be used in the description.
*/
String describeInactivity(long now);
}
/**
* The observer that gets updated when the activity monitor has exceeded the inactivity
* threshold.
*/
public interface IInactivityObserver
{
/**
* Method which is called to inform the observer of a period of inactivity above a
* threshold.
*
* @param inactiveSinceMillis The period of inactivity.
* @param descriptionOfInactivity A description of inactivity, supposed to be used for
* logging.
*/
void update(long inactiveSinceMillis, String descriptionOfInactivity);
}
private final IActivitySensor sensor;
private final IInactivityObserver observer;
private final Timer activityMonitoringTimer;
private final long inactivityThresholdMillis;
private final boolean stopAfterFirstEvent;
/**
* Creates an inactivity monitor.
*
* @param sensor The sensor to get the activity information from. Note that this store needs to
* detect and signal time out conditions itself. <i>If an operation on this sensor
* hangs infinitely, then the InactivityMonitor hangs, too!</i>
* @param observer The observer to inform when the inactivity threshold has been exceeded.
* @param checkIntervallMillis The interval the monitor should use for checking activity status.
* @param inactivityThresholdMillis The threshold of a period of inactivity that needs to be
* exceeded before the inactivity observer gets informed.
* @param stopAfterFirstEvent If <code>true</code>, the monitor will stop itself after the
* first event of exceeded inactivity threshold has happened, otherwise, the monitor
* will continue to look for such events.
*/
public InactivityMonitor(IActivitySensor sensor, IInactivityObserver observer,
long checkIntervallMillis, long inactivityThresholdMillis, boolean stopAfterFirstEvent)
{
assert sensor != null;
assert observer != null;
assert checkIntervallMillis > 0;
assert inactivityThresholdMillis > 0;
this.sensor = sensor;
this.observer = observer;
this.inactivityThresholdMillis = inactivityThresholdMillis;
this.stopAfterFirstEvent = stopAfterFirstEvent;
final String currentThreadName = Thread.currentThread().getName();
final String threadNamePrefix;
if ("main".equals(currentThreadName))
{
threadNamePrefix = "";
} else
{
threadNamePrefix = currentThreadName + " - ";
}
activityMonitoringTimer = new Timer(threadNamePrefix + "Activity Monitor", true);
final InactivityMonitoringTimerTask inactivityMonitoringTimerTask =
new InactivityMonitoringTimerTask();
activityMonitoringTimer.schedule(inactivityMonitoringTimerTask, 0L, checkIntervallMillis);
}
/**
* Stops the activity monitoring. The activity monitor must not be used after calling this
* method.
*/
public void stop()
{
activityMonitoringTimer.cancel();
}
/**
* A {@link TimerTask} that monitors inactivity by means of some {@link IActivitySensor}.
*/
private final class InactivityMonitoringTimerTask extends TimerTask
{
private long timeOfLastActivity = System.currentTimeMillis();
private long computePeriodOfInactivity(final long now)
{
return now - timeOfLastActivity;
}
/**
* Potentially time consuming as the sensor might need some time to determine the time of
* last activity.
*/
private void updateTimeOfActivity()
{
timeOfLastActivity =
sensor.getTimeOfLastActivityMoreRecentThan(inactivityThresholdMillis);
}
private boolean isInactivityThresholdExceeded(final long now)
{
return computePeriodOfInactivity(now) > inactivityThresholdMillis;
}
@Override
public void run()
{
if (operationLog.isTraceEnabled())
{
operationLog.trace("Start activity monitoring run.");
}
try
{
final long now = System.currentTimeMillis();
if (isInactivityThresholdExceeded(now) == false)
{
return;
}
updateTimeOfActivity();
if (isInactivityThresholdExceeded(now))
{
observer.update(computePeriodOfInactivity(now), sensor.describeInactivity(now));
if (stopAfterFirstEvent)
{
stop();
}
}
} catch (Exception ex)
{
operationLog.error("Exception when monitoring for activity.", ex);
} finally
{
if (operationLog.isTraceEnabled())
{
operationLog.trace("Finished activity monitoring run.");
}
}
}
}
}
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.concurrent;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.api.Invocation;
import org.jmock.lib.action.CustomAction;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IActivitySensor;
import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IInactivityObserver;
import ch.systemsx.cisd.common.logging.LogInitializer;
import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler;
/**
* Test cases for the inactivity monitor.
*
* @author Bernd Rinn
*/
public class InactivityMonitorTest
{
private final static long CHECK_INTERVAL_MILLIS = 10L;
private final static long INACTIVITY_THRESHOLD_MILLIS = 20L;
private static final long TIME_TO_WAIT_MILLIS = 4 * INACTIVITY_THRESHOLD_MILLIS;
private final static long DELTA = 10L;
private Mockery context;
private IActivitySensor sensor;
private IInactivityObserver observer;
private InactivityMonitor monitorUnderTest;
private StoringUncaughtExceptionHandler exceptionHandler;
private final class NowMatcher extends BaseMatcher<Long>
{
final long delta;
NowMatcher()
{
this.delta = DELTA;
}
public boolean matches(Object item)
{
final long actual = (Long) item;
return System.currentTimeMillis() - actual < delta;
}
public void describeTo(Description description)
{
description.appendValue("now");
}
}
private final class CloseEnoughMatcher extends BaseMatcher<Long>
{
final long value;
final long delta;
CloseEnoughMatcher(long value)
{
this.value = value;
this.delta = DELTA;
}
public boolean matches(Object item)
{
final long actual = (Long) item;
return Math.abs(value - actual) < delta;
}
public void describeTo(Description description)
{
description.appendValue("Close enough to " + value);
}
}
private final class ReturnNowMinus extends CustomAction
{
final long lagTimeMillis;
ReturnNowMinus(long lagTimeMillis)
{
super("returns now - " + lagTimeMillis);
this.lagTimeMillis = lagTimeMillis;
}
public Object invoke(Invocation invocation) throws Throwable
{
return (System.currentTimeMillis() - lagTimeMillis);
}
}
@BeforeClass
public void init()
{
LogInitializer.init();
exceptionHandler = new StoringUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler);
}
@BeforeMethod
public final void beforeMethod()
{
exceptionHandler.reset();
context = new Mockery();
sensor = context.mock(IActivitySensor.class);
observer = context.mock(IInactivityObserver.class);
}
@AfterMethod
public final void afterMethod() throws Throwable
{
if (monitorUnderTest != null)
{
monitorUnderTest.stop();
}
// To following lines of code should also be called at the end of each test method.
// Otherwise one do not known which test failed.
exceptionHandler.checkAndRethrowException();
context.assertIsSatisfied();
}
@Test
public void testHappyCase() throws Throwable
{
context.checking(new Expectations()
{
{
atLeast(1).of(sensor).getTimeOfLastActivityMoreRecentThan(
INACTIVITY_THRESHOLD_MILLIS);
will(new ReturnNowMinus(0L));
}
});
monitorUnderTest =
new InactivityMonitor(sensor, observer, CHECK_INTERVAL_MILLIS,
INACTIVITY_THRESHOLD_MILLIS, true);
ConcurrencyUtilities.sleep(TIME_TO_WAIT_MILLIS);
monitorUnderTest.stop();
exceptionHandler.checkAndRethrowException();
context.assertIsSatisfied();
}
@Test
public void testInactivity() throws Throwable
{
final String descriptionOfInactivity = "DESCRIPTION";
context.checking(new Expectations()
{
{
one(sensor).getTimeOfLastActivityMoreRecentThan(INACTIVITY_THRESHOLD_MILLIS);
will(new ReturnNowMinus(2 * INACTIVITY_THRESHOLD_MILLIS));
one(sensor).describeInactivity(with(new NowMatcher()));
will(returnValue(descriptionOfInactivity));
one(observer).update(
with(new CloseEnoughMatcher(INACTIVITY_THRESHOLD_MILLIS * 2)),
with(equal(descriptionOfInactivity)));
}
});
monitorUnderTest =
new InactivityMonitor(sensor, observer, CHECK_INTERVAL_MILLIS,
INACTIVITY_THRESHOLD_MILLIS, true);
ConcurrencyUtilities.sleep(TIME_TO_WAIT_MILLIS);
monitorUnderTest.stop();
exceptionHandler.checkAndRethrowException();
context.assertIsSatisfied();
}
@Test
public void testInactivityMultipleTimes() throws Throwable
{
final String descriptionOfInactivity = "DESCRIPTION";
context.checking(new Expectations()
{
{
atLeast(3).of(sensor).getTimeOfLastActivityMoreRecentThan(INACTIVITY_THRESHOLD_MILLIS);
will(new ReturnNowMinus(2 * INACTIVITY_THRESHOLD_MILLIS));
atLeast(3).of(sensor).describeInactivity(with(new NowMatcher()));
will(returnValue(descriptionOfInactivity));
atLeast(3).of(observer).update(
with(new CloseEnoughMatcher(INACTIVITY_THRESHOLD_MILLIS * 2)),
with(equal(descriptionOfInactivity)));
}
});
monitorUnderTest =
new InactivityMonitor(sensor, observer, CHECK_INTERVAL_MILLIS,
INACTIVITY_THRESHOLD_MILLIS, false);
ConcurrencyUtilities.sleep(TIME_TO_WAIT_MILLIS);
monitorUnderTest.stop();
exceptionHandler.checkAndRethrowException();
context.assertIsSatisfied();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment