From 62ffffc12930c4b013a143a931ee5ca811b27d0b Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Sun, 30 Sep 2007 18:31:09 +0000 Subject: [PATCH] refactor: move QueuingPathHandler to common SVN: 1877 --- .../ch/systemsx/cisd/datamover/DataMover.java | 2 +- .../datamover/utils/QueuingPathHandler.java | 132 ---------------- .../utils/QueuingPathHandlerTest.java | 148 ------------------ 3 files changed, 1 insertion(+), 281 deletions(-) delete mode 100644 datamover/source/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandler.java delete mode 100644 datamover/sourceTest/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandlerTest.java diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index ba1be989e78..91472eb9f14 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -20,10 +20,10 @@ import java.io.File; import ch.systemsx.cisd.common.utilities.IPathHandler; import ch.systemsx.cisd.common.utilities.ITerminable; +import ch.systemsx.cisd.common.utilities.QueuingPathHandler; import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.utils.FileStore; -import ch.systemsx.cisd.datamover.utils.QueuingPathHandler; import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; /** diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandler.java b/datamover/source/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandler.java deleted file mode 100644 index 36216636dd8..00000000000 --- a/datamover/source/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandler.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover.utils; - -import java.io.File; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.log4j.Logger; - -import ch.systemsx.cisd.common.logging.LogCategory; -import ch.systemsx.cisd.common.logging.LogFactory; -import ch.systemsx.cisd.common.utilities.IPathHandler; -import ch.systemsx.cisd.common.utilities.ITerminable; - -/** - * Asynchronous path handler. Queues tasks and processes them in a separate thread. Use {{@link #terminate()} to clean - * resources if you do not need the instance of this class anymore. - * - * @author Tomasz Pylak on Aug 24, 2007 - */ -public class QueuingPathHandler implements ITerminable, IPathHandler -{ - private static final Logger notificationLog = LogFactory.getLogger(LogCategory.NOTIFY, QueuingPathHandler.class); - - private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, QueuingPathHandler.class); - - private final PathHandlerThread thread; - - private QueuingPathHandler(PathHandlerThread thread) - { - this.thread = thread; - } - - public static QueuingPathHandler create(final IPathHandler handler, String threadName) - { - PathHandlerThread thread = new PathHandlerThread(handler); - final QueuingPathHandler lazyHandler = new QueuingPathHandler(thread); - thread.setName(threadName); - thread.start(); - return lazyHandler; - } - - private static class PathHandlerThread extends Thread - { - private final BlockingQueue<File> queue; - - private final IPathHandler handler; - - public PathHandlerThread(IPathHandler handler) - { - this.queue = new LinkedBlockingQueue<File>(); - this.handler = handler; - } - - @Override - public void run() - { - try - { - while (isInterrupted() == false) - { - try - { - if (operationLog.isTraceEnabled()) - { - operationLog.trace("Waiting for new element in queue."); - } - File path = queue.take(); // blocks if empty - if (operationLog.isTraceEnabled()) - { - operationLog.trace("Processing path '" + path + "'"); - } - handler.handle(path); - } catch (InterruptedException ex) - { - return; - } - } - } catch (Exception ex) - { - // Just log it but ensure that the thread won't die. - notificationLog.error("An exception has occurred. (thread still running)", ex); - } - } - - private synchronized void queue(File resource) - { - queue.add(resource); - } - - } - - /** cleans resources */ - public boolean terminate() - { - if (operationLog.isInfoEnabled()) - { - operationLog.info("Terminating thread '" + thread.getName() + "'"); - } - thread.interrupt(); - return true; - } - - /** - * Queues <var>path</var> processing and exits immediately. - */ - public void handle(File path) - { - assert thread.isInterrupted() == false; - - if (operationLog.isTraceEnabled()) - { - operationLog.trace("Queing path '" + path + "'"); - } - thread.queue(path); - } -} diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandlerTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandlerTest.java deleted file mode 100644 index 067d32f089f..00000000000 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/utils/QueuingPathHandlerTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover.utils; - -import static org.testng.AssertJUnit.*; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.testng.annotations.Test; - -import ch.systemsx.cisd.common.utilities.IPathHandler; - -/** - * Test cases for the {@link QueuingPathHandler}. - * - * @author Bernd Rinn - */ -public class QueuingPathHandlerTest -{ - - final static long MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH = 100; - - private static class RecordingIPathHandler implements IPathHandler - { - private final List<File> handled = new ArrayList<File>(); - - private final int blockBeforeFile; - - private final long blockMillis; - - private boolean interrupted; - - RecordingIPathHandler(int blockBeforeFile, long blockMillis) - { - this.blockBeforeFile = blockBeforeFile; - this.blockMillis = blockMillis; - } - - RecordingIPathHandler() - { - this(0, 0L); - } - - public void handle(File path) - { - if (handled.size() + 1 == blockBeforeFile) - { - try - { - Thread.sleep(blockMillis); - } catch (InterruptedException ex) - { - interrupted = true; - return; - } - } - handled.add(path); - } - - List<File> getHandledFiles() - { - return handled; - } - - boolean isInterrupted() - { - return interrupted; - } - - } - - @Test - public void testSingleProcessing() throws InterruptedException - { - final File testFile = new File("test_file_to_handle"); - final RecordingIPathHandler recorder = new RecordingIPathHandler(); - final IPathHandler qPathHandler = QueuingPathHandler.create(recorder, "test-thread"); - qPathHandler.handle(testFile); - Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); - assertEquals(Collections.singletonList(testFile), recorder.getHandledFiles()); - } - - @Test - public void testMultipleProcessing() throws InterruptedException - { - final List<File> fileList = new ArrayList<File>(10); - for (int i = 0; i < 10; ++i) - { - fileList.add(new File("File " + i)); - } - final RecordingIPathHandler recorder = new RecordingIPathHandler(); - final IPathHandler qPathHandler = QueuingPathHandler.create(recorder, "test-thread"); - for (File f : fileList) - { - qPathHandler.handle(f); - } - Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); - assertEquals(fileList, recorder.getHandledFiles()); - } - - @Test - public void testTermination() throws InterruptedException - { - final List<File> processedFileList = new ArrayList<File>(4); - final List<File> fileList = new ArrayList<File>(10); - final int FILES_TO_PROCESS = 4; - for (int i = 0; i < 10; ++i) - { - final File f = new File("File " + i); - if (i < FILES_TO_PROCESS) - { - processedFileList.add(f); - } - fileList.add(f); - } - final RecordingIPathHandler blocker = - new RecordingIPathHandler(FILES_TO_PROCESS + 1, MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH * 10L); - final QueuingPathHandler qPathHandler = QueuingPathHandler.create(blocker, "test-thread"); - for (File f : fileList) - { - qPathHandler.handle(f); - } - Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); - assertEquals(processedFileList, blocker.getHandledFiles()); - assertTrue(qPathHandler.terminate()); - Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH); - assertTrue(blocker.isInterrupted()); - assertEquals(processedFileList, blocker.getHandledFiles()); - } - -} -- GitLab