From b259b6a4abac59577eff1787bd5e9ba329d91184 Mon Sep 17 00:00:00 2001
From: tpylak <tpylak>
Date: Wed, 15 Aug 2007 09:23:25 +0000
Subject: [PATCH] DMV-2 incomming directory can be located on a remote share

SVN: 1375
---
 datamover/dist/etc/service.properties         |   4 +-
 .../cisd/datamover/CopyActivityMonitor.java   |   2 +-
 ...ns.java => IFileSysOperationsFactory.java} |   6 +-
 .../systemsx/cisd/datamover/IPathCopier.java  |   7 +-
 .../java/ch/systemsx/cisd/datamover/Main.java |  34 ++--
 .../cisd/datamover/MonitorStarter.java        | 174 ++++++++++++++----
 .../systemsx/cisd/datamover/Parameters.java   |  67 +++++--
 .../cisd/datamover/QuietPeriodFileFilter.java |   2 +-
 .../cisd/datamover/RemotePathMover.java       |  24 ++-
 .../ch/systemsx/cisd/datamover/SelfTest.java  |  13 +-
 .../cisd/datamover/rsync/RsyncCopier.java     |  52 +++---
 .../cisd/datamover/xcopy/XcopyCopier.java     |   8 +-
 .../datamover/CopyActivityMonitorTest.java    |  14 +-
 .../cisd/datamover/ParametersTest.java        |   8 +-
 .../systemsx/cisd/datamover/SelfTestTest.java |   4 +-
 15 files changed, 288 insertions(+), 131 deletions(-)
 rename datamover/source/java/ch/systemsx/cisd/datamover/{IFileSystemOperations.java => IFileSysOperationsFactory.java} (87%)

diff --git a/datamover/dist/etc/service.properties b/datamover/dist/etc/service.properties
index 19101af51c1..1f5e05091bf 100644
--- a/datamover/dist/etc/service.properties
+++ b/datamover/dist/etc/service.properties
@@ -13,10 +13,12 @@ manual-intervention-dir = data/manual_intervention
 # rsync-executable = <path to rsync>
 # ssh-executable = <path to ssh>
 # outgoing-host = <host where the outgoing directory is located (specify only when using an ssh tunnel)>
+# incoming-host = <host where the incoming directory is located (specify only when using an ssh tunnel)>
 # check-interval = <time interval between two checks in seconds>
 # inactivity-period = <time period before a copy process is considered stalled in seconds>
 # quiet-period = <time period that a path (file or directory)  in incoming is required to be "quiet" before it is copied in seconds>
 # failure-interval = <time interval to wait after a failure before the operation is re-tried in seconds>
 # max-retries = <maximal number of retries when an operation fails>
 # cleansing-regex = <regex of paths that should be removed before moving a path to outgoing>
-# manual-intervention-regex = <regex of paths that need manual intervention> 
\ No newline at end of file
+# manual-intervention-regex = <regex of paths that need manual intervention>
+# treat-incoming-as-remote = <true or false, when switched on, than incoming directory is treated as remote> 
\ No newline at end of file
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java
index 2a914b1d438..69d1b32015d 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/CopyActivityMonitor.java
@@ -97,7 +97,7 @@ public class CopyActivityMonitor
      *            process gets stuck.
      * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from.
      */
-    public CopyActivityMonitor(File destinationDirectory, IFileSystemOperations operations, ITerminable copyProcess,
+    public CopyActivityMonitor(File destinationDirectory, IFileSysOperationsFactory operations, ITerminable copyProcess,
             ITimingParameters timingParameters)
     {
         this.monitoredPathLastChecked = new AtomicLong(0);
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IFileSystemOperations.java b/datamover/source/java/ch/systemsx/cisd/datamover/IFileSysOperationsFactory.java
similarity index 87%
rename from datamover/source/java/ch/systemsx/cisd/datamover/IFileSystemOperations.java
rename to datamover/source/java/ch/systemsx/cisd/datamover/IFileSysOperationsFactory.java
index bb07c542293..9d3726694e4 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/IFileSystemOperations.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/IFileSysOperationsFactory.java
@@ -15,15 +15,17 @@
  */
 package ch.systemsx.cisd.datamover;
 
+import java.io.File;
+
 /**
  * A role that provides access to the roles which perform file system operations.
  * 
  * @author Bernd Rinn
  */
-public interface IFileSystemOperations
+public interface IFileSysOperationsFactory
 {
 
-    public IPathCopier getCopier();
+    public IPathCopier getCopier(File destinationDirectory);
 
     public IPathRemover getRemover();
 
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java
index 5281e599bbb..b31576df50a 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/IPathCopier.java
@@ -35,7 +35,7 @@ public interface IPathCopier extends ITerminable, ISelfTestable
 
     /**
      * @return <code>true</code> if this copier supports explicit specifying a destination host, that is, if
-     *         {@link #copy(File, File, String)} may be called.
+     *         {@link #copy(File, String, File, String)} may be called.
      */
     public boolean supportsExplicitHost();
     
@@ -63,13 +63,14 @@ public interface IPathCopier extends ITerminable, ISelfTestable
      * {@link #supportsExplicitHost()} returns <code>false</code>.
      * 
      * @param sourcePath The source to copy. Can be a file or a directory. It needs to exist and be readable.
+     * @param sourceHost The host where the <var>sourcePath</var> resides or null if it is local.
      * @param destinationDirectory The directory to use as a destination in the copy operation. It must be readable and
      *            writable. If <var>destinationDir/sourcePath</var> exists, it will be overwritten.
-     * @param destinationHost The host where the <var>destinationDirectory</var> resides.
+     * @param destinationHost The host where the <var>destinationDirectory</var> resides  or null if it is local.
      * @return The status of the operation, {@link Status#OK} if everything went OK.
      * @throws IllegalStateException If this copier does not support explicitely specifying a destination host, that is
      *             if {@link #supportsExplicitHost()} returns <code>false</code>.
      */
-    public Status copy(File sourcePath, File destinationDirectory, String destinationHost);
+    public Status copy(File sourcePath, String sourceHost, File destinationDirectory, String destinationHost);
 
 }
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Main.java b/datamover/source/java/ch/systemsx/cisd/datamover/Main.java
index 47227897663..e4387a1ab6b 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/Main.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/Main.java
@@ -130,15 +130,13 @@ public class Main
     }
 
     /**
-     * Returns the path copier and performs a self-test.
+     * performs a self-test.
      */
-    private static IPathCopier getPathCopier(final Parameters parameters)
+    private static void selfTest(final Parameters parameters)
     {
-        IPathCopier copyProcess = null; // Convince Eclipse compiler that the variable has been initialized.
-
         try
         {
-            copyProcess = suggestPathCopier(parameters, false); // This is part of the self-test.
+            IPathCopier copyProcess = suggestPathCopier(parameters, false);
             SelfTest.check(copyProcess, parameters.getIncomingStore(), parameters.getBufferStore(), parameters
                     .getOutgoingStore(), parameters.getManualInterventionStore());
         } catch (HighLevelException e)
@@ -151,28 +149,32 @@ public class Main
             e.printStackTrace();
             System.exit(1);
         }
-        if (SelfTest.requiresDeletionBeforeCreation(copyProcess, parameters.getBufferStore().getPath(), parameters
-                .getOutgoingStore().getPath()))
-        {
-            copyProcess = suggestPathCopier(parameters, true);
-        }
-        return copyProcess;
     }
 
-    private static void startupServer(final Parameters parameters)
+    /**
+     * Returns the path copier
+     */
+    private static IPathCopier getPathCopier(Parameters parameters, File destinationDirectory)
     {
-        final IPathCopier copyProcess = getPathCopier(parameters);
+        IPathCopier copyProcess = suggestPathCopier(parameters, false);
+        boolean requiresDeletionBeforeCreation =
+                SelfTest.requiresDeletionBeforeCreation(copyProcess, destinationDirectory);
+        return suggestPathCopier(parameters, requiresDeletionBeforeCreation);
+    }
 
-        final IFileSystemOperations operations = new IFileSystemOperations()
+    private static void startupServer(final Parameters parameters)
+    {
+        selfTest(parameters);
+        final IFileSysOperationsFactory operations = new IFileSysOperationsFactory()
             {
                 public IPathLastChangedChecker getChecker()
                 {
                     return new FSPathLastChangedChecker();
                 }
 
-                public IPathCopier getCopier()
+                public IPathCopier getCopier(File destinationDirectory)
                 {
-                    return copyProcess;
+                    return getPathCopier(parameters, destinationDirectory);
                 }
 
                 public IPathRemover getRemover()
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java b/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java
index 8eb469a2fde..189bb9385be 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/MonitorStarter.java
@@ -17,9 +17,16 @@
 package ch.systemsx.cisd.datamover;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
 
 import ch.systemsx.cisd.common.Constants;
+import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
+import ch.systemsx.cisd.common.logging.LogCategory;
+import ch.systemsx.cisd.common.logging.LogFactory;
 import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask;
 import ch.systemsx.cisd.common.utilities.IntraFSPathMover;
 import ch.systemsx.cisd.common.utilities.NamePrefixFileFilter;
@@ -29,33 +36,43 @@ import ch.systemsx.cisd.common.utilities.RegexFileFilter.PathType;
 
 /**
  * A class that starts up the monitoring processes, based on the parameters provided.
- *
+ * 
  * @author Bernd Rinn
  */
 public class MonitorStarter
 {
-    
+    private final static String LOCAL_IN_PROGRESS_DIR = "in-progress";
+
+    private final static String LOCAL_READY_TO_MOVE_DIR = "ready-to-move";
+
+    private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, RemotePathMover.class);
+
     private final Parameters parameters;
-    
-    private final IFileSystemOperations operations;
-    
-    public MonitorStarter(Parameters parameters, IFileSystemOperations operations)
+
+    private final IFileSysOperationsFactory operations;
+
+    public MonitorStarter(Parameters parameters, IFileSysOperationsFactory operations)
     {
         this.parameters = parameters;
         this.operations = operations;
     }
-    
+
     public void start()
     {
-        startupIncomingMovingProcess();
-        startupOutgoingMovingProcess();
+        File buffer = parameters.getBufferStore().getPath();
+        // here data are copied from incoming
+        File inProgressDir = ensureDirectoryExists(buffer, LOCAL_IN_PROGRESS_DIR);
+        // from here data are moved to outgoing directory
+        File readyToMoveDir = ensureDirectoryExists(buffer, LOCAL_READY_TO_MOVE_DIR);
+
+        startupIncomingMovingProcess(parameters.getIncomingStore(), inProgressDir, readyToMoveDir);
+        startupOutgoingMovingProcess(readyToMoveDir, parameters.getOutgoingStore());
     }
 
-    private void startupIncomingMovingProcess()
+    private void startupIncomingMovingProcess(FileStore incomingStore, File inProgressDir, File readyToMoveDir)
     {
-        final File incomingDirectory = parameters.getIncomingStore().getPath();
-        final File bufferDirectory = parameters.getBufferStore().getPath();
-        final File manualInterventionDirectory = parameters.getManualInterventionDirectory();
+
+        final File manualInterventionDir = parameters.getManualInterventionDirectory();
         final RegexFileFilter cleansingFilter = new RegexFileFilter();
         if (parameters.getCleansingRegex() != null)
         {
@@ -66,41 +83,122 @@ public class MonitorStarter
         {
             manualInterventionFilter.add(PathType.ALL, parameters.getManualInterventionRegex());
         }
-        final IPathHandler localPathMover =
-                new GatePathHandlerDecorator(manualInterventionFilter, new CleansingPathHandlerDecorator(
-                        cleansingFilter, new IntraFSPathMover(bufferDirectory)), new IntraFSPathMover(
-                        manualInterventionDirectory));
-        final DirectoryScanningTimerTask localMovingTask =
-                new DirectoryScanningTimerTask(incomingDirectory, new QuietPeriodFileFilter(parameters, operations),
-                        localPathMover);
-        final Timer localMovingTimer = new Timer("Local Mover");
-        localMovingTimer.schedule(localMovingTask, 0, parameters.getCheckIntervalMillis());
+        IPathHandler pathHandler =
+                createIncomingMovingPathHandler(incomingStore.getHost(), inProgressDir, readyToMoveDir,
+                        manualInterventionDir, manualInterventionFilter, cleansingFilter);
+
+        final DirectoryScanningTimerTask movingTask =
+                new DirectoryScanningTimerTask(incomingStore.getPath(), new QuietPeriodFileFilter(parameters,
+                        operations), pathHandler);
+        final Timer movingTimer = new Timer("Mover of Incomming Data");
+        schedule(movingTimer, movingTask, 0, parameters.getCheckIntervalMillis(), parameters.getTreatIncomingAsRemote());
+    }
+
+    private IPathHandler createIncomingMovingPathHandler(String sourceHost, File inProgressDir, File readyToMoveDir,
+            File manualInterventionDir, RegexFileFilter manualInterventionFilter, RegexFileFilter cleansingFilter)
+    {
+        IPathHandler moveFromIncoming = createPathMoverToLocal(sourceHost, inProgressDir);
+        IPathHandler processMoved = createProcessMovedFile(readyToMoveDir);
+        IPathHandler moveAndProcess = createMoveAndProcess(moveFromIncoming, inProgressDir, processMoved);
+        IPathHandler manualInterventionMover = createPathMoverToLocal(sourceHost, manualInterventionDir);
+        CleansingPathHandlerDecorator cleansingOrMover =
+                new CleansingPathHandlerDecorator(cleansingFilter, moveAndProcess);
+        return new GatePathHandlerDecorator(manualInterventionFilter, cleansingOrMover, manualInterventionMover);
+    }
+
+    private static IPathHandler createProcessMovedFile(File destDirectory)
+    {
+        FileFilter cleanMarkers = new NamePrefixFileFilter(Constants.IS_FINISHED_PREFIX, true);
+        // TODO [2007-08-13 tpylak] add possibility to make hard-link copy for images analysis
+        IPathHandler moveToDone = new IntraFSPathMover(destDirectory);
+        return new CleansingPathHandlerDecorator(cleanMarkers, moveToDone);
+    }
+
+    private static IPathHandler createMoveAndProcess(final IPathHandler moveFromIncoming, final File destinationDir,
+            final IPathHandler processMovedFile)
+    {
+        return new IPathHandler()
+            {
+                public boolean handle(File path)
+                {
+                    boolean ok = moveFromIncoming.handle(path);
+                    if (ok)
+                    {
+                        // create path in destination directory
+                        File movedFile = new File(destinationDir, path.getName());
+                        File markFile = new File(destinationDir, Constants.IS_FINISHED_PREFIX + path.getName());
+                        assert movedFile.exists();
+                        assert markFile.exists();
+                        operationLog.info(String.format("Processing moved file locally %s\n.", movedFile.getAbsoluteFile()));
+                        markFile.delete(); // process even if mark file could not be deleted
+                        ok = processMovedFile.handle(movedFile);
+                    }
+                    return ok;
+                }
+            };
+    }
+
+    private IPathHandler createPathMoverToLocal(String sourceHost, final File localDestDir)
+    {
+        if (parameters.getTreatIncomingAsRemote())
+        {
+            return createRemotePathMover(sourceHost, localDestDir, /* local host */null);
+        } else
+        {
+            return new IntraFSPathMover(localDestDir);
+        }
+    }
 
+    private IPathHandler createRemotePathMover(String sourceHost, File destinationDirectory, String destinationHost)
+    {
+        IPathCopier copier = operations.getCopier(destinationDirectory);
+        CopyActivityMonitor monitor = new CopyActivityMonitor(destinationDirectory, operations, copier, parameters);
+        IPathRemover remover = operations.getRemover();
+        return new RemotePathMover(destinationDirectory, destinationHost, monitor, remover, copier, sourceHost,
+                parameters);
     }
 
-    private void startupOutgoingMovingProcess()
+    private void startupOutgoingMovingProcess(File srcDir, FileStore destDir)
     {
-        final File bufferDirectory = parameters.getBufferStore().getPath();
-        final File outgoingDirectory = parameters.getOutgoingStore().getPath();
-        final String outgoingHost = parameters.getOutgoingStore().getHost();
-        final CopyActivityMonitor monitor =
-                new CopyActivityMonitor(outgoingDirectory, operations, operations.getCopier(), parameters);
-        final IPathHandler remoteMover =
-                new RemotePathMover(outgoingDirectory, outgoingHost, monitor, operations, parameters);
+        final File outgoingDirectory = destDir.getPath();
+        final String outgoingHost = destDir.getHost();
+        final IPathHandler remoteMover = createRemotePathMover(null, outgoingDirectory, outgoingHost);
         final DirectoryScanningTimerTask remoteMovingTask =
-                new DirectoryScanningTimerTask(bufferDirectory, new NamePrefixFileFilter(Constants.IS_FINISHED_PREFIX,
-                        false), remoteMover);
+                new DirectoryScanningTimerTask(srcDir, new NamePrefixFileFilter(Constants.IS_FINISHED_PREFIX, false),
+                        remoteMover);
         final Timer remoteMovingTimer = new Timer("Remote Mover");
 
         // Implementation notes:
-        // 1. The startup of the remote moving task is delayed for half the time of the check interval. Thus the local
+        // The startup of the remote moving task is delayed for half the time of the check interval. Thus the
+        // incoming
         // moving task should have enough time to finish its job.
-        // 2. The remote moving task is scheduled at fixed rate. The rationale behind this is that if new items are
-        // added
-        // to the local temp directory while the remote timer task has been running for a long time, busy moving data to
+        schedule(remoteMovingTimer, remoteMovingTask, parameters.getCheckIntervalMillis() / 2, parameters
+                .getCheckIntervalMillis(), true);
+    }
+
+    private void schedule(Timer timer, TimerTask task, long delay, long period, boolean isRemote)
+    {
+        // The remote moving task is scheduled at fixed rate. The rationale behind this is that if new items are
+        // added to the source directory while the remote timer task has been running for a long time, busy moving data
+        // to or from
         // remote, the task shoulnd't sit idle for the check time when there is actually work to do.
-        remoteMovingTimer.scheduleAtFixedRate(remoteMovingTask, parameters.getCheckIntervalMillis() / 2, parameters
-                .getCheckIntervalMillis());
+        if (isRemote)
+        {
+            timer.scheduleAtFixedRate(task, delay, period);
+        } else
+        {
+            timer.schedule(task, delay, period);
+        }
     }
 
+    private static File ensureDirectoryExists(File dir, String newDirName)
+    {
+        File dataDir = new File(dir, newDirName);
+        if (!dataDir.exists())
+        {
+            if (!dataDir.mkdir())
+                throw new EnvironmentFailureException("Could not create local data directory " + dataDir);
+        }
+        return dataDir;
+    }
 }
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java
index 933290f471f..a24b79f154a 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java
@@ -116,6 +116,19 @@ public class Parameters implements ITimingParameters
             + "before retrying the operation (in seconds) [default: 1800].", handler = MillisecondConversionOptionHandler.class)
     private long intervalToWaitAfterFailureMillis;
 
+    /**
+     * Default treatment of the incoming data directory - should it be treated as on a remote share?
+     */
+    private static final boolean DEFAULT_TREAT_INCOMING_AS_REMOTE = false;
+
+    /**
+     * If set to true, than directory with incoming data is supposed to be on a remote share. It implies that a special
+     * care will be taken when coping is performed from that directory.
+     */
+    @Option(name = "r", longName = "treat-incoming-as-remote", usage = "If set to true, than directory with incoming data "
+            + "is supposed to be on a remote share.")
+    private boolean treatIncomingAsRemote;
+
     /**
      * Default number of retries after a failure has occurred.
      */
@@ -129,17 +142,22 @@ public class Parameters implements ITimingParameters
             + "datamover gives up on it. [default: 10].")
     private int maximalNumberOfRetries;
 
+    /**
+     * The remote host to copy the data from or null if data are available on a local/remote share
+     */
+    @Option(longName = "incoming-host", metaVar = "HOST", usage = "The remote host to move the data from")
+    private String incomingHost = null;
+    
     /**
      * The directory to monitor for new files and directories to move to outgoing.
      */
     @Option(longName = "incoming-dir", metaVar = "DIR", usage = "The local directory where "
             + "the data producer writes to.")
     private File incomingDirectory = null;
-
+ 
+ 
     /**
-     * The directory to move files and directories to that have been quiet in the incoming directory for long enough and
-     * thus are considered to be ready to be moved to outgoing. Note that this directory needs to be on the same file
-     * system than {@link #incomingDirectory}.
+     * The directory for local files and directories manipulations.
      */
     @Option(longName = "buffer-dir", metaVar = "DIR", usage = "The local directory to "
             + "store the paths to be transfered temporarily.")
@@ -148,7 +166,7 @@ public class Parameters implements ITimingParameters
     /**
      * The directory to move files and directories to that have been quiet in the incoming directory for long enough and
      * that need manual intervention. Note that this directory needs to be on the same file system than
-     * {@link #incomingDirectory}.
+     * {@link #bufferDirectory}.
      */
     @Option(longName = "manual-intervention-dir", metaVar = "DIR", usage = "The local directory to "
             + "store paths that need manual intervention.")
@@ -178,22 +196,22 @@ public class Parameters implements ITimingParameters
      * The store where the files come in.
      */
     private final FileStore incomingStore;
-    
+
     /**
      * The store to buffer the files before copying to outgoing.
      */
     private final FileStore bufferStore;
-    
+
     /**
      * The store to copy the files to.
      */
     private final FileStore outgoingStore;
-    
+
     /**
      * The store to copy files to that need manual intervention.
      */
     private final FileStore manualInterventionStore;
-    
+
     /**
      * The regular expression to use for deciding whether a path in the incoming directory needs manual intervention.
      */
@@ -283,7 +301,7 @@ public class Parameters implements ITimingParameters
                 throw new ConfigurationFailureException(
                         "No 'manual-intervention-dir' defined, but 'manual-intervention-regex'.");
             }
-            incomingStore = new FileStore(incomingDirectory, "incoming", null, false);
+            incomingStore = new FileStore(incomingDirectory, "incoming", incomingHost, treatIncomingAsRemote);
             bufferStore = new FileStore(bufferDirectory, "buffer", null, false);
             manualInterventionStore = new FileStore(manualInterventionDirectory, "manual intervention", null, false);
             outgoingStore = new FileStore(outgoingDirectory, "outgoing", outgoingHost, true);
@@ -331,10 +349,14 @@ public class Parameters implements ITimingParameters
         maximalNumberOfRetries =
                 Integer.parseInt(serviceProperties.getProperty("max-retries", Integer
                         .toString(DEFAULT_MAXIMAL_NUMBER_OF_RETRIES)));
+        treatIncomingAsRemote =
+                Boolean.parseBoolean(serviceProperties.getProperty("treat-incoming-as-remote", Boolean
+                        .toString(DEFAULT_TREAT_INCOMING_AS_REMOTE)));
         if (serviceProperties.getProperty("incoming-dir") != null)
         {
             incomingDirectory = new File(serviceProperties.getProperty("incoming-dir"));
         }
+        incomingHost = serviceProperties.getProperty("incoming-host");
         if (serviceProperties.getProperty("buffer-dir") != null)
         {
             bufferDirectory = new File(serviceProperties.getProperty("buffer-dir"));
@@ -453,9 +475,16 @@ public class Parameters implements ITimingParameters
     }
 
     /**
-     * @return The store to move files and directories to that have been quiet in the incoming directory for long
-     *         enough and thus are considered to be ready to be moved to remote. Note that this directory needs to be on
-     *         the same file system as {@link #getIncomingStore}.
+     * @return true if directory with incoming data is supposed to be on a remote share. It implies that a special care
+     *         will be taken when coping is performed from that directory.
+     */
+    public boolean getTreatIncomingAsRemote()
+    {
+        return treatIncomingAsRemote;
+    }
+
+    /**
+     * @return The directory for local files and directories manipulations.
      */
     public FileStore getBufferStore()
     {
@@ -472,8 +501,8 @@ public class Parameters implements ITimingParameters
 
     /**
      * @return The directory to move files and directories to that have been quiet in the local data directory for long
-     *         enough and that need manual intervention. Note that this directory needs to be on the same file system
-     *         as {@link #getIncomingStore}.
+     *         enough and that need manual intervention. Note that this directory needs to be on the same file system as
+     *         {@link #getBufferStore}.
      */
     public File getManualInterventionDirectory()
     {
@@ -482,8 +511,8 @@ public class Parameters implements ITimingParameters
 
     /**
      * @return The store to move files and directories to that have been quiet in the local data directory for long
-     *         enough and that need manual intervention. Note that this directory needs to be on the same file system
-     *         as {@link #getIncomingStore}.
+     *         enough and that need manual intervention. Note that this directory needs to be on the same file system as
+     *         {@link #getBufferStore}.
      */
     public FileStore getManualInterventionStore()
     {
@@ -525,8 +554,8 @@ public class Parameters implements ITimingParameters
             }
             if (null != getManualInterventionDirectory())
             {
-                operationLog.info(String.format("Manual interventions directory: '%s'.",
-                        manualInterventionDirectory.getAbsolutePath()));
+                operationLog.info(String.format("Manual interventions directory: '%s'.", manualInterventionDirectory
+                        .getAbsolutePath()));
             }
             operationLog.info(String.format("Check intervall: %d s.", getCheckIntervalMillis() / 1000));
             operationLog.info(String.format("Quiet period: %d s.", getQuietPeriodMillis() / 1000));
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java b/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java
index 2e00ed087f7..86e82b58c8a 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/QuietPeriodFileFilter.java
@@ -38,7 +38,7 @@ public class QuietPeriodFileFilter implements FileFilter
      * @param operations The operations object to ask for the implementation to use to check when a pathname was
      *            changed.
      */
-    public QuietPeriodFileFilter(ITimingParameters timingParameters, IFileSystemOperations operations)
+    public QuietPeriodFileFilter(ITimingParameters timingParameters, IFileSysOperationsFactory operations)
     {
         assert timingParameters != null;
         assert operations != null;
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java
index d08cbde57ea..034fd714ae0 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/RemotePathMover.java
@@ -51,7 +51,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
     private static final String MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE =
             "Moving path '%s' to remote directory '%s' failed.";
 
-    private static final String REMOVING_LOCAL_PATH_FAILED_TEMPLATE = "Removing local path '%s' failed.";
+    private static final String REMOVING_LOCAL_PATH_FAILED_TEMPLATE = "Removing local path '%s' failed (%s).";
 
     private static final String FAILED_TO_CREATE_MARK_FILE_TEMPLATE = "Failed to create mark file '%s'";
 
@@ -71,6 +71,8 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
     private final IPathCopier copier;
 
     private final IPathRemover remover;
+    
+    private final String sourceHost;
 
     private final CopyActivityMonitor monitor;
 
@@ -85,15 +87,18 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
      * @param destinationHost The host to move paths to, or <code>null</code>, if <var>destinationDirectory</var> is
      *            a remote share.
      * @param monitor The activity monitor to inform about actions.
-     * @param operations The implementations of the file system operations to use for moving.
+     * @param remover Allows to remove files.
+     * @param copier Allows to copy files
+     * @param sourceHost The host to move paths from, or <code>null</code>, if data will be moved from the local file system
      * @param timingParameters The timing parametes used for monitoring and reporting stall situations.
      */
     public RemotePathMover(File destinationDirectory, String destinationHost, CopyActivityMonitor monitor,
-            IFileSystemOperations operations, ITimingParameters timingParameters)
+            IPathRemover remover, IPathCopier copier, String sourceHost, ITimingParameters timingParameters)
     {
         assert destinationDirectory != null;
         assert monitor != null;
-        assert operations != null;
+        assert remover != null;
+        assert copier != null;
         assert timingParameters != null;
         assert FileUtilities.checkDirectoryFullyAccessible(destinationDirectory, "destination") == null : FileUtilities
                 .checkDirectoryFullyAccessible(destinationDirectory, "destination");
@@ -101,8 +106,9 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
         this.destinationDirectory = destinationDirectory;
         this.destinationHost = destinationHost;
         this.monitor = monitor;
-        this.copier = operations.getCopier();
-        this.remover = operations.getRemover();
+        this.copier = copier;
+        this.remover = remover;
+        this.sourceHost = sourceHost;
         this.intervallToWaitAfterFailure = timingParameters.getIntervalToWaitAfterFailure();
         this.maximalNumberOfRetries = timingParameters.getMaximalNumberOfRetries();
 
@@ -131,7 +137,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
             }
             final long startTime = System.currentTimeMillis();
             monitor.start(path);
-            final Status copyStatus = copier.copy(path, destinationDirectory, destinationHost);
+            final Status copyStatus = copier.copy(path, sourceHost, destinationDirectory, destinationHost);
             monitor.stop();
             if (StatusFlag.OK.equals(copyStatus.getFlag()))
             {
@@ -145,7 +151,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
                 if (Status.OK.equals(removalStatus) == false)
                 {
                     // We don't retry this, because the path is local and removal really shouldn't fail.
-                    notificationLog.error(String.format(REMOVING_LOCAL_PATH_FAILED_TEMPLATE, path));
+                    notificationLog.error(String.format(REMOVING_LOCAL_PATH_FAILED_TEMPLATE, path, removalStatus));
                 } else if (operationLog.isInfoEnabled())
                 {
                     operationLog.info(String.format(REMOVED_PATH_TEMPLATE, path.getPath()));
@@ -218,7 +224,7 @@ public final class RemotePathMover implements DirectoryScanningTimerTask.IPathHa
         {
             markFile.createNewFile();
             monitor.start(path);
-            final Status copyStatus = copier.copy(markFile, destinationDirectory, destinationHost);
+            final Status copyStatus = copier.copy(markFile, sourceHost, destinationDirectory, destinationHost);
             monitor.stop();
             if (StatusFlag.OK.equals(copyStatus.getFlag()))
             {
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java b/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java
index 09be1b3197e..46d79761806 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/SelfTest.java
@@ -167,19 +167,19 @@ public class SelfTest
      * @return <code>true</code> if the <var>copyProcess</var> on the file system where the <var>destinationDirectory</var>
      *         resides requires deleting an existing file before it can be overwritten.
      */
-    public static boolean requiresDeletionBeforeCreation(IPathCopier copyProcess, File sourceDirectory,
-            File destinationDirectory)
+    public static boolean requiresDeletionBeforeCreation(IPathCopier copyProcess, File destinationDirectory)
     {
         assert copyProcess != null;
-        assert sourceDirectory != null;
-        assert sourceDirectory.isDirectory();
         assert destinationDirectory != null;
         assert destinationDirectory.isDirectory();
 
-        final File sourceFile = new File(sourceDirectory, ".requiresDeletionBeforeCreation");
-        final File destinationFile = new File(destinationDirectory, ".requiresDeletionBeforeCreation");
+        String fileName = ".requiresDeletionBeforeCreation";
+        final File destinationFile = new File(destinationDirectory, fileName);
+        final File tmpSourceDir = new File(destinationDirectory, ".DataMover-OverrideTest");
+        final File sourceFile = new File(tmpSourceDir, fileName);
         try
         {
+            tmpSourceDir.mkdir();
             sourceFile.createNewFile();
             destinationFile.createNewFile();
             // If we have e.g. a Cellera NAS server, the next call will raise an IOException.
@@ -211,6 +211,7 @@ public class SelfTest
         {
             // We don't check for success because there is nothing we can do if we fail.
             sourceFile.delete();
+            tmpSourceDir.delete();
             destinationFile.delete();
         }
     }
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java
index fb8273d3253..898c7eebfcf 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/rsync/RsyncCopier.java
@@ -80,7 +80,7 @@ public class RsyncCopier implements IPathCopier
     private final String sshExecutable;
 
     private final List<String> additionalCmdLineFlags;
-    
+
     /**
      * If <code>true</code>, the file system of the destination directory requires that already existing files and
      * directories on the remote side are removed before the copy process is started.
@@ -124,15 +124,16 @@ public class RsyncCopier implements IPathCopier
 
     public Status copy(File sourcePath, File destinationDirectory)
     {
-        return copy(sourcePath, destinationDirectory, null);
+        return copy(sourcePath, null, destinationDirectory, null);
     }
 
-    public Status copy(File sourcePath, File destinationDirectory, String destinationHost)
+    public Status copy(File sourcePath, String sourceHost, File destinationDirectory, String destinationHost)
     {
         assert sourcePath != null;
-        assert sourcePath.exists();
+        assert sourceHost != null || sourcePath.exists();
         assert destinationDirectory != null;
         assert destinationHost != null || destinationDirectory.isDirectory();
+        assert sourceHost == null || destinationHost == null; // only one side can be remote
 
         final File destinationPath = new File(destinationDirectory, sourcePath.getName());
         if (destinationDirectoryRequiresDeletionBeforeCreation && destinationPath.exists())
@@ -143,12 +144,13 @@ public class RsyncCopier implements IPathCopier
                         "Remove path '%s' since it exists and the remote file system doesn't support overwriting.",
                         destinationPath));
             }
+            // TODO [2007-08-13 tpylak] Bernd, what if destinationHost is set?
             FileUtilities.deleteRecursively(destinationPath);
         }
         try
         {
             final ProcessBuilder copyProcessBuilder =
-                    new ProcessBuilder(createCommandLine(sourcePath, destinationDirectory, destinationHost));
+                    new ProcessBuilder(createCommandLine(sourcePath, sourceHost, destinationDirectory, destinationHost));
             copyProcessBuilder.redirectErrorStream(true);
             if (operationLog.isDebugEnabled())
             {
@@ -174,12 +176,14 @@ public class RsyncCopier implements IPathCopier
         }
     }
 
-    private List<String> createCommandLine(File sourcePath, File destinationDirectory, String destinationHost)
+    private List<String> createCommandLine(File sourcePath, String sourceHost, File destinationDirectory,
+            String destinationHost)
     {
-        assert sourcePath != null && sourcePath.exists();
-        assert destinationDirectory != null && destinationDirectory.isDirectory();
-        assert (destinationHost != null && sshExecutable != null) || (destinationHost == null); 
-        
+        assert sourcePath != null && (sourceHost != null || sourcePath.exists());
+        assert destinationDirectory != null && (destinationHost != null || destinationDirectory.isDirectory());
+        assert (destinationHost != null && sshExecutable != null) || (destinationHost == null);
+        assert (sourceHost != null && sshExecutable != null) || (sourceHost == null);
+
         final List<String> standardParameters = Arrays.asList("--archive", "--delete", "--inplace", "--whole-file");
         final List<String> commandLineList = new ArrayList<String>();
         commandLineList.add(rsyncExecutable);
@@ -193,12 +197,12 @@ public class RsyncCopier implements IPathCopier
         {
             commandLineList.addAll(additionalCmdLineFlags);
         }
-        commandLineList.add(toUnix(sourcePath.getAbsolutePath()));
-        commandLineList.add(createDestination(destinationHost, destinationDirectory));
-        
+        commandLineList.add(buildPath(sourceHost, sourcePath, false));
+        commandLineList.add(buildPath(destinationHost, destinationDirectory, true));
+
         return commandLineList;
     }
-    
+
     private static String getSshExecutableArgument(String sshExecutable)
     {
         if (OSUtilities.isWindows())
@@ -209,16 +213,22 @@ public class RsyncCopier implements IPathCopier
             return sshExecutable + " -oBatchMode=yes";
         }
     }
-    
-    private static String createDestination(String destinationHost, File destinationDirectory)
+
+    private static String buildPath(String host, File resource, boolean isDirectory)
     {
-        if (null == destinationHost)
+        if (null == host)
         {
-            return toUnix(destinationDirectory.getAbsolutePath() + File.separator);
+            String path = resource.getAbsolutePath();
+            if (isDirectory)
+                path += File.separator;
+            return toUnix(path);
         } else
         {
+            String path = resource.getPath();
+            if (isDirectory)
+                path += File.separator;
             // We must not use the absolute path here because that is the business of the destination host.
-            return destinationHost + ":" + toUnix(destinationDirectory.getPath() + File.separator);
+            return host + ":" + toUnix(path);
         }
     }
 
@@ -362,8 +372,8 @@ public class RsyncCopier implements IPathCopier
     {
         assert destinationDirectory != null && destinationDirectory.isDirectory();
         assert destinationHost != null;
-        
-        final String destination = createDestination(destinationHost, destinationDirectory);
+
+        final String destination = buildPath(destinationHost, destinationDirectory, true);
         final ProcessBuilder listProcessBuilder =
                 new ProcessBuilder(rsyncExecutable, "--rsh", getSshExecutableArgument(sshExecutable), destination)
                         .redirectErrorStream(true);
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java
index 953b63410f2..f06f3f12507 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/xcopy/XcopyCopier.java
@@ -333,20 +333,20 @@ public class XcopyCopier implements IPathCopier
      * <em>Must not be called with <code>destinationHost != null</code> since this copier does not support explicitly 
      * specifying a destination host.</em>
      * 
-     * @throws IllegalStateException If <var>destinationHost</var> is not <code>null</code>.
+     * @throws IllegalStateException If <var>sourceHost</var> or <var>destinationHost</var> is not <code>null</code>.
      */
-    public Status copy(File sourcePath, File destinationDirectory, String destinationHost)
+    public Status copy(File sourcePath, String sourceHost, File destinationDirectory, String destinationHost)
     {
         assert sourcePath != null;
         assert destinationDirectory != null;
 
-        if (destinationHost == null)
+        if (sourceHost == null && destinationHost == null)
         {
             return copy(sourcePath, destinationDirectory);
         } else
         {
             throw new IllegalStateException(
-                    "Explicitely specifying a destination host is not supported by this copier.");
+                    "Explicitely specifying a source or destination host is not supported by this copier.");
         }
     }
 
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java
index ff2f52ade12..6babdc1bfe1 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/CopyActivityMonitorTest.java
@@ -78,7 +78,7 @@ public class CopyActivityMonitorTest
         }
     }
 
-    private final class MyFileSystemOperations implements IFileSystemOperations
+    private final class MyFileSystemOperations implements IFileSysOperationsFactory
     {
         /**
          * 
@@ -95,7 +95,7 @@ public class CopyActivityMonitorTest
             return checker;
         }
 
-        public IPathCopier getCopier()
+        public IPathCopier getCopier(File destinationDirectory)
         {
             throw new AssertionError("call not expected");
         }
@@ -187,7 +187,7 @@ public class CopyActivityMonitorTest
     public void testHappyPath() throws Throwable
     {
         final IPathLastChangedChecker checker = new HappyPathLastChangedChecker();
-        final IFileSystemOperations operations = new MyFileSystemOperations(checker);
+        final IFileSysOperationsFactory operations = new MyFileSystemOperations(checker);
         final ITerminable dummyTerminable = new DummyTerminable();
         final ITimingParameters parameters = new MyTimingParameters(0);
         final CopyActivityMonitor monitor =
@@ -205,7 +205,7 @@ public class CopyActivityMonitorTest
     public void testCopyStalled() throws Throwable
     {
         final IPathLastChangedChecker checker = new PathLastChangedCheckerStalled();
-        final IFileSystemOperations operations = new MyFileSystemOperations(checker);
+        final IFileSysOperationsFactory operations = new MyFileSystemOperations(checker);
         final MockTerminable copyProcess = new MockTerminable();
         final ITimingParameters parameters = new MyTimingParameters(0);
         final CopyActivityMonitor monitor =
@@ -251,7 +251,7 @@ public class CopyActivityMonitorTest
     public void testCopySeemsStalledButActuallyIsFine() throws Throwable
     {
         final IPathLastChangedChecker checker = new SimulateShortInterruptionChangedChecker();
-        final IFileSystemOperations operations = new MyFileSystemOperations(checker);
+        final IFileSysOperationsFactory operations = new MyFileSystemOperations(checker);
         final MockTerminable copyProcess = new MockTerminable();
         final ITimingParameters parameters = new MyTimingParameters(0);
         final CopyActivityMonitor monitor =
@@ -280,7 +280,7 @@ public class CopyActivityMonitorTest
                 LogMonitoringAppender.addAppender(LogCategory.OPERATION, "Activity monitor got terminated");
         LogFactory.getLogger(LogCategory.OPERATION, CopyActivityMonitor.class).addAppender(appender);
         final PathLastChangedCheckerStuck checker = new PathLastChangedCheckerStuck();
-        final IFileSystemOperations operations = new MyFileSystemOperations(checker);
+        final IFileSysOperationsFactory operations = new MyFileSystemOperations(checker);
         final MockTerminable copyProcess = new MockTerminable();
         final ITimingParameters parameters = new MyTimingParameters(0);
         final CopyActivityMonitor monitor =
@@ -305,7 +305,7 @@ public class CopyActivityMonitorTest
             try
             {
                 Thread.sleep(INACTIVITY_PERIOD_MILLIS); // Wait longer than the activity monitor is willing to wait for
-                                                        // us.
+                // us.
             } catch (InterruptedException e)
             {
                 // Can't happen since this method runs in a TimerThread which isn't interrupted.
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java
index 806e3079c9c..48d881886c4 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java
@@ -218,16 +218,22 @@ public class ParametersTest
         final String REMOTE_HOST = "myremotehost";
         final int CHECK_INTERVAL = 22;
         final int QUIET_PERIOD = 33;
+        final String TREAT_AS_REMOTE = Boolean.toString(true);
+        final String REMOTE_INCOMING_HOST = "my-remote-incoming-host";
+
         final Parameters parameters =
                 parse("--incoming-dir", LOCAL_DATADIR, "--buffer-dir", LOCAL_TEMPDIR, "--outgoing-dir", REMOTE_DATADIR,
                         "--outgoing-host", REMOTE_HOST, "--check-interval", Integer.toString(CHECK_INTERVAL),
-                        "--quiet-period", Integer.toString(QUIET_PERIOD));
+                        "--quiet-period", Integer.toString(QUIET_PERIOD), "--treat-incoming-as-remote",
+                        TREAT_AS_REMOTE, "--incoming-host", REMOTE_INCOMING_HOST);
         assertEquals(LOCAL_DATADIR, parameters.getIncomingStore().getPath().getPath());
+        assertEquals(REMOTE_INCOMING_HOST, parameters.getIncomingStore().getHost());
         assertEquals(LOCAL_TEMPDIR, parameters.getBufferStore().getPath().getPath());
         assertEquals(REMOTE_DATADIR, parameters.getOutgoingStore().getPath().getPath());
         assertEquals(REMOTE_HOST, parameters.getOutgoingStore().getHost());
         assertEquals(1000 * CHECK_INTERVAL, parameters.getCheckIntervalMillis());
         assertEquals(1000 * QUIET_PERIOD, parameters.getQuietPeriodMillis());
+        assertEquals(TREAT_AS_REMOTE, parameters.getTreatIncomingAsRemote());
     }
 
 }
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java
index 86344c1ed58..32232f892fa 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/SelfTestTest.java
@@ -50,7 +50,7 @@ public class SelfTestTest
     private static final File outgoingDirectory = new File(workingDirectory, "outgoing");
 
     private static final FileStore outgoingStore = new FileStore(outgoingDirectory, "outgoing", null, false);
-    
+
     private static final FileStore dummyStore = new FileStore(null, "dummy", null, false);
 
     private static final IPathCopier mockCopier = new MockPathCopier(false, false);
@@ -106,7 +106,7 @@ public class SelfTestTest
             throw new AssertionError();
         }
 
-        public Status copy(File sourcePath, File destinationDirectory, String destinationHost)
+        public Status copy(File sourcePath, String sourceHost, File destinationDirectory, String destinationHost)
         {
             throw new AssertionError();
         }
-- 
GitLab