diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java b/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..5be0a05bdf15bf5895151d209210192aecfaee0a --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java @@ -0,0 +1,130 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.utilities; + +import java.io.File; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; + +/** + * Asynchronous path handler. Queues tasks and processes them in a separate thread. Use {{@link #terminate()} to clean + * resources if you do not need the instance of this class anymore. + * + * @author Tomasz Pylak on Aug 24, 2007 + */ +public class QueuingPathHandler implements ITerminable, IPathHandler +{ + private static final Logger notificationLog = LogFactory.getLogger(LogCategory.NOTIFY, QueuingPathHandler.class); + + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, QueuingPathHandler.class); + + private final PathHandlerThread thread; + + private QueuingPathHandler(PathHandlerThread thread) + { + this.thread = thread; + } + + public static QueuingPathHandler create(final IPathHandler handler, String threadName) + { + PathHandlerThread thread = new PathHandlerThread(handler); + final QueuingPathHandler lazyHandler = new QueuingPathHandler(thread); + thread.setName(threadName); + thread.start(); + return lazyHandler; + } + + private static class PathHandlerThread extends Thread + { + private final BlockingQueue<File> queue; + + private final IPathHandler handler; + + public PathHandlerThread(IPathHandler handler) + { + this.queue = new LinkedBlockingQueue<File>(); + this.handler = handler; + } + + @Override + public void run() + { + try + { + while (isInterrupted() == false) + { + try + { + if (operationLog.isTraceEnabled()) + { + operationLog.trace("Waiting for new element in queue."); + } + File path = queue.take(); // blocks if empty + if (operationLog.isTraceEnabled()) + { + operationLog.trace("Processing path '" + path + "'"); + } + handler.handle(path); + } catch (InterruptedException ex) + { + return; + } + } + } catch (Exception ex) + { + // Just log it but ensure that the thread won't die. + notificationLog.error("An exception has occurred. (thread still running)", ex); + } + } + + private synchronized void queue(File resource) + { + queue.add(resource); + } + + } + + /** cleans resources */ + public boolean terminate() + { + if (operationLog.isInfoEnabled()) + { + operationLog.info("Terminating thread '" + thread.getName() + "'"); + } + thread.interrupt(); + return true; + } + + /** + * Queues <var>path</var> processing and exits immediately. + */ + public void handle(File path) + { + assert thread.isInterrupted() == false; + + if (operationLog.isTraceEnabled()) + { + operationLog.trace("Queing path '" + path + "'"); + } + thread.queue(path); + } +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dcc93f682194a7de15b67c79ee217729dd5ae576 --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.utilities; + +import static org.testng.AssertJUnit.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.testng.annotations.Test; + +import ch.systemsx.cisd.common.utilities.IPathHandler; +import ch.systemsx.cisd.common.utilities.QueuingPathHandler; + +/** + * Test cases for the {@link QueuingPathHandler}. + * + * @author Bernd Rinn + */ +public class QueuingPathHandlerTest +{ + + final static long MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH = 100; + + private static class RecordingIPathHandler implements IPathHandler + { + private final List<File> handled = new ArrayList<File>(); + + private final int blockBeforeFile; + + private final long blockMillis; + + private boolean interrupted; + + RecordingIPathHandler(int blockBeforeFile, long blockMillis) + { + this.blockBeforeFile = blockBeforeFile; + this.blockMillis = blockMillis; + } + + RecordingIPathHandler() + { + this(0, 0L); + } + + public void handle(File path) + { + if (handled.size() + 1 == blockBeforeFile) + { + try + { + Thread.sleep(blockMillis); + } catch (InterruptedException ex) + { + interrupted = true; + return; + } + } + handled.add(path); + } + + List<File> getHandledFiles() + { + return handled; + } + + boolean isInterrupted() + { + return interrupted; + } + + } + + @Test + public void testSingleProcessing() throws InterruptedException + { + final File testFile = new File("test_file_to_handle"); + final RecordingIPathHandler recorder = new RecordingIPathHandler(); + final IPathHandler qPathHandler = QueuingPathHandler.create(recorder, "test-thread"); + qPathHandler.handle(testFile); + Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); + assertEquals(Collections.singletonList(testFile), recorder.getHandledFiles()); + } + + @Test + public void testMultipleProcessing() throws InterruptedException + { + final List<File> fileList = new ArrayList<File>(10); + for (int i = 0; i < 10; ++i) + { + fileList.add(new File("File " + i)); + } + final RecordingIPathHandler recorder = new RecordingIPathHandler(); + final IPathHandler qPathHandler = QueuingPathHandler.create(recorder, "test-thread"); + for (File f : fileList) + { + qPathHandler.handle(f); + } + Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); + assertEquals(fileList, recorder.getHandledFiles()); + } + + @Test + public void testTermination() throws InterruptedException + { + final List<File> processedFileList = new ArrayList<File>(4); + final List<File> fileList = new ArrayList<File>(10); + final int FILES_TO_PROCESS = 4; + for (int i = 0; i < 10; ++i) + { + final File f = new File("File " + i); + if (i < FILES_TO_PROCESS) + { + processedFileList.add(f); + } + fileList.add(f); + } + final RecordingIPathHandler blocker = + new RecordingIPathHandler(FILES_TO_PROCESS + 1, MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH * 10L); + final QueuingPathHandler qPathHandler = QueuingPathHandler.create(blocker, "test-thread"); + for (File f : fileList) + { + qPathHandler.handle(f); + } + Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); + assertEquals(processedFileList, blocker.getHandledFiles()); + assertTrue(qPathHandler.terminate()); + Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); + assertTrue(blocker.isInterrupted()); + assertEquals(processedFileList, blocker.getHandledFiles()); + } + +}