diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java index b10131e93d6e5df7caa6443e7384e1e73fff032f..3ffd77fd154f922258b8325e38939e17b2171c7e 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java @@ -49,6 +49,12 @@ public class DataMoverProcess implements ITerminable this.timer = new Timer(taskName); this.terminable = TimerHelper.asTerminable(timer); } + + // @Private + TimerTask getDataMoverTimerTask() + { + return dataMoverTimerTask; + } /** * Starts up the process with a the given <var>delay</var> and <var>period</var> in milli diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index 91ceb84593a7a5c468585bff8beb9da6e95bcc01..c7a9702296646eb80b8280f23049a8b86fec48cc 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -16,6 +16,8 @@ package ch.systemsx.cisd.datamover; +import static ch.systemsx.cisd.common.utilities.SystemTimeProvider.SYSTEM_TIME_PROVIDER; + import java.io.File; import java.util.TimerTask; @@ -28,6 +30,7 @@ import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IStoreHandler; +import ch.systemsx.cisd.common.utilities.ITimeProvider; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.common.MarkerFile; import ch.systemsx.cisd.datamover.filesystem.FileStoreFactory; @@ -71,16 +74,24 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory private final QuietPeriodFileFilter quietPeriodFileFilter; - public static final DataMoverProcess createMovingProcess(final Parameters parameters, - final IFileSysOperationsFactory factory, final LocalBufferDirs bufferDirs) + public static final DataMoverProcess createMovingProcess(Parameters parameters, + IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs) { - final IncomingProcessor processor = new IncomingProcessor(parameters, factory, bufferDirs); + return createMovingProcess(parameters, factory, SYSTEM_TIME_PROVIDER, bufferDirs); + } + + static final DataMoverProcess createMovingProcess(Parameters parameters, + IFileSysOperationsFactory factory, ITimeProvider timeProvider, + LocalBufferDirs bufferDirs) + { + final IncomingProcessor processor = + new IncomingProcessor(parameters, factory, timeProvider, bufferDirs); return processor.create(); } - private IncomingProcessor(final Parameters parameters, final IFileSysOperationsFactory factory, - final LocalBufferDirs bufferDirs) + private IncomingProcessor(Parameters parameters, IFileSysOperationsFactory factory, + ITimeProvider timeProvider, LocalBufferDirs bufferDirs) { this.parameters = parameters; this.prefixForIncoming = parameters.getPrefixForIncoming(); @@ -88,9 +99,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory this.pathMover = factory.getMover(); this.factory = factory; this.bufferDirs = bufferDirs; - this.quietPeriodFileFilter = new QuietPeriodFileFilter(incomingStore, parameters); + this.quietPeriodFileFilter = new QuietPeriodFileFilter(incomingStore, parameters, timeProvider); } - + public TimerTask createRecoverableTimerTask() { return new IncomingProcessorRecoveryTask(); diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..b57a0e7f14f79b4be13c31d4f58340bf7dc41369 --- /dev/null +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover; + +import java.io.File; +import java.io.IOException; +import java.util.TimerTask; + +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.common.utilities.IExitHandler; +import ch.systemsx.cisd.common.utilities.MockTimeProvider; +import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; +import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover; +import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover; +import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; + +/** + * + * + * @author Franz-Josef Elmer + */ +public class IncomingProcessorTest +{ + private static final File TEST_FOLDER = new File("targets/unit-test/IncomingProcessorTest"); + private static final String INCOMING_DIR = "incoming"; + private static final String COPY_IN_PROGRESS_DIR = "copy-in-progress"; + private static final String COPY_COMPLETE_DIR = "copy-complete"; + private static final String READY_TO_MOVE_DIR = "ready-to-move"; + private static final String TEMP_DIR = "temp"; + + private Mockery context; + private IFileSysOperationsFactory fileSysOpertationFactory; + private IPathMover mover; + private IPathRemover remover; + private DataMoverProcess process; + private File incomingDir; + private IExitHandler exitHandler; + private File copyInProgressDir; + private File copyCompleteDir; + + @BeforeMethod + public void setUp() + { + context = new Mockery(); + fileSysOpertationFactory = context.mock(IFileSysOperationsFactory.class); + mover = context.mock(IPathMover.class); + remover = context.mock(IPathRemover.class); + exitHandler = context.mock(IExitHandler.class); + + FileUtilities.deleteRecursively(TEST_FOLDER); + TEST_FOLDER.mkdirs(); + incomingDir = new File(TEST_FOLDER, INCOMING_DIR); + incomingDir.mkdir(); + copyInProgressDir = new File(TEST_FOLDER, COPY_IN_PROGRESS_DIR); + copyInProgressDir.mkdir(); + copyCompleteDir = new File(TEST_FOLDER, COPY_COMPLETE_DIR); + copyCompleteDir.mkdir(); + new File(TEST_FOLDER, READY_TO_MOVE_DIR).mkdir(); + new File(TEST_FOLDER, TEMP_DIR).mkdir(); + + String[] parameterArguments = new String[] + { "--" + PropertyNames.INCOMING_DIR, incomingDir.toString(), "-q", "1" }; + Parameters parameters = new Parameters(parameterArguments, exitHandler); + LocalBufferDirs localBufferDirs = + new LocalBufferDirs(new FileWithHighwaterMark(TEST_FOLDER), COPY_IN_PROGRESS_DIR, + COPY_COMPLETE_DIR, READY_TO_MOVE_DIR, TEMP_DIR); + context.checking(new Expectations() + { + { + allowing(fileSysOpertationFactory).getMover(); + will(returnValue(mover)); + + allowing(fileSysOpertationFactory).getRemover(); + will(returnValue(remover)); + } + }); + process = + IncomingProcessor.createMovingProcess(parameters, fileSysOpertationFactory, + new MockTimeProvider(), localBufferDirs); + } + + @AfterMethod + public void tearDown() + { + // To following line of code should also be called at the end of each test method. + // Otherwise one do not known which test failed. + context.assertIsSatisfied(); + } + + @Test + public void testHappyCase() throws IOException + { + final File testDataFile = new File(incomingDir, "test-data.txt"); + testDataFile.createNewFile(); + context.checking(new Expectations() + { + { + one(mover).tryMove(testDataFile, copyCompleteDir, ""); + will(returnValue(new File(copyCompleteDir, testDataFile.getName()))); + } + }); + + TimerTask dataMoverTimerTask = process.getDataMoverTimerTask(); + dataMoverTimerTask.run(); // 1. round finds a file to process + dataMoverTimerTask.run(); // 2. round finds that quiet period is over + + context.assertIsSatisfied(); + } +}