diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java b/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java index 4c3899a418de1b1333b7ba91d692964f146266c4..32263d8c05f0c2336cdbca3286465915d662d0be 100644 --- a/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java +++ b/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java @@ -31,8 +31,9 @@ import ch.systemsx.cisd.common.logging.LogFactory; * resources if you do not need the instance of this class anymore. * * @author Tomasz Pylak on Aug 24, 2007 + * @author Bernd Rinn */ -public class QueuingPathHandler implements ITerminable, IPathHandler, IRecoverable +public class QueuingPathHandler implements ITerminable, IPathHandler { private static final Logger notificationLog = LogFactory.getLogger(LogCategory.NOTIFY, QueuingPathHandler.class); @@ -40,46 +41,89 @@ public class QueuingPathHandler implements ITerminable, IPathHandler, IRecoverab private final PathHandlerThread thread; - private QueuingPathHandler(PathHandlerThread thread) + /** An interface for special conditions. */ + public interface ISpecialCondition { - this.thread = thread; + /** Handle the special condition. */ + public void handle(); } - public static QueuingPathHandler create(final IPathHandler handler, String threadName) + private QueuingPathHandler(PathHandlerThread thread) { - return create(handler, null, threadName); + this.thread = thread; } - public static QueuingPathHandler create(final IPathHandler handler, final IRecoverable recoverableOrNull, - String threadName) + public static QueuingPathHandler create(final IPathHandler handler, String threadName) { assert handler != null; assert threadName != null; - final PathHandlerThread thread = new PathHandlerThread(handler, recoverableOrNull); + final PathHandlerThread thread = new PathHandlerThread(handler); final QueuingPathHandler lazyHandler = new QueuingPathHandler(thread); thread.setName(threadName); thread.start(); return lazyHandler; } + /** + * A class representing an incident in the {@link QueuingPathHandler}. + */ + private static class Incident + { + private final File path; + + private final ISpecialCondition condition; + + private final boolean blocking; + + Incident(File path) + { + this.path = path; + this.condition = null; + this.blocking = false; + } + + Incident(ISpecialCondition condition, boolean blocking) + { + this.condition = condition; + this.blocking = blocking; + this.path = null; + } + + File getPath() + { + return path; + } + + boolean isSpecialCondition() + { + return (condition != null); + } + + void handleSpecialCondition() + { + assert condition != null; + condition.handle(); + } + + boolean isBlocking() + { + return blocking; + } + } + private static class PathHandlerThread extends Thread { - private static final File DUMMY_FILE = new File("."); - - private final Semaphore recoverySemaphore = new Semaphore(1); - - private final BlockingQueue<File> queue; + private final Semaphore specialIncidentSemaphore = new Semaphore(0); - private final IPathHandler handler; + private final BlockingQueue<Incident> queue; - private final IRecoverable recoverableOrNull; + private final IPathHandler handler; - public PathHandlerThread(IPathHandler handler, IRecoverable recoverableOrNull) + public PathHandlerThread(IPathHandler handler) { - this.queue = new LinkedBlockingQueue<File>(); + this.queue = new LinkedBlockingQueue<Incident>(); this.handler = handler; - this.recoverableOrNull = recoverableOrNull; } @Override @@ -95,13 +139,17 @@ public class QueuingPathHandler implements ITerminable, IPathHandler, IRecoverab { operationLog.trace("Waiting for new element in queue."); } - File path = queue.take(); // blocks if empty - if (path == DUMMY_FILE) + final Incident incident = queue.take(); // blocks if empty + if (incident.isSpecialCondition()) { - runRecover(); - recoverySemaphore.release(); + incident.handleSpecialCondition(); + if (incident.isBlocking()) + { + specialIncidentSemaphore.release(); + } } else { + final File path = incident.getPath(); if (operationLog.isTraceEnabled()) { operationLog.trace("Processing path '" + path + "'"); @@ -120,29 +168,28 @@ public class QueuingPathHandler implements ITerminable, IPathHandler, IRecoverab } } - synchronized void queue(File resource) + void queue(File resource) { - queue.add(resource); + queue.add(new Incident(resource)); } - private void runRecover() + void queue(ISpecialCondition specialCondition, boolean blocking) { - if (recoverableOrNull != null) - { - if (operationLog.isInfoEnabled()) - { - operationLog.info("Triggering recovery."); - } - recoverableOrNull.recover(); - } + queue.add(new Incident(specialCondition, blocking)); } - synchronized void recover() throws InterruptedException + synchronized void handleSpecialCondition(ISpecialCondition condition, String nameForLogging, boolean blocking) throws InterruptedException { - queue(DUMMY_FILE); - recoverySemaphore.acquire(); + if (operationLog.isInfoEnabled()) + { + operationLog.info("Handling special condition '" + nameForLogging + "'."); + } + queue(condition, blocking); + if (blocking) + { + specialIncidentSemaphore.acquire(); + } } - } /** @@ -171,16 +218,23 @@ public class QueuingPathHandler implements ITerminable, IPathHandler, IRecoverab } thread.queue(path); } - - public void recover() + + /** + * Handles a special condition out of order. + * + * @param specialCondition The condition to handle. + * @param nameForLogging The name of the condition as written to the log. + * @param blocking If <code>true</code>, the method will only return when the condition has been handled. + */ + public void handle(ISpecialCondition specialCondition, String nameForLogging, boolean blocking) { try { - thread.recover(); + thread.handleSpecialCondition(specialCondition, nameForLogging, blocking); } catch (InterruptedException ex) { - // Recovery interrupted by shutdown - nothing we can do here. + // terminate() has been called. } } - + } diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java index ce523c6d00e3daafadd814f218fffc4e846fa07a..ce4a869849fedd9eb315d881551916e40465cb7b 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java @@ -146,31 +146,31 @@ public class QueuingPathHandlerTest assertEquals(processedFileList, blocker.getHandledFiles()); } - private static class RecordingRecoverable implements IRecoverable + private static class RecordingCondition implements QueuingPathHandler.ISpecialCondition { - boolean recoverCalled; + boolean called; - public void recover() + public void handle() { - recoverCalled = true; + called = true; } } @Test - public void testRecovery() throws InterruptedException + public void testHandleSpecialCondition() throws InterruptedException { final File testFile = new File("test_file_to_handle"); final RecordingIPathHandler recordingHandler = new RecordingIPathHandler(); - final RecordingRecoverable recordingRecoverable = new RecordingRecoverable(); + final RecordingCondition recordingCondition = new RecordingCondition(); final QueuingPathHandler qPathHandler = - QueuingPathHandler.create(recordingHandler, recordingRecoverable, "test-thread"); + QueuingPathHandler.create(recordingHandler, "test-thread"); Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); - assertFalse(recordingRecoverable.recoverCalled); - recordingRecoverable.recoverCalled = false; + assertFalse(recordingCondition.called); + recordingCondition.called = false; qPathHandler.handle(testFile); - qPathHandler.recover(); + qPathHandler.handle(recordingCondition, "recording", true); + assertTrue(recordingCondition.called); Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); - assertTrue(recordingRecoverable.recoverCalled); assertEquals(Collections.singletonList(testFile), recordingHandler.getHandledFiles()); }