diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/CleansingPathHandlerDecorator.java b/datamover/source/java/ch/systemsx/cisd/datamover/CleansingPathHandlerDecorator.java deleted file mode 100644 index 5188ba598e7158c241e4788d47038de73821cfb5..0000000000000000000000000000000000000000 --- a/datamover/source/java/ch/systemsx/cisd/datamover/CleansingPathHandlerDecorator.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover; - -import java.io.File; -import java.io.FileFilter; - -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import ch.systemsx.cisd.common.logging.ISimpleLogger; -import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; -import ch.systemsx.cisd.common.logging.LogCategory; -import ch.systemsx.cisd.common.logging.LogFactory; -import ch.systemsx.cisd.common.utilities.FileUtilities; -import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IPathHandler; - -/** - * A class that decorates an {@link IPathHandler} with a selective cleansing task. The files that should be removed in - * the cleansing step are selected by a {@link FileFilter}. Note: if a <var>path</var> that should be handled itself - * is accepted by the <code>FileFilter</code>, the complete directory is removed and the decorated handler will never - * be called on this path. - * - * @author Bernd Rinn - */ -public class CleansingPathHandlerDecorator implements IPathHandler -{ - - private final Logger operationLog = - LogFactory.getLogger(LogCategory.OPERATION, CleansingPathHandlerDecorator.class); - - private final FileFilter filter; - - private final IPathHandler decoratedHandler; - - public CleansingPathHandlerDecorator(FileFilter filter, IPathHandler decoratedHandler) - { - assert filter != null; - assert decoratedHandler != null; - - this.filter = filter; - this.decoratedHandler = decoratedHandler; - } - - public boolean handle(File path) - { - assert path != null; - - final ISimpleLogger logger = - operationLog.isDebugEnabled() ? new Log4jSimpleLogger(Level.DEBUG, operationLog) : null; - final boolean pathDeleted = FileUtilities.deleteRecursively(path, filter, logger); - if (pathDeleted == false) - { - return decoratedHandler.handle(path); - } else - { - return true; - } - } -} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java index 6a729a8d6236645fadf50db3ff67998c9f9b0cdd..36966f370e90afaba1b0f602a91ce2ac2447d4cc 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java @@ -28,6 +28,8 @@ import org.apache.log4j.Logger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.ITerminable; +import ch.systemsx.cisd.datamover.intf.IReadPathOperations; +import ch.systemsx.cisd.datamover.intf.ITimingParameters; /** * A <code>CopyActivityMonitor</code> monitors write activity on a <var>destinationPath</var> and triggers an alarm @@ -44,7 +46,7 @@ public class CopyActivityMonitor private final File destinationDirectory; - private final IPathLastChangedChecker checker; + private final IReadPathOperations readOperations; private final long checkIntervallMillis; @@ -92,12 +94,12 @@ public class CopyActivityMonitor * Creates a monitor. * * @param destinationDirectory The directory to monitor for write access. - * @param factory The provider to get the {@link IPathLastChangedChecker} from. + * @param readOperations Provides read-only access to the file system. * @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy * process gets stuck. * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from. */ - public CopyActivityMonitor(File destinationDirectory, IFileSysOperationsFactory factory, ITerminable copyProcess, + public CopyActivityMonitor(File destinationDirectory, IReadPathOperations readOperations, ITerminable copyProcess, ITimingParameters timingParameters) { this.monitoredPathLastChecked = new AtomicLong(0); @@ -105,15 +107,14 @@ public class CopyActivityMonitor this.pathToBeCopied = new AtomicReference<File>(null); assert destinationDirectory != null; - assert factory != null; + assert readOperations != null; assert copyProcess != null; assert timingParameters != null; this.destinationDirectory = destinationDirectory; - this.checker = factory.getChecker(); + this.readOperations = readOperations; this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); - assert this.checker != null; assert this.checkIntervallMillis > 0; final String currentThreadName = Thread.currentThread().getName(); @@ -147,7 +148,7 @@ public class CopyActivityMonitor ++currentNumberOfActivityMonitor; activityMonitoringTimer = new Timer(threadNamePrefix + "Activity Monitor " + currentNumberOfActivityMonitor, true); - activityMonitoringTimerTask = new ActivityMonitoringTimerTask(checker); + activityMonitoringTimerTask = new ActivityMonitoringTimerTask(); activityMonitoringTimer.schedule(activityMonitoringTimerTask, 0, checkIntervallMillis); } @@ -184,9 +185,9 @@ public class CopyActivityMonitor private AtomicBoolean terminated = new AtomicBoolean(false); - private ActivityMonitoringTimerTask(IPathLastChangedChecker checker) + private ActivityMonitoringTimerTask() { - assert checker != null; + assert readOperations != null; assert pathToBeCopied != null; assert monitoredPathLastChanged != null; assert destinationDirectory != null; @@ -211,21 +212,21 @@ public class CopyActivityMonitor final File pathToCheck = new File(destinationDirectory, path.getName()); if (operationLog.isTraceEnabled()) { - operationLog.trace(String.format("Asking checker %s for last change time of path '%s'.", checker - .getClass().getName(), pathToCheck)); + operationLog.trace(String.format("Asking checker %s for last change time of path '%s'.", + readOperations.getClass().getName(), pathToCheck)); } - if (pathToCheck.exists() == false) + if (readOperations.exists(pathToCheck) == false) { operationLog.warn(String.format("File or directory '%s' does not (yet?) exist.", pathToCheck)); monitoredPathLastChecked.set(System.currentTimeMillis()); return; } - final long lastChangedAsFoundByPathChecker = checker.lastChanged(pathToCheck); + final long lastChangedAsFoundByPathChecker = readOperations.lastChanged(pathToCheck); if (operationLog.isTraceEnabled()) { operationLog.trace(String.format( - "Checker %s reported last changed time of path '%s' to be %3$tF %3$tT.", checker.getClass() - .getName(), pathToCheck.getPath(), lastChangedAsFoundByPathChecker)); + "Checker %s reported last changed time of path '%s' to be %3$tF %3$tT.", readOperations + .getClass().getName(), pathToCheck.getPath(), lastChangedAsFoundByPathChecker)); } if (terminated.get()) // Don't modify the time variables any more if we got terminated. { diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/FSPathLastChangedChecker.java b/datamover/source/java/ch/systemsx/cisd/datamover/FSPathLastChangedChecker.java deleted file mode 100644 index bd332506ea57dbfc0deadd886e364b27bef82865..0000000000000000000000000000000000000000 --- a/datamover/source/java/ch/systemsx/cisd/datamover/FSPathLastChangedChecker.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover; - -import java.io.File; - -import ch.systemsx.cisd.common.utilities.FileUtilities; - -/** - * Checks for last change time of a directory in the file system. - * - * @author Bernd Rinn - */ -public class FSPathLastChangedChecker implements IPathLastChangedChecker -{ - - /** - * @return The time when any file below <var>directory</var> has last been changed in the file system. - * @throws ch.systemsx.cisd.common.exceptions.EnvironmentFailureException If the <var>directory</var> does not - * exist, is not readable, or is not a directory. - */ - public long lastChanged(File path) - { - return FileUtilities.lastChanged(path); - } - -} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/FSPathRemover.java b/datamover/source/java/ch/systemsx/cisd/datamover/FSPathRemover.java index 81d31886e202cc3b3b84128c444d81b4cb3f9821..0e26dd1d1e502d4c7aa40e467c32c8dc2b3e71df 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/FSPathRemover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/FSPathRemover.java @@ -21,6 +21,7 @@ import java.io.File; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.StatusFlag; import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.datamover.intf.IPathRemover; /** * Removes a path (file or directory) from the file system, if necessary recursively. diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/GatePathHandlerDecorator.java b/datamover/source/java/ch/systemsx/cisd/datamover/GatePathHandlerDecorator.java deleted file mode 100644 index be9812c961826bf00aaac119863a1b2e41091827..0000000000000000000000000000000000000000 --- a/datamover/source/java/ch/systemsx/cisd/datamover/GatePathHandlerDecorator.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover; - -import java.io.File; -import java.io.FileFilter; -import java.util.Date; - -import org.apache.log4j.Logger; - -import ch.systemsx.cisd.common.logging.LogCategory; -import ch.systemsx.cisd.common.logging.LogFactory; -import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IPathHandler; - -/** - * A class that decorates an {@link IPathHandler} with a selective gate. If a path is picked up by a {@link FileFilter}, - * the first handler is chosen to handle the path, if it doesn't, the second one. The gate relates only to the top-level - * path (may it be a file or a directory), not to any sub-directories. - * - * @author Bernd Rinn - */ -public class GatePathHandlerDecorator implements IPathHandler -{ - - private static final Logger operationLog = - LogFactory.getLogger(LogCategory.OPERATION, GatePathHandlerDecorator.class); - - private final FileFilter filter; - - private final IPathHandler defaultHandler; - - private final IPathHandler filteredHandler; - - public GatePathHandlerDecorator(FileFilter filter, IPathHandler defaultHandler, IPathHandler filteredHandler) - { - assert filter != null; - assert defaultHandler != null; - assert filteredHandler != null; - - this.filter = filter; - this.defaultHandler = defaultHandler; - this.filteredHandler = filteredHandler; - } - - public boolean handle(File path) - { - assert path != null; - - final String absolutePath = path.getAbsolutePath(); - final boolean filtered = filter.accept(path); - if (filtered) - { - if (operationLog.isInfoEnabled()) - { - operationLog.info(String.format("FILTERED %s [created: %2$tY-%2$tm-%2$td %2$tH:%2$tM:%2$tS]", - absolutePath, new Date(path.lastModified()))); - } - return filteredHandler.handle(path); - } else - { - if (operationLog.isInfoEnabled()) - { - operationLog.info(String.format("DEFAULT %s [created: %2$tY-%2$tm-%2$td %2$tH:%2$tM:%2$tS]", - absolutePath, new Date(path.lastModified()))); - } - return defaultHandler.handle(path); - } - } - -} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IPathLastChangedChecker.java b/datamover/source/java/ch/systemsx/cisd/datamover/IPathLastChangedChecker.java deleted file mode 100644 index 31da677613246bd2bdf23961437ffde712d0f07a..0000000000000000000000000000000000000000 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IPathLastChangedChecker.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover; - -import java.io.File; - -/** - * Interface that represents a role that can check when there has been the last write access to a given directory. - * - * @author Bernd Rinn - */ -public interface IPathLastChangedChecker -{ - - /** - * Returns the last time when there was a write access to <var>directory</var>. - * - * @param path The path to check for last write activity. - * @return The time (in milliseconds since the start of the epoch) when <var>path</var> was last changed. - */ - public long lastChanged(File path); - -} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/LazyPathHandler.java b/datamover/source/java/ch/systemsx/cisd/datamover/LazyPathHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..94b8750745f0a6af97e6ef4008f27d607053885d --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/LazyPathHandler.java @@ -0,0 +1,140 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover; + +import java.io.File; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.utilities.ITerminable; +import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IPathHandler; + +/** + * Asynchronous path handler. Queues tasks and processes them in a separate thread. Use {{@link #terminate()} to clean + * resources if you do not need the instance of this class anymore. + * + * @author Tomasz Pylak on Aug 24, 2007 + */ +public class LazyPathHandler implements ITerminable, IPathHandler +{ + private static final Logger notificationLog = LogFactory.getLogger(LogCategory.NOTIFY, LazyPathHandler.class); + + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, LazyPathHandler.class); + + private final PathHandlerThread thread; + + private LazyPathHandler(PathHandlerThread thread) + { + this.thread = thread; + } + + public static LazyPathHandler create(final IPathHandler handler, String threadName) + { + PathHandlerThread thread = new PathHandlerThread(handler); + final LazyPathHandler lazyHandler = new LazyPathHandler(thread); + thread.setName(threadName); + thread.start(); + return lazyHandler; + } + + private static class PathHandlerThread extends Thread + { + private final BlockingQueue<File> queue; + + private final IPathHandler handler; + + private boolean terminate; + + public PathHandlerThread(IPathHandler handler) + { + this.queue = new LinkedBlockingQueue<File>(); + this.handler = handler; + this.terminate = false; + } + + @Override + public void run() + { + while (terminate == false) + { + try + { + File resource = queue.take(); // blocks if empty + boolean ok = handler.handle(resource); + logHandlingResult(resource, ok); + } catch (InterruptedException ex) + { + if (!terminate) + { + operationLog.info("Processing was unexpectedly interrupted. Thread stops."); + } + return; + } + } + } + + private void logHandlingResult(File resource, boolean ok) + { + if (ok) + { + operationLog.info("Processing succeded: " + resource.getAbsolutePath()); + } else + { + notificationLog.error("Processing failed: " + resource.getAbsolutePath()); + } + } + + public synchronized void process(File resource) + { + queue.add(resource); + } + + public synchronized void terminate() + { + this.terminate = true; + } + + public synchronized boolean isTerminated() + { + return terminate; + } + } + + /** cleans resources */ + public boolean terminate() + { + thread.terminate(); + thread.interrupt(); + return true; + } + + /** + * queues resource processing and exits immediately + * + * @return always true + */ + public boolean handle(File resource) + { + assert thread.isTerminated() == false; + thread.process(resource); + return true; + } +} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/LocalBufferDirs.java b/datamover/source/java/ch/systemsx/cisd/datamover/LocalBufferDirs.java new file mode 100644 index 0000000000000000000000000000000000000000..e2223d594e988428f4c4666dab1462769abd28f7 --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/LocalBufferDirs.java @@ -0,0 +1,70 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover; + +import java.io.File; + +import ch.systemsx.cisd.datamover.helper.FileSystemHelper; + +/** + * Paths to different local buffer directories. + * @author Tomasz Pylak on Aug 29, 2007 + */ +public class LocalBufferDirs +{ + private final File copyInProgressDir; + + private final File copyCompleteDir; + + private final File readyToMoveDir; + + private final File tempDir; + + public LocalBufferDirs(Parameters parameters, String copyInProgressDirName, String copyCompleteDirName, + String readyToMoveDirName, String tempDirName) + { + File buffer = parameters.getBufferStore().getPath(); + this.copyInProgressDir = FileSystemHelper.ensureDirectoryExists(buffer, copyInProgressDirName); + this.copyCompleteDir = FileSystemHelper.ensureDirectoryExists(buffer, copyCompleteDirName); + this.readyToMoveDir = FileSystemHelper.ensureDirectoryExists(buffer, readyToMoveDirName); + this.tempDir = FileSystemHelper.ensureDirectoryExists(buffer, tempDirName); + } + + /** here data are copied from incoming */ + public File getCopyInProgressDir() + { + return copyInProgressDir; + } + + /** here data are moved when copy is complete */ + public File getCopyCompleteDir() + { + return copyCompleteDir; + } + + /** from here data are moved to outgoing directory */ + public File getReadyToMoveDir() + { + return readyToMoveDir; + } + + /** auxiliary directory used if we need to make a copy of incoming data */ + public File getTempDir() + { + return tempDir; + } +} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessorHandler.java b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessorHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..b36cf5cef735d66f0b5542efa2668b69f9c8198a --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessorHandler.java @@ -0,0 +1,238 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover; + +import java.io.File; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.logging.ISimpleLogger; +import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.common.utilities.RegexFileFilter; +import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IPathHandler; +import ch.systemsx.cisd.common.utilities.RegexFileFilter.PathType; +import ch.systemsx.cisd.datamover.helper.FileSystemHelper; +import ch.systemsx.cisd.datamover.intf.IPathImmutableCopier; + +/** + * Processing of the files on the local machine. This class does not scan its input directory, all resources must + * registered with a handler by someone else, also in the case of recovery after shutdown. + * + * @author Tomasz Pylak on Aug 24, 2007 + */ +public class LocalProcessorHandler implements IPathHandler +{ + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, LocalProcessorHandler.class); + + private final Parameters parameters; + + private final IPathImmutableCopier copier; + + // output: from here data are moved when processing is finished. + private final File outputDir; + + // auxiliary directory used if we need to make a copy of incoming data + // Making a copy can take some time, so we do that in the temporary directory. Than we move it from + // temporary the final destination. In this way external process can start moving data from final + // destination as soon as they appear there. + private final File tempDir; + + private final File extraCopyDirOrNull; + + private final LazyPathHandler outgoingHandler; + + private LocalProcessorHandler(Parameters parameters, File outputDir, File tempDir, LazyPathHandler outgoingHandler, + IPathImmutableCopier copier) + { + this.parameters = parameters; + this.outputDir = outputDir; + this.tempDir = tempDir; + this.outgoingHandler = outgoingHandler; + this.extraCopyDirOrNull = parameters.tryGetExtraCopyDir(); + this.copier = copier; + } + + public static final IPathHandler createAndRecover(Parameters parameters, File inputDir, File outputDir, + File bufferDir, LazyPathHandler lastStepHandler, IPathImmutableCopier copier) + { + LocalProcessorHandler handler = + new LocalProcessorHandler(parameters, outputDir, bufferDir, lastStepHandler, copier); + handler.recoverAfterShutdown(inputDir); + return handler; + } + + // ---------------- + + private void recoverAfterShutdown(File inputDir) + { + recoverTemporaryExtraCopy(tempDir, inputDir, extraCopyDirOrNull); + recoverRegisterReadyForOutgoing(outputDir, outgoingHandler); + } + + private static void recoverTemporaryExtraCopy(File tempDir, File inputDir, File extraCopyDirOrNull) + { + File[] files = FileSystemHelper.listFiles(tempDir); + if (files == null || files.length == 0) + return; // directory is empty, no recovery is needed + + for (int i = 0; i < files.length; i++) + { + File file = files[i]; + if (fileExists(inputDir, file)) + { + FileUtilities.deleteRecursively(file); // partial copy, delete it + } else + { + // if in previous run we were creating an extra copy, and now we do not, we leave the resource in tmp + // directory. If now we do create copies, it's not clear what to do, because the destination directory + // could change. We move the copy to that directory to ensure clean recovery from errors. + if (extraCopyDirOrNull != null) + { + FileSystemHelper.tryMoveLocal(file, extraCopyDirOrNull); + } + } + } + } + + private static boolean fileExists(File inputDir, File file) + { + return new File(inputDir, file.getName()).exists(); + } + + private static void recoverRegisterReadyForOutgoing(File outputDir, IPathHandler outgoingHandler) + { + File[] files = FileSystemHelper.listFiles(outputDir); + if (files == null || files.length == 0) + return; // directory is empty, no recovery is needed + + for (int i = 0; i < files.length; i++) + { + outgoingHandler.handle(files[i]); + } + } + + // ---------------- + + public boolean handle(File path) + { + Boolean result = tryMoveManualOrClean(path); + if (result != null) + return result.booleanValue(); // stop processing + + boolean ok = true; + File extraTmpCopy = null; + if (extraCopyDirOrNull != null) + { + extraTmpCopy = copier.tryCopy(path, tempDir); + ok = ok && (extraTmpCopy != null); + } + + File movedFile = FileSystemHelper.tryMoveLocal(path, outputDir); + if (movedFile != null) + { + outgoingHandler.handle(movedFile); + } + ok = ok && (movedFile != null); + + if (extraTmpCopy != null) + { + assert extraCopyDirOrNull != null; + File extraCopy = FileSystemHelper.tryMoveLocal(extraTmpCopy, extraCopyDirOrNull); + ok = ok && (extraCopy != null); + } + return ok; + } + + // @return true if successed, false if failed, null if succeded and file still exists + private Boolean tryMoveManualOrClean(File file) + { + EFileManipResult manualMoveStatus = doManualIntervention(file); + if (manualMoveStatus == EFileManipResult.FAILURE) + { + return Boolean.FALSE; + } else if (manualMoveStatus == EFileManipResult.STOP) + { + return Boolean.TRUE; + } else if (manualMoveStatus == EFileManipResult.CONTINUE) + { + // continue processing + } + boolean wholeDeleted = doCleansing(file); + if (wholeDeleted) + { + return Boolean.TRUE; + } + return null; // else continue processing + + } + + // @return true if the whole resource was deleted + private boolean doCleansing(File resource) + { + final RegexFileFilter cleansingFilter = new RegexFileFilter(); + Pattern cleansingRegex = parameters.getCleansingRegex(); + if (cleansingRegex != null) + { + log(resource, "Doing cleansing"); + cleansingFilter.add(PathType.FILE, cleansingRegex); + } + final ISimpleLogger logger = + operationLog.isDebugEnabled() ? new Log4jSimpleLogger(Level.DEBUG, operationLog) : null; + boolean pathDeleted = FileUtilities.deleteRecursively(resource, cleansingFilter, logger); + return pathDeleted; + } + + private enum EFileManipResult + { + CONTINUE, FAILURE, STOP + } + + private EFileManipResult doManualIntervention(File resource) + { + RegexFileFilter manualInterventionFilter = new RegexFileFilter(); + Pattern manualInterventionRegex = parameters.getManualInterventionRegex(); + if (manualInterventionRegex != null) + { + manualInterventionFilter.add(PathType.ALL, manualInterventionRegex); + } + + boolean filtered = manualInterventionFilter.accept(resource); + if (filtered) + { + log(resource, "Moving to manual intervention directory"); + File manualInterventionDir = parameters.getManualInterventionDirectory(); + File movedFile = FileSystemHelper.tryMoveLocal(resource, manualInterventionDir); + return (movedFile != null) ? EFileManipResult.STOP : EFileManipResult.FAILURE; + } else + { + return EFileManipResult.CONTINUE; + } + } + + private static void log(File path, String description) + { + if (operationLog.isInfoEnabled()) + { + operationLog.info(String.format("%s on %s", description, path.getPath())); + } + } +} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Main.java b/datamover/source/java/ch/systemsx/cisd/datamover/Main.java index 355f89a07c067d48adeba9f64bbfa1feb81af191..418577250b4a29b85939633274280be3882ff9f0 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/Main.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/Main.java @@ -30,8 +30,15 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.utilities.BuildAndEnvironmentInfo; +import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.OSUtilities; import ch.systemsx.cisd.datamover.hardlink.RecursiveHardLinkMaker; +import ch.systemsx.cisd.datamover.helper.FileSystemHelper; +import ch.systemsx.cisd.datamover.intf.IFileSysOperationsFactory; +import ch.systemsx.cisd.datamover.intf.IPathCopier; +import ch.systemsx.cisd.datamover.intf.IPathImmutableCopier; +import ch.systemsx.cisd.datamover.intf.IReadPathOperations; +import ch.systemsx.cisd.datamover.intf.IPathRemover; import ch.systemsx.cisd.datamover.rsync.RsyncCopier; import ch.systemsx.cisd.datamover.xcopy.XcopyCopier; @@ -158,7 +165,7 @@ public class Main { sshExecutable = new File(sshExecutablePath); } else - // Explicitely disable tunneling via ssh on the command line. + // Explicitly disable tunneling via ssh on the command line. { sshExecutable = null; } @@ -174,6 +181,41 @@ public class Main return sshExecutable; } + private static IPathCopier getPathCopier(Parameters parameters, File destinationDirectory) + { + IPathCopier copyProcess = suggestPathCopier(parameters, false); + boolean requiresDeletionBeforeCreation = + SelfTest.requiresDeletionBeforeCreation(copyProcess, destinationDirectory); + return suggestPathCopier(parameters, requiresDeletionBeforeCreation); + } + + private static IFileSysOperationsFactory createFileSysOperations(final Parameters parameters) + { + final IReadPathOperations readAccessor = FileSystemHelper.createPathReadOperations(); + return new IFileSysOperationsFactory() + { + public IPathCopier getCopier(File destinationDirectory) + { + return getPathCopier(parameters, destinationDirectory); + } + + public IPathRemover getRemover() + { + return new FSPathRemover(); + } + + public IPathImmutableCopier getImmutableCopier() + { + return getImmutablePathCopier(parameters); + } + + public IReadPathOperations getReadAccessor() + { + return readAccessor; + } + }; + } + /** * performs a self-test. */ @@ -187,9 +229,10 @@ public class Main stores.add(parameters.getBufferStore()); stores.add(parameters.getOutgoingStore()); stores.add(parameters.getManualInterventionStore()); - if (parameters.tryGetExtraCopyStore() != null) + if (parameters.tryGetExtraCopyDir() != null) { - stores.add(parameters.tryGetExtraCopyStore()); + FileStore dummyStore = new FileStore(parameters.tryGetExtraCopyDir(), "extra-copy", null, false); + stores.add(dummyStore); } SelfTest.check(copyProcess, stores.toArray(new FileStore[] {})); } catch (HighLevelException e) @@ -204,45 +247,17 @@ public class Main } } - /** - * Returns the path copier - */ - private static IPathCopier getPathCopier(Parameters parameters, File destinationDirectory) + /** exposed for testing purposes */ + static ITerminable startupServer(Parameters parameters, LocalBufferDirs bufferDirs) { - IPathCopier copyProcess = suggestPathCopier(parameters, false); - boolean requiresDeletionBeforeCreation = - SelfTest.requiresDeletionBeforeCreation(copyProcess, destinationDirectory); - return suggestPathCopier(parameters, requiresDeletionBeforeCreation); + final IFileSysOperationsFactory factory = createFileSysOperations(parameters); + return MonitorStarter.start(parameters, factory, bufferDirs); } - private static void startupServer(final Parameters parameters) + private static void startupServer(Parameters parameters) { - selfTest(parameters); - final IFileSysOperationsFactory factory = new IFileSysOperationsFactory() - { - public IPathLastChangedChecker getChecker() - { - return new FSPathLastChangedChecker(); - } - - public IPathCopier getCopier(File destinationDirectory) - { - return getPathCopier(parameters, destinationDirectory); - } - - public IPathRemover getRemover() - { - return new FSPathRemover(); - } - - public IPathImmutableCopier getImmutableCopier() - { - return getImmutablePathCopier(parameters); - } - }; - - final MonitorStarter starter = new MonitorStarter(parameters, factory); - starter.start(); + final IFileSysOperationsFactory factory = createFileSysOperations(parameters); + MonitorStarter.start(parameters, factory); } public static void main(String[] args) @@ -250,6 +265,7 @@ public class Main initLog(); final Parameters parameters = new Parameters(args); printInitialLogMessage(parameters); + selfTest(parameters); startupServer(parameters); operationLog.info("datamover ready and waiting for data."); } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java b/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java index 530b1781e8055b945ab62cd498e2f3e03ad9fe47..a3c2680222327ccf28c0ebb77ca395ad59282a91 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java @@ -17,27 +17,30 @@ package ch.systemsx.cisd.datamover; import java.io.File; -import java.io.FileFilter; import java.util.Timer; -import java.util.TimerTask; -import ch.systemsx.cisd.common.Constants; -import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; -import ch.systemsx.cisd.common.utilities.IntraFSPathMover; -import ch.systemsx.cisd.common.utilities.NamePrefixFileFilter; -import ch.systemsx.cisd.common.utilities.RegexFileFilter; +import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IPathHandler; -import ch.systemsx.cisd.common.utilities.RegexFileFilter.PathType; +import ch.systemsx.cisd.datamover.helper.CopyFinishedMarker; +import ch.systemsx.cisd.datamover.helper.FileSystemHelper; +import ch.systemsx.cisd.datamover.intf.IFileSysOperationsFactory; +import ch.systemsx.cisd.datamover.intf.IPathCopier; +import ch.systemsx.cisd.datamover.intf.IPathImmutableCopier; +import ch.systemsx.cisd.datamover.intf.IPathRemover; +import ch.systemsx.cisd.datamover.intf.IReadPathOperations; /** - * A class that starts up the monitoring processes, based on the parameters provided. + * A class that starts up the processing pipeline and its monitoring, based on the parameters provided. * * @author Bernd Rinn + * @author Tomasz Pylak on Aug 24, 2007 */ public class MonitorStarter { - private final static String LOCAL_IN_PROGRESS_DIR = "in-progress"; + private final static String LOCAL_COPY_IN_PROGRESS_DIR = "copy-in-progress"; + + private final static String LOCAL_COPY_COMPLETE_DIR = "copy-complete"; private final static String LOCAL_READY_TO_MOVE_DIR = "ready-to-move"; @@ -47,210 +50,260 @@ public class MonitorStarter private final IFileSysOperationsFactory factory; - private final File inProgressDir; // here data are copied from incoming + private final LocalBufferDirs bufferDirs; - private final File readyToMoveDir;// from here data are moved to outgoing directory + /** + * starts the process of moving data and monitoring it + * + * @return object which can be used to terminate the process and all its threads + */ + public static final ITerminable start(Parameters parameters, IFileSysOperationsFactory factory) + { + LocalBufferDirs localBufferDirs = + new LocalBufferDirs(parameters, LOCAL_COPY_IN_PROGRESS_DIR, LOCAL_COPY_COMPLETE_DIR, + LOCAL_READY_TO_MOVE_DIR, LOCAL_TEMP_DIR); + return start(parameters, factory, localBufferDirs); + } - private final File tempDir;// auxiliary directory used if we need to make a copy of incoming data + /** Allows to specify buffer directories. Exposed for testing purposes. */ + public static final ITerminable start(Parameters parameters, IFileSysOperationsFactory factory, + LocalBufferDirs localBufferDirs) + { + return new MonitorStarter(parameters, factory, localBufferDirs).start(); + } - public MonitorStarter(Parameters parameters, IFileSysOperationsFactory factory) + private MonitorStarter(Parameters parameters, IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs) { this.parameters = parameters; this.factory = factory; - File buffer = parameters.getBufferStore().getPath(); - this.inProgressDir = ensureDirectoryExists(buffer, LOCAL_IN_PROGRESS_DIR); - this.readyToMoveDir = ensureDirectoryExists(buffer, LOCAL_READY_TO_MOVE_DIR); - this.tempDir = ensureDirectoryExists(buffer, LOCAL_TEMP_DIR); + this.bufferDirs = bufferDirs; } - public void start() + private ITerminable start() { - startupIncomingMovingProcess(parameters.getIncomingStore()); - startupOutgoingMovingProcess(readyToMoveDir, parameters.getOutgoingStore()); + final LazyPathHandler outgoingProcessor = startupOutgoingMovingProcess(parameters.getOutgoingStore()); + final LazyPathHandler localProcessor = startupLocalProcessing(outgoingProcessor); + final Timer incomingProcessor = startupIncomingMovingProcess(parameters.getIncomingStore(), localProcessor); + return createTerminable(outgoingProcessor, localProcessor, incomingProcessor); } - private void startupIncomingMovingProcess(FileStore incomingStore) + private static ITerminable createTerminable(final LazyPathHandler outgoingProcessor, + final LazyPathHandler localProcessor, final Timer incomingProcessor) { - final File manualInterventionDir = parameters.getManualInterventionDirectory(); - final RegexFileFilter cleansingFilter = new RegexFileFilter(); - if (parameters.getCleansingRegex() != null) - { - cleansingFilter.add(PathType.FILE, parameters.getCleansingRegex()); - } - final RegexFileFilter manualInterventionFilter = new RegexFileFilter(); - if (parameters.getManualInterventionRegex() != null) - { - manualInterventionFilter.add(PathType.ALL, parameters.getManualInterventionRegex()); - } + return new ITerminable() + { + public boolean terminate() + { + incomingProcessor.cancel(); + boolean ok = localProcessor.terminate(); + ok = ok && outgoingProcessor.terminate(); + return ok; + } + }; + } + + private LazyPathHandler startupLocalProcessing(LazyPathHandler outgoingHandler) + { + IPathImmutableCopier copier = factory.getImmutableCopier(); + IPathHandler localProcesingHandler = + LocalProcessorHandler.createAndRecover(parameters, bufferDirs.getCopyCompleteDir(), bufferDirs + .getReadyToMoveDir(), bufferDirs.getTempDir(), outgoingHandler, copier); + return LazyPathHandler.create(localProcesingHandler, "Local Processor"); + } + + // --- Incoming data processing ----------------------- + + private Timer startupIncomingMovingProcess(FileStore incomingStore, LazyPathHandler localProcessor) + { + IReadPathOperations readOperations = factory.getReadAccessor(); + boolean isIncomingRemote = parameters.getTreatIncomingAsRemote(); + + recoverIncomingAfterShutdown(incomingStore, readOperations, isIncomingRemote, localProcessor); IPathHandler pathHandler = - createIncomingMovingPathHandler(incomingStore.getHost(), manualInterventionDir, - manualInterventionFilter, cleansingFilter); + createIncomingMovingPathHandler(incomingStore.getHost(), localProcessor, isIncomingRemote); final DirectoryScanningTimerTask movingTask = - new DirectoryScanningTimerTask(incomingStore.getPath(), new QuietPeriodFileFilter(parameters, factory), - pathHandler); + new DirectoryScanningTimerTask(incomingStore.getPath(), new QuietPeriodFileFilter(parameters, + readOperations), pathHandler); final Timer movingTimer = new Timer("Mover of Incomming Data"); - schedule(movingTimer, movingTask, 0, parameters.getCheckIntervalMillis(), parameters.getTreatIncomingAsRemote()); + // The moving task is scheduled at fixed rate. It makes sense especially if the task is moving data from the + // remote share. The rationale behind this is that if new items are + // added to the source directory while the remote timer task has been running for a long time, busy moving data, + // the task shoulnd't sit idle for the check time when there is actually work to do. + movingTimer.scheduleAtFixedRate(movingTask, 0, parameters.getCheckIntervalMillis()); + return movingTimer; } - private IPathHandler createIncomingMovingPathHandler(String sourceHost, File manualInterventionDir, - RegexFileFilter manualInterventionFilter, RegexFileFilter cleansingFilter) + private void recoverIncomingAfterShutdown(FileStore incomingStore, IReadPathOperations incomingReadOperations, + boolean isIncomingRemote, LazyPathHandler localProcessor) { - IPathHandler processMoved = - createProcessMovedFile(readyToMoveDir, tempDir, parameters.tryGetExtraCopyStore(), factory); - IPathHandler moveAndProcess = - createMoveAndProcess(sourceHost, processMoved, parameters.getTreatIncomingAsRemote()); - IPathHandler manualInterventionMover = createPathMoverToLocal(sourceHost, manualInterventionDir); - CleansingPathHandlerDecorator cleansingOrMover = - new CleansingPathHandlerDecorator(cleansingFilter, moveAndProcess); - return new GatePathHandlerDecorator(manualInterventionFilter, cleansingOrMover, manualInterventionMover); - } + if (isIncomingRemote == false) + return; // no recovery is needed - private IPathHandler createMoveAndProcess(String sourceHost, final IPathHandler processMoved, - final boolean isIncomingRemote) - { - final IPathHandler moveFromIncoming = createPathMoverToLocal(sourceHost, inProgressDir); - IPathHandler moveAndProcess = new IPathHandler() - { - public boolean handle(File path) - { - boolean ok = moveFromIncoming.handle(path); - if (ok) - { - // create path in destination directory - File movedFile = new File(inProgressDir, path.getName()); - File markFile = new File(inProgressDir, Constants.IS_FINISHED_PREFIX + path.getName()); - assert movedFile.exists(); - if (isIncomingRemote) - { - assert markFile.exists(); - } - ok = processMoved.handle(movedFile); - if (isIncomingRemote) - { - markFile.delete(); // process even if mark file could not be deleted - } - } - return ok; - } - }; - return moveAndProcess; + recoverIncomingInProgress(incomingStore, incomingReadOperations, bufferDirs.getCopyInProgressDir(), bufferDirs + .getCopyCompleteDir()); + recoverIncomingCopyComplete(bufferDirs.getCopyCompleteDir(), localProcessor); } - private static IPathHandler createProcessMovedFile(File destDirectory, File tempDir, - FileStore extraCopyStoreOrNull, IFileSysOperationsFactory factory) + private static void recoverIncomingInProgress(FileStore incomingStore, IReadPathOperations incomingReadOperations, + File copyInProgressDir, File copyCompleteDir) { - FileFilter cleanMarkers = new NamePrefixFileFilter(Constants.IS_FINISHED_PREFIX, true); - IPathHandler moveToDone = new IntraFSPathMover(destDirectory); - IPathHandler processHandler; - if (extraCopyStoreOrNull != null) - { - IPathImmutableCopier copier = factory.getImmutableCopier(); - IPathHandler extraCopyHandler = createExtraCopyHandler(tempDir, extraCopyStoreOrNull.getPath(), copier); - processHandler = combineHandlers(extraCopyHandler, moveToDone); - } else + File[] files = FileSystemHelper.listFiles(copyInProgressDir); + if (files == null || files.length == 0) + return; // directory is empty, no recovery is needed + + for (int i = 0; i < files.length; i++) { - processHandler = moveToDone; + File file = files[i]; + recoverIncomingAfterShutdown(file, incomingStore, incomingReadOperations, copyCompleteDir); } - return new CleansingPathHandlerDecorator(cleanMarkers, processHandler); } - private static IPathHandler combineHandlers(final IPathHandler first, final IPathHandler second) + private static void recoverIncomingAfterShutdown(File unfinishedFile, FileStore incomingStore, + IReadPathOperations incomingReadOperations, File copyCompleteDir) { - return new IPathHandler() + if (CopyFinishedMarker.isMarker(unfinishedFile)) + { + File markerFile = unfinishedFile; + File localCopy = CopyFinishedMarker.extractOriginal(markerFile); + if (localCopy.exists()) + { + // copy and marker exist - do nothing, recovery will be done for copied resource + } else + { + // copy finished, resource moved, but marker was not deleted + markerFile.delete(); + } + } else + // handle local copy + { + File localCopy = unfinishedFile; + File markerFile = CopyFinishedMarker.extractMarker(localCopy); + if (markerFile.exists()) + { + // copy and marker exist - copy finished, but copied resource not moved + tryMoveFromInProgressToFinished(localCopy, markerFile, copyCompleteDir); + } else + // no marker { - public boolean handle(File path) + File incomingDir = incomingStore.getPath(); + File originalInIncoming = new File(incomingDir, localCopy.getName()); + if (incomingReadOperations.exists(originalInIncoming)) + { + // partial copy - nothing to do, will be copied again + } else { - boolean firstOk = first.handle(path); - boolean secondOk = second.handle(path); - return firstOk && secondOk; + // move finished, but marker not created + tryMoveFromInProgressToFinished(localCopy, null, copyCompleteDir); } - }; + } + } } - private static IPathHandler createExtraCopyHandler(final File tempDir, final File finalDir, - final IPathImmutableCopier copier) + // schedule processing of all resources which were previously copied + private static void recoverIncomingCopyComplete(File copyCompleteDir, LazyPathHandler localProcessor) { - // making a copy can take some time, so we do that in the temporary directory. Than we move it from temporary - // the final destination. In this way external process can start moving data from final destination as soon as - // they appear there. - final IPathHandler moveToFinal = new IntraFSPathMover(finalDir); + File[] files = FileSystemHelper.listFiles(copyCompleteDir); + if (files == null || files.length == 0) + return; // directory is empty, no recovery is needed + for (int i = 0; i < files.length; i++) + { + localProcessor.handle(files[i]); + } + } + + private IPathHandler createIncomingMovingPathHandler(final String sourceHostOrNull, + final LazyPathHandler localProcessor, final boolean isIncomingRemote) + { return new IPathHandler() { - public boolean handle(File path) + public boolean handle(File sourceFile) { - File copiedFile = copier.tryCopy(path, tempDir); - if (copiedFile == null) + if (isIncomingRemote) { - return false; + return moveFromRemoteIncoming(sourceFile, sourceHostOrNull, localProcessor); + } else + { + return moveFromLocalIncoming(sourceFile, localProcessor); } - return moveToFinal.handle(copiedFile); } }; } - private IPathHandler createPathMoverToLocal(String sourceHost, final File localDestDir) + private boolean moveFromLocalIncoming(File source, LazyPathHandler localProcessor) + { + File finalFile = tryMoveLocal(source, bufferDirs.getCopyCompleteDir()); + if (finalFile == null) + return false; + return localProcessor.handle(finalFile); + } + + private boolean moveFromRemoteIncoming(File source, String sourceHostOrNull, LazyPathHandler localProcessor) + { + // 1. move from incoming: copy, delete, create copy-finished-marker + File copyInProgressDir = bufferDirs.getCopyInProgressDir(); + boolean ok = moveFromRemoteToLocal(source, sourceHostOrNull, copyInProgressDir); + if (ok == false) + return false; + File copiedFile = new File(copyInProgressDir, source.getName()); + assert copiedFile.exists(); + File markerFile = CopyFinishedMarker.extractMarker(copiedFile); + assert markerFile.exists(); + + // 2. Move to final directory, delete marker + File finalFile = tryMoveFromInProgressToFinished(copiedFile, markerFile, bufferDirs.getCopyCompleteDir()); + if (finalFile == null) + return false; + + // 3. schedule local processing, always successful + localProcessor.handle(finalFile); + return true; + } + + private static File tryMoveFromInProgressToFinished(File copiedFile, File markerFileOrNull, File copyCompleteDir) { - if (parameters.getTreatIncomingAsRemote()) + File finalFile = tryMoveLocal(copiedFile, copyCompleteDir); + if (finalFile != null) { - return createRemotePathMover(sourceHost, localDestDir, /* local host */null); + if (markerFileOrNull != null) + { + markerFileOrNull.delete(); // process even if marker file could not be deleted + } + return finalFile; } else { - return new IntraFSPathMover(localDestDir); + return null; } } - private IPathHandler createRemotePathMover(String sourceHost, File destinationDirectory, String destinationHost) + private boolean moveFromRemoteToLocal(File source, String sourceHostOrNull, File localDestDir) { - IPathCopier copier = factory.getCopier(destinationDirectory); - CopyActivityMonitor monitor = new CopyActivityMonitor(destinationDirectory, factory, copier, parameters); - IPathRemover remover = factory.getRemover(); - return new RemotePathMover(destinationDirectory, destinationHost, monitor, remover, copier, sourceHost, - parameters); + return createRemotePathMover(sourceHostOrNull, localDestDir, null).handle(source); } - private void startupOutgoingMovingProcess(File srcDir, FileStore destDir) + private static File tryMoveLocal(File sourceFile, File destinationDir) { - final File outgoingDirectory = destDir.getPath(); - final String outgoingHost = destDir.getHost(); - final IPathHandler remoteMover = createRemotePathMover(null, outgoingDirectory, outgoingHost); - final DirectoryScanningTimerTask remoteMovingTask = - new DirectoryScanningTimerTask(srcDir, new NamePrefixFileFilter(Constants.IS_FINISHED_PREFIX, false), - remoteMover); - final Timer remoteMovingTimer = new Timer("Remote Mover"); - - // Implementation notes: - // The startup of the remote moving task is delayed for half the time of the check interval. Thus the - // incoming - // moving task should have enough time to finish its job. - schedule(remoteMovingTimer, remoteMovingTask, parameters.getCheckIntervalMillis() / 2, parameters - .getCheckIntervalMillis(), true); + return FileSystemHelper.tryMoveLocal(sourceFile, destinationDir); } - private void schedule(Timer timer, TimerTask task, long delay, long period, boolean isRemote) + // -------------------------- + + private LazyPathHandler startupOutgoingMovingProcess(FileStore outputDir) { - // The remote moving task is scheduled at fixed rate. The rationale behind this is that if new items are - // added to the source directory while the remote timer task has been running for a long time, busy moving data - // to or from - // remote, the task shoulnd't sit idle for the check time when there is actually work to do. - if (isRemote) - { - timer.scheduleAtFixedRate(task, delay, period); - } else - { - timer.schedule(task, delay, period); - } + final File outgoingDirectory = outputDir.getPath(); + final String outgoingHost = outputDir.getHost(); + final IPathHandler remoteMover = createRemotePathMover(null, outgoingDirectory, outgoingHost); + return LazyPathHandler.create(remoteMover, "Final Destination Mover"); } - private static File ensureDirectoryExists(File dir, String newDirName) + private IPathHandler createRemotePathMover(String sourceHost, File destinationDirectory, String destinationHost) { - File dataDir = new File(dir, newDirName); - if (dataDir.exists() == false) - { - if (dataDir.mkdir() == false) - throw new EnvironmentFailureException("Could not create local data directory " + dataDir); - } - return dataDir; + IPathCopier copier = factory.getCopier(destinationDirectory); + CopyActivityMonitor monitor = + new CopyActivityMonitor(destinationDirectory, factory.getReadAccessor(), copier, parameters); + IPathRemover remover = factory.getRemover(); + return new RemotePathMover(destinationDirectory, destinationHost, monitor, remover, copier, sourceHost, + parameters); } } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java index 932d53c612685e5e9a665e417d2ef15a0d6a5fab..9b1b5a9bc083435fba5859f5802e99013959989f 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java @@ -38,6 +38,7 @@ import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.BuildAndEnvironmentInfo; import ch.systemsx.cisd.common.utilities.IExitHandler; import ch.systemsx.cisd.common.utilities.SystemExit; +import ch.systemsx.cisd.datamover.intf.ITimingParameters; /** * The class to process the command line parameters. @@ -224,11 +225,6 @@ public class Parameters implements ITimingParameters */ private final FileStore manualInterventionStore; - /** - * The store where we create an additional copy of incoming data (optional). - */ - private final FileStore extraCopyStoreOrNull; - /** * The regular expression to use for deciding whether a path in the incoming directory needs manual intervention. */ @@ -322,13 +318,6 @@ public class Parameters implements ITimingParameters bufferStore = new FileStore(bufferDirectory, "buffer", null, false); manualInterventionStore = new FileStore(manualInterventionDirectory, "manual intervention", null, false); outgoingStore = new FileStore(outgoingDirectory, "outgoing", outgoingHost, true); - if (extraCopyDirectory != null) - { - extraCopyStoreOrNull = new FileStore(extraCopyDirectory, "extra-copy", null, false); - } else - { - extraCopyStoreOrNull = null; - } } catch (Exception ex) { outputException(ex); @@ -558,12 +547,12 @@ public class Parameters implements ITimingParameters } /** - * @return The store where we create an additional copy of incoming data or <code>null</code> if it is not + * @return The directory where we create an additional copy of incoming data or <code>null</code> if it is not * specified. Note that this directory needs to be on the same file system as {@link #getBufferStore}. */ - public FileStore tryGetExtraCopyStore() + public File tryGetExtraCopyDir() { - return extraCopyStoreOrNull; + return extraCopyDirectory; } /** diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java b/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java index 60b43f805573bdf0441e81d9256d6eefe9f8cdd6..83423268ee2e46de4720359fb648b2169575fc04 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java @@ -19,6 +19,9 @@ package ch.systemsx.cisd.datamover; import java.io.File; import java.io.FileFilter; +import ch.systemsx.cisd.datamover.intf.IReadPathOperations; +import ch.systemsx.cisd.datamover.intf.ITimingParameters; + /** * A {@link FileFilter} that picks all entries that haven't been changed for longer than some given quiet period. * @@ -29,30 +32,28 @@ public class QuietPeriodFileFilter implements FileFilter private final long quietPeriodMillis; - private final IPathLastChangedChecker checker; + private final IReadPathOperations readOperations; /** * Creates a <var>QuietPeriodFileFilter</var>. * * @param timingParameters The timing paramter object to get the quiet period from. - * @param factory The factory object to create implementation to use to check when a pathname was - * changed. + * @param readOperations Used to check when a pathname was changed. */ - public QuietPeriodFileFilter(ITimingParameters timingParameters, IFileSysOperationsFactory factory) + public QuietPeriodFileFilter(ITimingParameters timingParameters, IReadPathOperations readOperations) { assert timingParameters != null; - assert factory != null; + assert readOperations != null; - quietPeriodMillis = timingParameters.getQuietPeriodMillis(); - checker = factory.getChecker(); + this.quietPeriodMillis = timingParameters.getQuietPeriodMillis(); + this.readOperations = readOperations; assert quietPeriodMillis > 0; - assert checker != null; } public boolean accept(File pathname) { - return (System.currentTimeMillis() - checker.lastChanged(pathname)) > quietPeriodMillis; + return (System.currentTimeMillis() - readOperations.lastChanged(pathname)) > quietPeriodMillis; } } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java index ce42a2e65bd1bbb6f002b09a3dd6517fe2c3d561..62752cb969fc9035abcad6740cf7dd6c20371f4a 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java @@ -21,13 +21,16 @@ import java.io.IOException; import org.apache.log4j.Logger; -import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.StatusFlag; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.datamover.helper.CopyFinishedMarker; +import ch.systemsx.cisd.datamover.intf.IPathCopier; +import ch.systemsx.cisd.datamover.intf.IPathRemover; +import ch.systemsx.cisd.datamover.intf.ITimingParameters; /** * A class that moves files and directories to remote directories. This class monitors the copy process and, if @@ -71,7 +74,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa private final IPathCopier copier; private final IPathRemover remover; - + private final String sourceHost; private final CopyActivityMonitor monitor; @@ -89,7 +92,8 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa * @param monitor The activity monitor to inform about actions. * @param remover Allows to remove files. * @param copier Allows to copy files - * @param sourceHost The host to move paths from, or <code>null</code>, if data will be moved from the local file system + * @param sourceHost The host to move paths from, or <code>null</code>, if data will be moved from the local file + * system * @param timingParameters The timing parametes used for monitoring and reporting stall situations. */ public RemotePathMover(File destinationDirectory, String destinationHost, CopyActivityMonitor monitor, @@ -198,7 +202,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa private boolean markAsFinishedLocal(File path) { - final File markFile = new File(destinationDirectory, Constants.IS_FINISHED_PREFIX + path.getName()); + final File markFile = new File(destinationDirectory, CopyFinishedMarker.getMarkerName(path.getName())); try { markFile.createNewFile(); @@ -217,7 +221,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa private boolean markAsFinishedRemote(File path) { - final File markFile = new File(path.getParent(), Constants.IS_FINISHED_PREFIX + path.getName()); + final File markFile = new File(path.getParent(), CopyFinishedMarker.getMarkerName(path.getName())); try { markFile.createNewFile(); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java b/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java index 46d7976180628165a18ab28292967427bd09b2d4..6fc42582dd7f54df7b8a5a4a54f80020fabb684d 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java @@ -30,6 +30,7 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.datamover.intf.IPathCopier; /** * A class that can perform a self test of the data mover. diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMaker.java b/datamover/source/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMaker.java index a58255acb569129f2af0602fb667ab0d7cfa201b..b6447b531ebf7969252043c010140dfbf214f2cc 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMaker.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMaker.java @@ -25,8 +25,8 @@ import org.apache.log4j.Logger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.OSUtilities; -import ch.systemsx.cisd.datamover.IPathImmutableCopier; import ch.systemsx.cisd.datamover.helper.CmdLineHelper; +import ch.systemsx.cisd.datamover.intf.IPathImmutableCopier; /** * Utility to create a hard link of a file or copy recursively a directories structure, creating a hard link for each diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/helper/CopyFinishedMarker.java b/datamover/source/java/ch/systemsx/cisd/datamover/helper/CopyFinishedMarker.java new file mode 100644 index 0000000000000000000000000000000000000000..11e57163190d5d1869025eb8c618bd0ef107cc75 --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/helper/CopyFinishedMarker.java @@ -0,0 +1,52 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.helper; + +import java.io.File; + +import ch.systemsx.cisd.common.Constants; +import ch.systemsx.cisd.common.utilities.FileUtilities; + +/** + * Manipulations on the file which is used as a marker of finished copy operation. + * + * @author Tomasz Pylak on Aug 27, 2007 + */ +public class CopyFinishedMarker +{ + + public static String getMarkerName(String originalFileName) + { + return Constants.IS_FINISHED_PREFIX + originalFileName; + } + + public static boolean isMarker(File file) + { + return file.getName().startsWith(Constants.IS_FINISHED_PREFIX); + } + + public static File extractOriginal(File markerFile) + { + assert isMarker(markerFile); + return FileUtilities.removePrefixFromFileName(markerFile, Constants.IS_FINISHED_PREFIX); + } + + public static File extractMarker(File originalFile) + { + return new File(originalFile.getParent(), getMarkerName(originalFile.getName())); + } +} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/helper/FileSystemHelper.java b/datamover/source/java/ch/systemsx/cisd/datamover/helper/FileSystemHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..4f33ca6565ef13a7f380a7a8533d67f6eb8300e5 --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/helper/FileSystemHelper.java @@ -0,0 +1,109 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.helper; + +import java.io.File; +import java.io.FileFilter; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; +import ch.systemsx.cisd.common.logging.ISimpleLogger; +import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.common.utilities.IntraFSPathMover; +import ch.systemsx.cisd.datamover.LocalProcessorHandler; +import ch.systemsx.cisd.datamover.intf.IReadPathOperations; + +/** + * Basic file system operations helper. + * + * @author Tomasz Pylak on Aug 27, 2007 + */ +public class FileSystemHelper +{ + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, LocalProcessorHandler.class); + + public static File ensureDirectoryExists(File dir, String newDirName) + { + File dataDir = new File(dir, newDirName); + if (dataDir.exists() == false) + { + if (dataDir.mkdir() == false) + throw new EnvironmentFailureException("Could not create directory " + dataDir); + } + return dataDir; + } + + /** + * Lists all resources in a given directory, logs errors. + */ + public static File[] listFiles(File directory) + { + final ISimpleLogger errorLogger = new Log4jSimpleLogger(Level.ERROR, operationLog); + /** + * Lists all resources in a given directory, logs errors. + */ + FileFilter acceptAll = new FileFilter() + { + public boolean accept(File file) + { + return true; + } + }; + return FileUtilities.listFiles(directory, acceptAll, errorLogger); + } + + public static IReadPathOperations createPathReadOperations() + { + return new IReadPathOperations() + { + + public boolean exists(File file) + { + return file.exists(); + } + + public long lastChanged(File path) + { + return FileUtilities.lastChanged(path); + } + + public File[] listFiles(File directory, FileFilter filter, ISimpleLogger loggerOrNull) + { + return FileUtilities.listFiles(directory, filter, loggerOrNull); + } + + public File[] listFiles(File directory, ISimpleLogger logger) + { + return FileSystemHelper.listFiles(directory); + } + }; + } + + /** moves source file to destination directory */ + public static File tryMoveLocal(File sourceFile, File destinationDir) + { + boolean ok = new IntraFSPathMover(destinationDir).handle(sourceFile); + if (!ok) + return null; + return new File(destinationDir, sourceFile.getName()); + } +} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IFileSysOperationsFactory.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysOperationsFactory.java similarity index 91% rename from datamover/source/java/ch/systemsx/cisd/datamover/IFileSysOperationsFactory.java rename to datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysOperationsFactory.java index d1f7cef2e50eb1fda1832b6965d99ea9f559a586..88702d5f13d652233b93051ee7b52ec848880ae9 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IFileSysOperationsFactory.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysOperationsFactory.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ch.systemsx.cisd.datamover; +package ch.systemsx.cisd.datamover.intf; import java.io.File; - /** * A role that provides access to the roles which perform file system operations. * @@ -31,6 +30,5 @@ public interface IFileSysOperationsFactory public IPathRemover getRemover(); - public IPathLastChangedChecker getChecker(); - + public IReadPathOperations getReadAccessor(); } \ No newline at end of file diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathCopier.java similarity index 98% rename from datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java rename to datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathCopier.java index b31576df50a39c083e21b85ece6d143cb0fc9866..245b5d473f75039804a6b8c9ef4f5ae151f4601c 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathCopier.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package ch.systemsx.cisd.datamover; +package ch.systemsx.cisd.datamover.intf; import java.io.File; diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IPathImmutableCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathImmutableCopier.java similarity index 96% rename from datamover/source/java/ch/systemsx/cisd/datamover/IPathImmutableCopier.java rename to datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathImmutableCopier.java index afc0ff03c69efd8cc3097c70d4905ba65185277a..aeddc25649aae710945a5538dd88cb779ce8074b 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IPathImmutableCopier.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathImmutableCopier.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package ch.systemsx.cisd.datamover; +package ch.systemsx.cisd.datamover.intf; import java.io.File; diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IPathRemover.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathRemover.java similarity index 96% rename from datamover/source/java/ch/systemsx/cisd/datamover/IPathRemover.java rename to datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathRemover.java index 0ce9795e165db933ce3b3071af03660abbbff303..a79ae7e9bbf0f34fad68cd155c6ee02c1ab65486 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IPathRemover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IPathRemover.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package ch.systemsx.cisd.datamover; +package ch.systemsx.cisd.datamover.intf; import java.io.File; diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/intf/IReadPathOperations.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IReadPathOperations.java new file mode 100644 index 0000000000000000000000000000000000000000..424787482db235894b2830690f3e12f00d76e425 --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IReadPathOperations.java @@ -0,0 +1,52 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.intf; + +import java.io.File; +import java.io.FileFilter; + +import ch.systemsx.cisd.common.logging.ISimpleLogger; + +/** + * Allows for checking the status of the file system. Performs only read-only operations. + * + * @author Tomasz Pylak on Aug 27, 2007 + */ +public interface IReadPathOperations +{ + /** + * lists files in a given directory matching a given filter + * + * @param loggerOrNull logger, if <code>null</code> than no logging occurs + */ + File[] listFiles(File directory, FileFilter filter, ISimpleLogger loggerOrNull); + + /** lists all files in a given directory, logs errors */ + File[] listFiles(File directory, ISimpleLogger logger); + + /** checks if a file exists */ + boolean exists(File file); + + /** + * Returns the last time when there was a write access to <var>resource</var>. + * + * @param resource The path to check for last write activity. + * @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was last changed. + */ + long lastChanged(File resource); + +} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/ITimingParameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/ITimingParameters.java similarity index 97% rename from datamover/source/java/ch/systemsx/cisd/datamover/ITimingParameters.java rename to datamover/source/java/ch/systemsx/cisd/datamover/intf/ITimingParameters.java index 9b0f56e5bfabc02a90164b8830ddf650ed63be5a..e8ea301c2cdf78e38afc78f04775bdf102088955 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/ITimingParameters.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/ITimingParameters.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ch.systemsx.cisd.datamover; +package ch.systemsx.cisd.datamover.intf; /** * A role that provides the parameters regarding time intervals to be used in the operation. diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java index e39c43b6dabaffb048745a9c16531b8398d5d8ba..c46e1341066421efd2441aafd886c622d58e39d4 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java @@ -36,8 +36,8 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.OSUtilities; -import ch.systemsx.cisd.datamover.IPathCopier; import ch.systemsx.cisd.datamover.helper.CmdLineHelper; +import ch.systemsx.cisd.datamover.intf.IPathCopier; import ch.systemsx.cisd.datamover.rsync.RsyncVersionChecker.RsyncVersion; /** diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java index 71fbd6ead01ff3e3c0663ab90d5d51300c3dff3f..6f9e5f32028b7bd7e04e5368e0b82fd5764ae5f7 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java @@ -34,8 +34,8 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.OSUtilities; -import ch.systemsx.cisd.datamover.IPathCopier; import ch.systemsx.cisd.datamover.helper.CmdLineHelper; +import ch.systemsx.cisd.datamover.intf.IPathCopier; /** * A class that encapsulates the <code>xcopy</code> call for doing an archive copy. diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CleansingPathHandlerDecoratorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CleansingPathHandlerDecoratorTest.java deleted file mode 100644 index 38f53b85b3a7c7712830a51477dae06e404e4e2e..0000000000000000000000000000000000000000 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CleansingPathHandlerDecoratorTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; - -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import ch.systemsx.cisd.common.logging.LogInitializer; -import ch.systemsx.cisd.common.utilities.FileUtilities; -import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IPathHandler; - - -/** - * Test cases for the {@link CleansingPathHandlerDecorator}. - * - * @author Bernd Rinn - */ -public class CleansingPathHandlerDecoratorTest -{ - - private static final File unitTestRootDirectory = new File("targets" + File.separator + "unit-test-wd"); - - private static final File workingDirectory = new File(unitTestRootDirectory, "CleansingPathHandlerDecoratorTest"); - - @BeforeClass - public void init() - { - LogInitializer.init(); - unitTestRootDirectory.mkdirs(); - assert unitTestRootDirectory.isDirectory(); - } - - @BeforeMethod - public void setUp() - { - FileUtilities.deleteRecursively(workingDirectory); - workingDirectory.mkdirs(); - workingDirectory.deleteOnExit(); - } - - private void createStructure() throws IOException - { - final File dir1 = createDirectory("dir1"); - final File dir1a = createDirectory(dir1, "dir1a"); - final File dir1b = createDirectory(dir1, "dir1b"); - final File dir2 = createDirectory("dir2"); - createFile(dir1a, "file1a"); - createFile(dir1b, "file1b"); - createFile("file2"); - createFile("file3"); - createFile(dir2, "file4"); - } - - private File createFile(String name) throws IOException - { - return createFile(workingDirectory, name); - } - - private File createFile(File directory, String name) throws IOException - { - final File file = new File(directory, name); - file.createNewFile(); - assert file.isFile(); - file.deleteOnExit(); - return file; - } - - private File createDirectory(String name) throws IOException - { - return createDirectory(workingDirectory, name); - } - - private File createDirectory(File directory, String name) throws IOException - { - final File file = new File(directory, name); - file.mkdir(); - assert file.isDirectory(); - file.deleteOnExit(); - return file; - } - - @Test - public void testDecoratedPathHandlerDoesntSeeDeletedFile() throws IOException - { - createStructure(); - CleansingPathHandlerDecorator cph = new CleansingPathHandlerDecorator(new FileFilter() - { - public boolean accept(File pathname) - { - return pathname.getName().equals("file2"); - } - }, new IPathHandler() - { - public boolean handle(File path) - { - assert path.getName().equals("file2") == false; - return false; - } - - }); - cph.handle(workingDirectory); - } - - @Test - public void testDecoratedPathHandlerDoesntSeeDeletedDirectory() throws IOException - { - createStructure(); - CleansingPathHandlerDecorator cph = new CleansingPathHandlerDecorator(new FileFilter() - { - public boolean accept(File pathname) - { - return pathname.getName().equals("dir2"); - } - }, new IPathHandler() - { - public boolean handle(File path) - { - assert path.getName().equals("dir2") == false && path.getName().equals("file4") == false; - return false; - } - - }); - cph.handle(workingDirectory); - } - -} diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java index 9e7e52755c62f996f63e39f36a72873fda6324fb..d9167bd1c4f03762b5f0c667b247ab2e205dcbf3 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.datamover; import java.io.File; +import java.io.FileFilter; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -24,12 +25,16 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.logging.LogMonitoringAppender; import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.StoringUncaughtExceptionHandler; +import ch.systemsx.cisd.datamover.helper.FileSystemHelper; +import ch.systemsx.cisd.datamover.intf.IReadPathOperations; +import ch.systemsx.cisd.datamover.intf.ITimingParameters; /** * Test cases for the {@link CopyActivityMonitor} class. @@ -78,41 +83,39 @@ public class CopyActivityMonitorTest } } - private final class MyFileSystemFactory implements IFileSysOperationsFactory + private static class ReadOperationsOriginalImpl implements IReadPathOperations { - /** - * - */ - private final IPathLastChangedChecker checker; + private final IReadPathOperations impl; - private MyFileSystemFactory(IPathLastChangedChecker checker) + public ReadOperationsOriginalImpl() { - this.checker = checker; + this.impl = FileSystemHelper.createPathReadOperations(); } - public IPathLastChangedChecker getChecker() + public long lastChanged(File path) { - return checker; + return impl.lastChanged(path); } - public IPathCopier getCopier(File destinationDirectory) + public boolean exists(File file) { - throw new AssertionError("call not expected"); + return impl.exists(file); } - public IPathRemover getRemover() + public File[] listFiles(File directory, FileFilter filter, ISimpleLogger loggerOrNull) { - throw new AssertionError("call not expected"); + return impl.listFiles(directory, filter, loggerOrNull); } - public IPathImmutableCopier getImmutableCopier() + public File[] listFiles(File directory, ISimpleLogger logger) { - throw new AssertionError("call not expected"); + return impl.listFiles(directory, logger); } } - private final class HappyPathLastChangedChecker implements IPathLastChangedChecker + private final class HappyPathLastChangedChecker extends ReadOperationsOriginalImpl { + @Override public long lastChanged(File path) { return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2; @@ -191,12 +194,11 @@ public class CopyActivityMonitorTest { "slow" }) public void testHappyPath() throws Throwable { - final IPathLastChangedChecker checker = new HappyPathLastChangedChecker(); - final IFileSysOperationsFactory factory = new MyFileSystemFactory(checker); + final IReadPathOperations checker = new HappyPathLastChangedChecker(); final ITerminable dummyTerminable = new DummyTerminable(); final ITimingParameters parameters = new MyTimingParameters(0); final CopyActivityMonitor monitor = - new CopyActivityMonitor(workingDirectory, factory, dummyTerminable, parameters); + new CopyActivityMonitor(workingDirectory, checker, dummyTerminable, parameters); final File directory = new File(workingDirectory, "some-directory"); directory.mkdir(); directory.deleteOnExit(); @@ -209,12 +211,10 @@ public class CopyActivityMonitorTest { "slow" }) public void testCopyStalled() throws Throwable { - final IPathLastChangedChecker checker = new PathLastChangedCheckerStalled(); - final IFileSysOperationsFactory factory = new MyFileSystemFactory(checker); + final IReadPathOperations checker = new PathLastChangedCheckerStalled(); final MockTerminable copyProcess = new MockTerminable(); final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(workingDirectory, factory, copyProcess, parameters); + final CopyActivityMonitor monitor = new CopyActivityMonitor(workingDirectory, checker, copyProcess, parameters); final File file = new File(workingDirectory, "some-directory"); file.mkdir(); monitor.start(file); @@ -223,10 +223,11 @@ public class CopyActivityMonitorTest assert copyProcess.isTerminated(); } - private final class SimulateShortInterruptionChangedChecker implements IPathLastChangedChecker + private final class SimulateShortInterruptionChangedChecker extends ReadOperationsOriginalImpl { private int numberOfTimesCalled = 0; + @Override public long lastChanged(File path) { ++numberOfTimesCalled; @@ -255,12 +256,10 @@ public class CopyActivityMonitorTest { "slow" }) public void testCopySeemsStalledButActuallyIsFine() throws Throwable { - final IPathLastChangedChecker checker = new SimulateShortInterruptionChangedChecker(); - final IFileSysOperationsFactory factory = new MyFileSystemFactory(checker); + final IReadPathOperations checker = new SimulateShortInterruptionChangedChecker(); final MockTerminable copyProcess = new MockTerminable(); final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(workingDirectory, factory, copyProcess, parameters); + final CopyActivityMonitor monitor = new CopyActivityMonitor(workingDirectory, checker, copyProcess, parameters); final File file = new File(workingDirectory, "some-directory"); file.mkdir(); monitor.start(file); @@ -269,8 +268,9 @@ public class CopyActivityMonitorTest assert copyProcess.isTerminated() == false; } - private final class PathLastChangedCheckerStalled implements IPathLastChangedChecker + private final class PathLastChangedCheckerStalled extends ReadOperationsOriginalImpl { + @Override public long lastChanged(File path) { return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS * 2; @@ -284,12 +284,10 @@ public class CopyActivityMonitorTest LogMonitoringAppender appender = LogMonitoringAppender.addAppender(LogCategory.OPERATION, "Activity monitor got terminated"); LogFactory.getLogger(LogCategory.OPERATION, CopyActivityMonitor.class).addAppender(appender); - final PathLastChangedCheckerStuck checker = new PathLastChangedCheckerStuck(); - final IFileSysOperationsFactory factory = new MyFileSystemFactory(checker); + final IReadPathOperations checker = new PathLastChangedCheckerStuck(); final MockTerminable copyProcess = new MockTerminable(); final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(workingDirectory, factory, copyProcess, parameters); + final CopyActivityMonitor monitor = new CopyActivityMonitor(workingDirectory, checker, copyProcess, parameters); final File directory = new File(workingDirectory, "some-directory"); directory.mkdir(); directory.deleteOnExit(); @@ -301,10 +299,11 @@ public class CopyActivityMonitorTest appender.verifyLogHasHappened(); } - private final class PathLastChangedCheckerStuck implements IPathLastChangedChecker + private final class PathLastChangedCheckerStuck extends ReadOperationsOriginalImpl { private boolean interrupted = false; + @Override public long lastChanged(File path) { try diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/MainTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/MainTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4c4296bc63683b870aae2360eb7271954c3175b0 --- /dev/null +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/MainTest.java @@ -0,0 +1,500 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover; + +import static ch.systemsx.cisd.datamover.testhelper.FileSystemHelper.assertEmptyDir; +import static ch.systemsx.cisd.datamover.testhelper.FileSystemHelper.createDir; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.common.logging.LogInitializer; +import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.common.utilities.ITerminable; +import ch.systemsx.cisd.datamover.testhelper.FileStructEngine; + +/** + * @author Tomasz Pylak on Aug 29, 2007 + */ +public class MainTest +{ + private static final FileStructEngine DEFAULT_STRUCT = new FileStructEngine("test"); + + // time needed be a single test to complete. After this time we kill data mover and check if results are correct. It + // can happen, that tests running on slow machines fail because they need more time. Than this constant should be + // adjusted. + private static final long DATA_MOVER_COMPLETION_TIME = 4000; + + private static final int CHECK_INTERVAL = 1; + + private static final int QUIET_PERIOD = 2; + + private static final File unitTestRootDirectory = new File("targets" + File.separator + "unit-test-wd"); + + private static final File workingDirectory = new File(unitTestRootDirectory, MainTest.class.getSimpleName()); + + @BeforeClass + public void init() + { + LogInitializer.init(); + unitTestRootDirectory.mkdirs(); + assert unitTestRootDirectory.isDirectory(); + } + + @BeforeMethod + public void setUp() + { + FileUtilities.deleteRecursively(workingDirectory); + workingDirectory.mkdirs(); + } + + @AfterClass + public void clean() + { + // FileUtilities.deleteRecursively(unitTestRootDirectory); + } + + // ----------------- auxiliary data structures + + private static class ExternalDirs + { + private static final String INCOMING = "incoming"; + + private static final String BUFFER = "buffer"; + + private static final String OUTGOING = "outgoing"; + + private static final String EXTRA_COPY_DIR = "extra-copy"; + + private static final String MANUAL_INTERV_DIR = "manual-intervention"; + + public File incoming; + + public File outgoing; + + public File buffer; + + public File extraCopy; + + public File manualIntervDir; + + public ExternalDirs(File workingDirectory) throws IOException + { + incoming = createDir(workingDirectory, INCOMING); + buffer = createDir(workingDirectory, BUFFER); + outgoing = createDir(workingDirectory, OUTGOING); + extraCopy = createDir(workingDirectory, EXTRA_COPY_DIR); + manualIntervDir = createDir(workingDirectory, MANUAL_INTERV_DIR); + } + } + + // ----------------- higher level assertions + + private static void assertSampleStructMovedWithCopy(ExternalDirs dirs, LocalBufferDirs bufferDirs, + FileStructEngine structEngine) throws IOException + { + assertEmptyIncomingAndBufferDir(dirs, bufferDirs); + assertSampleStructInOutgoing(dirs, structEngine); + structEngine.assertSampleStructureExists(dirs.extraCopy); + } + + private static void assertSampleStructInOutgoing(ExternalDirs dirs, FileStructEngine structEngine) + throws IOException + { + structEngine.assertSampleStructureExists(dirs.outgoing); + structEngine.assertSampleStructFinishMarkerExists(dirs.outgoing); + } + + private static void assertSampleStructMovedWithCopy(ExternalDirs dirs, LocalBufferDirs bufferDirs) + throws IOException + { + assertEmptyIncomingAndBufferDir(dirs, bufferDirs); + assertSampleStructInOutgoing(dirs); + assertSampleStructureExists(dirs.extraCopy); + } + + private static void assertSampleStructInOutgoing(ExternalDirs dirs) throws IOException + { + assertSampleStructureExists(dirs.outgoing); + assertSampleStructFinishMarkerExists(dirs.outgoing); + // we use default structure engine, so there is exactly one directory in outgoing + the marker file + assert dirs.outgoing.list().length == 2; + } + + private static void assertEmptyIncomingAndBufferDir(ExternalDirs dirs, LocalBufferDirs bufferDirs) + { + assertEmptyDir(dirs.incoming); + assertEmptyBufferDirs(bufferDirs); + } + + private static void assertEmptyBufferDirs(LocalBufferDirs dirs) + { + assertEmptyDir(dirs.getCopyCompleteDir()); + assertEmptyDir(dirs.getCopyInProgressDir()); + assertEmptyDir(dirs.getReadyToMoveDir()); + assertEmptyDir(dirs.getTempDir()); + } + + private static void createSampleStructure(File parentDir) throws IOException + { + DEFAULT_STRUCT.createSampleStructure(parentDir); + } + + private static void createPartialSampleStructure(File parentDir) throws IOException + { + DEFAULT_STRUCT.createPartialSampleStructure(parentDir); + } + + private static void createSampleMarkerFile(File parentDir) + { + DEFAULT_STRUCT.createSampleMarkerFile(parentDir); + } + + private static void assertSampleStructureExists(File parentDir) throws IOException + { + DEFAULT_STRUCT.assertSampleStructureExists(parentDir); + } + + private static void assertSampleStructFinishMarkerExists(File parentDir) + { + DEFAULT_STRUCT.assertSampleStructFinishMarkerExists(parentDir); + } + + // -------------------- testing engine and configuration + + private static void assertNumberOfResources(File file, int size) + { + assert file.list().length == size; + } + + private static ArrayList<String> getDefaultParameters(ExternalDirs dirs) + { + return createList("--incoming-dir", dirs.incoming.getPath(), "--buffer-dir", dirs.buffer.getPath(), + "--outgoing-dir", dirs.outgoing.getPath(), "--extra-copy-dir", dirs.extraCopy.getPath(), + "--check-interval", Integer.toString(CHECK_INTERVAL), "--quiet-period", Integer.toString(QUIET_PERIOD), + "--treat-incoming-as-remote"); + } + + private static ArrayList<String> getManualInterventionParameters(ExternalDirs dirs, String filteredName) + { + return createList("--manual-intervention-dir", dirs.manualIntervDir.getPath(), "--manual-intervention-regex", + filteredName); + } + + private ArrayList<String> getCleansingParameters(ExternalDirs dirs, String cleansingStruct) + { + return createList("--cleansing-regex", cleansingStruct); + } + + private static String[] asStringArray(ArrayList<String> list) + { + return list.toArray(new String[] {}); + } + + private static ArrayList<String> createList(String... args) + { + return new ArrayList<String>(Arrays.asList(args)); + } + + private static Parameters createParameters(ArrayList<String> args) + { + Parameters parameters = new Parameters(asStringArray(args)); + parameters.log(); + return parameters; + } + + private static Parameters createDefaultParameters(ExternalDirs dirs) + { + return createParameters(getDefaultParameters(dirs)); + } + + private Parameters createParametersWithFilter(ExternalDirs dirs, String manualIntervName, String cleansingStruct) + { + ArrayList<String> list = getDefaultParameters(dirs); + list.addAll(getManualInterventionParameters(dirs, manualIntervName)); + list.addAll(getCleansingParameters(dirs, cleansingStruct)); + return createParameters(list); + } + + private static LocalBufferDirs createBufferDirs(Parameters parameters) + { + return new LocalBufferDirs(parameters, "test-copy-in-progress", "test-copy-complete", "test-ready-to-move", + "test-temp"); + } + + private static interface IFSPreparator + { + // prepares structure in the file system to mimic possible state of directories before data mover is launched + void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception; + } + + private static void performGenericTest(IFSPreparator preparator) throws Exception + { + ExternalDirs dirs = new ExternalDirs(workingDirectory); + Parameters parameters = createDefaultParameters(dirs); + LocalBufferDirs bufferDirs = createBufferDirs(parameters); + + preparator.prepareState(dirs, bufferDirs); + + runDataMover(parameters, bufferDirs); + + assertSampleStructMovedWithCopy(dirs, bufferDirs); + } + + private static void runDataMover(Parameters parameters, LocalBufferDirs bufferDirs) throws InterruptedException + { + ITerminable terminable = Main.startupServer(parameters, bufferDirs); + Thread.sleep(DATA_MOVER_COMPLETION_TIME); + assert terminable.terminate(); + } + + // --------------------- tests + + @Test(groups = + { "slow" }) + // recovery after copy from incoming has been done, but no marker exists nor the source was not deleted + public void testRecoveryIncomingCopiedNotDeleted() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(dirs.incoming); + createSampleStructure(bufferDirs.getCopyInProgressDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure during coping from incoming + public void testRecoveryIncomingPartialCopy() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(dirs.incoming); + createPartialSampleStructure(bufferDirs.getCopyInProgressDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after data from incoming has been moved, but no marker was created yet + public void testRecoveryIncomingNoMarkFile() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getCopyInProgressDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure before data are moved to 'copy-completed' + public void testRecoveryIncomingCompleteNotMoved() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getCopyInProgressDir()); + createSampleMarkerFile(bufferDirs.getCopyInProgressDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure when data are moved to 'copy-completed', but before the marker in in-progress is deleted + public void testRecoveryIncomingCompleteMarkerNotDeleted() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getCopyCompleteDir()); + createSampleMarkerFile(bufferDirs.getCopyInProgressDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure when incoming has finished processing and local processing has not started + public void testRecoveryLocalProcessingNotStarted() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getCopyCompleteDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure during local processing when extra copy in temp dir is partial + public void testRecoveryLocalProcessingPartialExtraCopyInTmp() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getCopyCompleteDir()); + createPartialSampleStructure(bufferDirs.getTempDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure during local processing, extra copy created in temp-dir, data moved to read-to-move + public void testRecoveryLocalProcessingExtraCopyInTmp() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getTempDir()); + createSampleStructure(bufferDirs.getReadyToMoveDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure during local processing, extra copy created in temp-dir, data moved to read-to-move, + // outgoing processing has not started. It tests also outgoing process recovery. + public void testRecoveryLocalProcessingExtraCopyInTmpAndReadyToMove() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(bufferDirs.getTempDir()); + createSampleStructure(bufferDirs.getReadyToMoveDir()); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure when partial copy has been done in outgoing + public void testRecoveryOutgoingPartialCopy() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(dirs.extraCopy); + createSampleStructure(bufferDirs.getReadyToMoveDir()); + createPartialSampleStructure(dirs.outgoing); + } + }); + } + + @Test(groups = + { "slow" }) + // some data are in incoming, test the whole pipeline + public void testWholePipeline() throws Exception + { + performGenericTest(new IFSPreparator() + { + public void prepareState(ExternalDirs dirs, LocalBufferDirs bufferDirs) throws Exception + { + createSampleStructure(dirs.incoming); + } + }); + } + + @Test(groups = + { "slow" }) + // recovery after failure when partial copy has been done in outgoing + public void testRecoveryAllThreadsPartial() throws Exception + { + ExternalDirs dirs = new ExternalDirs(workingDirectory); + Parameters parameters = createDefaultParameters(dirs); + LocalBufferDirs bufferDirs = createBufferDirs(parameters); + + // incoming recovery: structure 1 partial in in-progress + FileStructEngine struct1 = new FileStructEngine("test1"); + struct1.createSampleStructure(dirs.incoming); + struct1.createPartialSampleStructure(bufferDirs.getCopyInProgressDir()); + + // local processing recovery: structure 2 in copy-complete + FileStructEngine struct2 = new FileStructEngine("test2"); + struct2.createSampleStructure(bufferDirs.getCopyCompleteDir()); + + // outgoing and local processing recovery: structure 3 in ready-to-move and temp + FileStructEngine struct3 = new FileStructEngine("test3"); + struct3.createSampleStructure(bufferDirs.getReadyToMoveDir()); + struct3.createSampleStructure(bufferDirs.getTempDir()); + + runDataMover(parameters, bufferDirs); + + assertSampleStructMovedWithCopy(dirs, bufferDirs, struct1); + assertSampleStructMovedWithCopy(dirs, bufferDirs, struct2); + assertSampleStructMovedWithCopy(dirs, bufferDirs, struct3); + assertNumberOfResources(dirs.outgoing, 6); + } + + @Test(groups = + { "slow" }) + // some data are in incoming, test the whole pipeline taking manual intervention and cleansing scenario into account + public void testWholePipelineCleansingManualIntervention() throws Exception + { + ExternalDirs dirs = new ExternalDirs(workingDirectory); + + // this structure should end up in manual-intervention-directory + FileStructEngine manualIntervStruct = new FileStructEngine("test1"); + // this structure should be deleted + FileStructEngine cleansinStruct = new FileStructEngine("test2"); + + Parameters parameters = + createParametersWithFilter(dirs, manualIntervStruct.getMainStructName(), cleansinStruct + .getSampleCleansingRegExp()); + LocalBufferDirs bufferDirs = createBufferDirs(parameters); + + manualIntervStruct.createSampleStructure(dirs.incoming); + cleansinStruct.createSampleStructure(dirs.incoming); + + runDataMover(parameters, bufferDirs); + + cleansinStruct.assertSampleStructureCleaned(dirs.outgoing); + assertNumberOfResources(dirs.outgoing, 2); + + cleansinStruct.assertSampleStructureCleaned(dirs.extraCopy); + assertNumberOfResources(dirs.extraCopy, 1); + + manualIntervStruct.assertSampleStructureExists(dirs.manualIntervDir); + assertNumberOfResources(dirs.manualIntervDir, 1); + } +} diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java index 48784845d9cf1c30a20d2fea14104643eab52351..664cece9056bd1c6811ab36d3b1720d8aadd7b1d 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java @@ -239,7 +239,7 @@ public class ParametersTest assertEquals(LOCAL_TEMPDIR, parameters.getBufferStore().getPath().getPath()); assertEquals(REMOTE_DATADIR, parameters.getOutgoingStore().getPath().getPath()); assertEquals(REMOTE_HOST, parameters.getOutgoingStore().getHost()); - assertEquals(EXTRA_COPY_DIR, parameters.tryGetExtraCopyStore().getPath().getPath()); + assertEquals(EXTRA_COPY_DIR, parameters.tryGetExtraCopyDir().getPath()); assertEquals(1000 * CHECK_INTERVAL, parameters.getCheckIntervalMillis()); assertEquals(1000 * QUIET_PERIOD, parameters.getQuietPeriodMillis()); assertEquals(true, parameters.getTreatIncomingAsRemote()); diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java index 32232f892fa6fbc793873585241ae5a18d9bfd80..2f7bd15f546797b55a247b81b969ced52e71e118 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java @@ -26,6 +26,7 @@ import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.utilities.FileUtilities; +import ch.systemsx.cisd.datamover.intf.IPathCopier; /** * Test cases for the {@link SelfTest}. diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMakerTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMakerTest.java index 8f8b359046c8d3a858aa5ed2f315b1c08e77a291..201a5f6c814b51136dd791ac4ae5eac4cb17194b 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMakerTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/hardlink/RecursiveHardLinkMakerTest.java @@ -31,7 +31,7 @@ import org.testng.annotations.Test; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.utilities.CollectionIO; import ch.systemsx.cisd.common.utilities.FileUtilities; -import ch.systemsx.cisd.datamover.IPathImmutableCopier; +import ch.systemsx.cisd.datamover.intf.IPathImmutableCopier; /** * Test cases for the {@link RecursiveHardLinkMaker}. diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/testhelper/FileStructEngine.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/testhelper/FileStructEngine.java new file mode 100644 index 0000000000000000000000000000000000000000..47c147b168513c6b07df7f7a22dcf028fe6c8999 --- /dev/null +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/testhelper/FileStructEngine.java @@ -0,0 +1,120 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.testhelper; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static ch.systemsx.cisd.datamover.testhelper.FileSystemHelper.*; +import ch.systemsx.cisd.common.utilities.CollectionIO; +import ch.systemsx.cisd.datamover.helper.CopyFinishedMarker; + +/** + * Immutable helper for creating a sample directory structure and manipulating on it. + * + * @author Tomasz Pylak on Aug 29, 2007 + */ +public class FileStructEngine +{ + private static final String SAMPLE_FILE1 = "f1"; + + private static final String SAMPLE_FILE2 = "f2"; + + private final String sampleMovedDir; + + public FileStructEngine(String name) + { + this.sampleMovedDir = name + "_dir"; + } + + public String getMainStructName() + { + return sampleMovedDir; + } + + public String getSampleCleansingRegExp() + { + return SAMPLE_FILE1; + } + + public void assertSampleStructureCleaned(File parentDir) throws IOException + { + File dirInOutgoing = assertDirExists(parentDir, sampleMovedDir); + assertSampleFileContent(dirInOutgoing, SAMPLE_FILE2); + } + + public void createSampleStructure(File parentDir) throws IOException + { + File dir1 = createDir(parentDir, sampleMovedDir); + createSampleFile(dir1, SAMPLE_FILE1); + createSampleFile(dir1, SAMPLE_FILE2); + } + + public void createPartialSampleStructure(File parentDir) throws IOException + { + File dir1 = createDir(parentDir, sampleMovedDir); + createSampleFile(dir1, SAMPLE_FILE1); + } + + public void createSampleMarkerFile(File parentDir) + { + createEmptyFile(createMarkerFile(parentDir, sampleMovedDir)); + } + + private static File createMarkerFile(File parentDir, String originalName) + { + return new File(parentDir, CopyFinishedMarker.getMarkerName(originalName)); + } + + private static List<String> createSampleFileContent() + { + String[] lines = new String[] + { "test line 1", "test line 2", "test line 3" }; + List<String> lineList = Arrays.asList(lines); + return lineList; + } + + private static File createSampleFile(File dir, String name) + { + return createFile(dir, name, createSampleFileContent()); + } + + private static void assertSampleFileContent(File dir, String fileName) + { + File file = new File(dir, fileName); + assert file.isFile(); + List<String> lineList = new ArrayList<String>(); + assert CollectionIO.readCollection(file, lineList); + assert lineList.equals(createSampleFileContent()); + } + + public void assertSampleStructureExists(File parentDir) throws IOException + { + File dirInOutgoing = assertDirExists(parentDir, sampleMovedDir); + assertSampleFileContent(dirInOutgoing, SAMPLE_FILE1); + assertSampleFileContent(dirInOutgoing, SAMPLE_FILE2); + } + + public void assertSampleStructFinishMarkerExists(File parentDir) + { + File marker = createMarkerFile(parentDir, sampleMovedDir); + assert marker.exists(); + } +} diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/testhelper/FileSystemHelper.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/testhelper/FileSystemHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..c10af4715027ea70b74c7e0090e6663d434b893d --- /dev/null +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/testhelper/FileSystemHelper.java @@ -0,0 +1,71 @@ +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.testhelper; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import ch.systemsx.cisd.common.utilities.CollectionIO; + +/** + * Helper for files and directories manipulations. + * + * @author Tomasz Pylak on Aug 29, 2007 + */ +public class FileSystemHelper +{ + public static File createFile(File dir, String name, List<String> lines) + { + File file = new File(dir, name); + createFile(file, lines); + return file; + } + + private static void createFile(File file, List<String> lines) + { + CollectionIO.writeIterable(file, lines); + } + + public static void createEmptyFile(File file) + { + CollectionIO.writeIterable(file, new ArrayList<String>()); + } + + public static File createDir(File directory, String name) throws IOException + { + final File file = new File(directory, name); + file.mkdir(); + assert file.isDirectory(); + file.deleteOnExit(); + return file; + } + + public static File assertDirExists(File parentDir, String dirName) + { + File dir = new File(parentDir, dirName); + assert dir.isDirectory(); + return dir; + } + + public static void assertEmptyDir(File dir) + { + assert dir.isDirectory(); + assert dir.list().length == 0; + } +}