From 3eb81618c4d8aa292a92e5e46845860c0304ef9d Mon Sep 17 00:00:00 2001
From: tpylak <tpylak>
Date: Tue, 10 Jun 2008 13:11:48 +0000
Subject: [PATCH] DMV-18 CopyActivityMonitor does not kill the stacked copy
 process if the destination file is not created, DMV-22 Termination of copy
 process by copy-monitor does not work

SVN: 6543
---
 .../remote/CopyActivityMonitor.java           | 252 +++++++++++-------
 .../filesystem/remote/rsync/RsyncCopier.java  |  70 +++--
 .../remote/CopyActivityMonitorTest.java       | 195 ++++++++++----
 .../remote/rsync/RsyncCopierTest.java         |  38 ++-
 4 files changed, 364 insertions(+), 191 deletions(-)

diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java
index 63545a8ba62..3fd3a151d2f 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Future;
 
 import org.apache.log4j.Logger;
 
+import ch.rinn.restrictions.Private;
 import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
 import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor;
 import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
@@ -52,7 +53,7 @@ public class CopyActivityMonitor
     private static final Logger machineLog =
             LogFactory.getLogger(LogCategory.MACHINE, CopyActivityMonitor.class);
 
-    private final IFileStore destinationStore;
+    private final IFileStoreMonitor destinationStore;
 
     private final long checkIntervallMillis;
 
@@ -60,6 +61,10 @@ public class CopyActivityMonitor
 
     private final String threadNamePrefix;
 
+    private final ExecutorService lastChangedExecutor =
+            new NamingThreadPoolExecutor("Last Changed Explorer").daemonize();
+
+    /** handler to terminate monitored process if the observed store item does not change */
     private final ITerminable terminable;
 
     /**
@@ -86,11 +91,18 @@ public class CopyActivityMonitor
     public CopyActivityMonitor(IFileStore destinationStore, ITerminable copyProcess,
             ITimingParameters timingParameters)
     {
-        assert destinationStore != null;
+        this(createFileStoreMonitor(destinationStore), copyProcess, timingParameters);
+    }
+
+    @Private
+    CopyActivityMonitor(IFileStoreMonitor destinationStoreMonitor, ITerminable copyProcess,
+            ITimingParameters timingParameters)
+    {
+        assert destinationStoreMonitor != null;
         assert copyProcess != null;
         assert timingParameters != null;
 
-        this.destinationStore = destinationStore;
+        this.destinationStore = destinationStoreMonitor;
         this.terminable = copyProcess;
         this.checkIntervallMillis = timingParameters.getCheckIntervalMillis();
         this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis();
@@ -107,6 +119,20 @@ public class CopyActivityMonitor
         }
     }
 
+    // Used for all file system operations in this class.
+    @Private
+    static interface IFileStoreMonitor
+    {
+        // returns 0 when an error or timeout occurs during the check
+        long lastChanged(StoreItem item, long stopWhenYoungerThan);
+
+        // true if item exists in the store
+        boolean exists(StoreItem item);
+
+        // description of the store for logging purposes
+        String toString();
+    }
+
     /**
      * Starts the activity monitoring.
      * 
@@ -133,12 +159,6 @@ public class CopyActivityMonitor
         activityMonitoringTimer.cancel();
     }
 
-    private static interface LastChangeItemChecker
-    {
-        // returns 0 when an error or timeout occurs during the check
-        long lastChanged(long previousCheck);
-    }
-
     /**
      * A value object that holds the information about the last check performed for a path.
      */
@@ -171,6 +191,28 @@ public class CopyActivityMonitor
         }
     }
 
+    private static IFileStoreMonitor createFileStoreMonitor(final IFileStore destinationStore)
+    {
+        return new IFileStoreMonitor()
+            {
+                public long lastChanged(StoreItem item, long stopWhenFindYoungerRelative)
+                {
+                    return destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative);
+                }
+
+                public boolean exists(StoreItem item)
+                {
+                    return destinationStore.exists(item);
+                }
+
+                @Override
+                public String toString()
+                {
+                    return destinationStore.toString();
+                }
+            };
+    }
+
     /**
      * A {@link TimerTask} that monitors writing activity on a directory.
      */
@@ -183,13 +225,8 @@ public class CopyActivityMonitor
         private static final String INACTIVITY_REPORT_TEMPLATE =
                 "No progress on copying '%s' to '%s' for %f seconds - network connection might be stalled.";
 
-        private final ExecutorService lastChangedExecutor =
-                new NamingThreadPoolExecutor("Last Changed Explorer").daemonize();
-
         private final StoreItem itemToBeCopied;
 
-        private final LastChangeItemChecker lastChangeChecker;
-
         private PathCheckRecord lastCheckOrNull;
 
         private ActivityMonitoringTimerTask(StoreItem itemToBeCopied)
@@ -200,7 +237,6 @@ public class CopyActivityMonitor
 
             this.lastCheckOrNull = null;
             this.itemToBeCopied = itemToBeCopied;
-            this.lastChangeChecker = createLastChangedChecker();
         }
 
         @Override
@@ -219,13 +255,6 @@ public class CopyActivityMonitor
                             "Asking for last change time of '%s' inside '%s'.", itemToBeCopied,
                             destinationStore));
                 }
-                if (destinationStore.exists(itemToBeCopied) == false)
-                {
-                    operationLog.warn(String.format(
-                            "File or directory '%s' inside '%s' does not (yet?) exist.",
-                            itemToBeCopied, destinationStore));
-                    return;
-                }
                 final long now = System.currentTimeMillis();
                 if (isQuietFor(inactivityPeriodMillis, now))
                 {
@@ -256,30 +285,13 @@ public class CopyActivityMonitor
             }
         }
 
-        private LastChangeItemChecker createLastChangedChecker()
-        {
-            return new LastChangeItemChecker()
-                {
-                    public long lastChanged(long previousCheck)
-                    {
-                        final Long lastChanged = tryLastChanged(destinationStore, itemToBeCopied);
-                        if (operationLog.isTraceEnabled() && lastChanged != null)
-                        {
-                            String msgTemplate =
-                                    "Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.";
-                            String msg =
-                                    String.format(msgTemplate, itemToBeCopied, destinationStore,
-                                            lastChanged);
-                            operationLog.trace(msg);
-                        }
-                        return (lastChanged != null) ? lastChanged : 0;
-                    }
-                };
-        }
-
         // true if nothing has changed during the specified period
         private boolean isQuietFor(long quietPeriodMillis, long now)
         {
+            if (destinationStore.exists(itemToBeCopied) == false)
+            {
+                return checkNonexistentPeriod(quietPeriodMillis, now);
+            }
             if (lastCheckOrNull == null) // never checked before
             {
                 setFirstModificationDate(now);
@@ -309,12 +321,38 @@ public class CopyActivityMonitor
             }
         }
 
+        // Checks how much time elapsed since the last check without looking into file system.
+        // Returns true if it's more than quietPeriodMillis.
+        // Used to stop the copy process if file does not appear at all for a long time.
+        private boolean checkNonexistentPeriod(long quietPeriodMillis, long now)
+        {
+            if (lastCheckOrNull == null)
+            {
+                lastCheckOrNull = new PathCheckRecord(now, 0);
+                return false;
+            } else
+            {
+                if (lastCheckOrNull.getTimeOfLastModification() != 0)
+                {
+                    operationLog.warn(String.format(
+                            "File or directory '%s' has vanished from '%s'.", itemToBeCopied,
+                            destinationStore));
+                } else
+                {
+                    operationLog.warn(String.format(
+                            "File or directory '%s' inside '%s' does not (yet?) exist.",
+                            itemToBeCopied, destinationStore));
+                }
+                return (now - lastCheckOrNull.getTimeChecked() >= quietPeriodMillis);
+            }
+        }
+
         // check if item has been modified since last check by comparing its current modification
         // time to the one acquired in the past
         private boolean checkIfModifiedAndSet(long now)
         {
             final long prevModificationTime = lastCheckOrNull.getTimeOfLastModification();
-            final long newModificationTime = lastChangeChecker.lastChanged(prevModificationTime);
+            final long newModificationTime = lastChanged(itemToBeCopied, prevModificationTime);
             boolean newIsKnown = (newModificationTime != 0);
             if (newIsKnown && newModificationTime != prevModificationTime)
             {
@@ -328,82 +366,94 @@ public class CopyActivityMonitor
 
         private void setFirstModificationDate(final long timeChecked)
         {
-            long lastChanged = lastChangeChecker.lastChanged(0L); // 0 if error
+            long lastChanged = lastChanged(itemToBeCopied, 0L); // 0 if error
             lastCheckOrNull = new PathCheckRecord(timeChecked, lastChanged);
         }
+    }
 
-        private Long tryLastChanged(IFileStore store, StoreItem item)
+    private long lastChanged(StoreItem item, long previousCheck)
+    {
+        final Long lastChanged = tryLastChanged(destinationStore, item);
+        if (operationLog.isTraceEnabled() && lastChanged != null)
         {
-            final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
-            final Future<Long> lastChangedFuture =
-                    lastChangedExecutor.submit(createCheckerCallable(store, item,
-                            minusSafetyMargin(inactivityPeriodMillis)));
-            final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
-            try
-            {
-                final Long lastChanged =
-                        ConcurrencyUtilities.getResult(lastChangedFuture, timeoutMillis,
-                                simpleMachineLog, "Check for recent paths").tryGetResult();
-                if (lastChanged == null)
-                {
-                    operationLog.error(String.format(
-                            "Could not determine \"last changed time\" of %s: time out.", item));
-                    return null;
-                }
-                return lastChanged;
-            } catch (UnknownLastChangedException ex)
+            String msgTemplate =
+                    "Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.";
+            String msg = String.format(msgTemplate, item, destinationStore, lastChanged);
+            operationLog.trace(msg);
+        }
+        return (lastChanged != null) ? lastChanged : 0;
+    }
+
+    private long minusSafetyMargin(long period)
+    {
+        return Math.max(0L, period - 1000L);
+    }
+
+    private Long tryLastChanged(IFileStoreMonitor store, StoreItem item)
+    {
+        long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis);
+        final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
+        final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
+        final Future<Long> lastChangedFuture =
+                lastChangedExecutor.submit(createCheckerCallable(store, item,
+                        stopWhenFindYoungerRelative));
+        try
+        {
+            final Long lastChanged =
+                    ConcurrencyUtilities.getResult(lastChangedFuture, timeoutMillis,
+                            simpleMachineLog, "Check for recent paths").tryGetResult();
+            if (lastChanged == null)
             {
                 operationLog.error(String.format(
-                        "Could not determine \"last changed time\" of %s: %s", item, ex));
+                        "Could not determine \"last changed time\" of %s: time out.", item));
                 return null;
             }
-        }
-
-        private long minusSafetyMargin(long period)
+            return lastChanged;
+        } catch (UnknownLastChangedException ex)
         {
-            return Math.max(0L, period - 1000L);
+            operationLog.error(String.format("Could not determine \"last changed time\" of %s: %s",
+                    item, ex));
+            return null;
         }
+    }
 
-        private Callable<Long> createCheckerCallable(final IFileStore store, final StoreItem item,
-                final long stopWhenYoungerThan)
-        {
-            return new Callable<Long>()
+    private static Callable<Long> createCheckerCallable(final IFileStoreMonitor store,
+            final StoreItem item, final long stopWhenYoungerThan)
+    {
+        return new Callable<Long>()
+            {
+                public Long call() throws Exception
                 {
-                    public Long call() throws Exception
+                    if (machineLog.isTraceEnabled())
                     {
+                        machineLog
+                                .trace("Starting quick check for recent paths on '" + item + "'.");
+                    }
+                    try
+                    {
+                        final long lastChanged = store.lastChanged(item, stopWhenYoungerThan);
                         if (machineLog.isTraceEnabled())
                         {
-                            machineLog.trace("Starting quick check for recent paths on '" + item
-                                    + "'.");
+                            machineLog
+                                    .trace(String
+                                            .format(
+                                                    "Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.",
+                                                    item, lastChanged));
                         }
-                        try
-                        {
-                            final long lastChanged =
-                                    store.lastChangedRelative(item, stopWhenYoungerThan);
-                            if (machineLog.isTraceEnabled())
-                            {
-                                machineLog
-                                        .trace(String
-                                                .format(
-                                                        "Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.",
-                                                        item, lastChanged));
-                            }
-                            return lastChanged;
-                        } catch (RuntimeException ex)
+                        return lastChanged;
+                    } catch (RuntimeException ex)
+                    {
+                        if (machineLog.isTraceEnabled())
                         {
-                            if (machineLog.isTraceEnabled())
-                            {
-                                final Throwable th =
-                                        (ex instanceof CheckedExceptionTunnel) ? ex.getCause() : ex;
-                                machineLog.trace("Failed on quick check for recent paths on '"
-                                        + item + "'.", th);
-                            }
-                            throw ex;
+                            final Throwable th =
+                                    (ex instanceof CheckedExceptionTunnel) ? ex.getCause() : ex;
+                            machineLog.trace("Failed on quick check for recent paths on '" + item
+                                    + "'.", th);
                         }
+                        throw ex;
                     }
-                };
-        }
-
+                }
+            };
     }
 
 }
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java
index d1fe8db3076..fcccae21059 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.log4j.Logger;
@@ -32,6 +33,8 @@ import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogFactory;
 import ch.systemsx.cisd.common.process.ProcessExecutionHelper;
 import ch.systemsx.cisd.common.process.ProcessResult;
+import ch.systemsx.cisd.common.process.ProcessExecutionHelper.IProcessHandler;
+import ch.systemsx.cisd.common.utilities.ITerminable;
 import ch.systemsx.cisd.common.utilities.OSUtilities;
 import ch.systemsx.cisd.datamover.filesystem.intf.IPathCopier;
 import ch.systemsx.cisd.datamover.filesystem.remote.rsync.RsyncVersionChecker.RsyncVersion;
@@ -82,14 +85,11 @@ public final class RsyncCopier implements IPathCopier
      */
     private final boolean destinationDirectoryRequiresDeletionBeforeCreation;
 
-    /**
-     * Informs that a process has been started (and, in this case, is finished or not) or not.
-     * <p>
-     * Gets initialized to <code>false</code> just before {@link ProcessExecutionHelper} gets
-     * called.
-     * </p>
-     */
-    private AtomicBoolean terminated = new AtomicBoolean(false);
+    // stores the handler to stop the copy process if it has been launched or null otherwise.
+    private final AtomicReference<ITerminable> rsyncTerminator;
+
+    // used to ensure that if terminate() is called before copy(), then copy will not proceed
+    private final AtomicBoolean isTerminatedExternally;
 
     /**
      * Constructs an <code>RsyncCopier</code>.
@@ -114,6 +114,8 @@ public final class RsyncCopier implements IPathCopier
         this.destinationDirectoryRequiresDeletionBeforeCreation =
                 destinationDirectoryRequiresDeletionBeforeCreation;
         this.overwrite = overwrite;
+        this.rsyncTerminator = new AtomicReference<ITerminable>(null);
+        this.isTerminatedExternally = new AtomicBoolean(false);
         if (cmdLineFlags.length > 0)
         {
             this.additionalCmdLineFlags = Arrays.asList(cmdLineFlags);
@@ -158,23 +160,21 @@ public final class RsyncCopier implements IPathCopier
     }
 
     /**
-     * Terminates the copy process by calling {@link Process#destroy()}, if a copy process is
-     * currently running. If no copy process is running, the method will return immediately.
+     * Terminates the copy process if it is still currently running. If no copy process is running,
+     * the method will return immediately. If many copy processes has been launched, only the last
+     * one will be terminated. No more copy operations can be started from that point.
      */
-    public final boolean terminate()
+    synchronized public final boolean terminate()
     {
-        // TODO 2008-06-02, Christian Ribeaud: Reimplement this once it is possible to run the
-        // killer process NOW in ProcessExecutionHelper.
-        // final Process copyProcess = copyProcessReference.get();
-        // if (copyProcess != null)
-        // {
-        // copyProcess.destroy();
-        // return true;
-        // } else
-        // {
-        // return false;
-        // }
-        return terminated.get();
+        isTerminatedExternally.set(true);
+        final ITerminable copyProcess = rsyncTerminator.get();
+        if (copyProcess != null)
+        {
+            return copyProcess.terminate();
+        } else
+        {
+            return false;
+        }
     }
 
     /**
@@ -234,12 +234,25 @@ public final class RsyncCopier implements IPathCopier
         final List<String> commandLine =
                 createCommandLine(sourcePath, sourceHostOrNull, destinationDirectory,
                         destinationHostOrNull);
-        terminated.set(false);
-        final ProcessResult processResult =
-                ProcessExecutionHelper.run(commandLine, operationLog, machineLog,
-                        MILLIS_TO_WAIT_BEFORE_TIMEOUT);
-        terminated.set(true);
+        IProcessHandler processHandler;
+        synchronized (this)
+        {
+            if (isTerminatedExternally.get())
+            {
+                // it can happen that terminate was called before us
+                return TERMINATED_STATUS;
+            }
+            processHandler =
+                    ProcessExecutionHelper.runUnblocking(commandLine, operationLog, machineLog,
+                            MILLIS_TO_WAIT_BEFORE_TIMEOUT);
+            rsyncTerminator.set(processHandler);
+        }
+        final ProcessResult processResult = processHandler.getResult();
         processResult.log();
+        if (isTerminatedExternally.get())
+        {
+            return TERMINATED_STATUS;
+        }
         return createStatus(processResult);
     }
 
@@ -265,6 +278,7 @@ public final class RsyncCopier implements IPathCopier
 
         final List<String> standardParameters = Arrays.asList("--archive", "--delete", "--inplace");
         final List<String> commandLineList = new ArrayList<String>();
+
         commandLineList.add(rsyncExecutable);
         commandLineList.addAll(standardParameters);
         if (isOverwriteMode())
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java
index 24a477c84d4..45f492126f3 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java
@@ -27,23 +27,16 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import ch.rinn.restrictions.Friend;
 import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
-import ch.systemsx.cisd.common.exceptions.Status;
-import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark;
-import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
-import ch.systemsx.cisd.common.logging.ISimpleLogger;
 import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogInitializer;
 import ch.systemsx.cisd.common.test.LogMonitoringAppender;
 import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler;
 import ch.systemsx.cisd.common.utilities.ITerminable;
 import ch.systemsx.cisd.common.utilities.StoreItem;
-import ch.systemsx.cisd.datamover.filesystem.intf.FileStore;
-import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore;
-import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
 import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
-import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier;
-import ch.systemsx.cisd.datamover.filesystem.store.FileStoreLocal;
+import ch.systemsx.cisd.datamover.filesystem.remote.CopyActivityMonitor.IFileStoreMonitor;
 import ch.systemsx.cisd.datamover.intf.ITimingParameters;
 import ch.systemsx.cisd.datamover.testhelper.FileOperationsUtil;
 
@@ -52,6 +45,8 @@ import ch.systemsx.cisd.datamover.testhelper.FileOperationsUtil;
  * 
  * @author Bernd Rinn
  */
+@Friend(toClasses =
+    { CopyActivityMonitor.class, CopyActivityMonitor.IFileStoreMonitor.class })
 public class CopyActivityMonitorTest
 {
 
@@ -113,7 +108,10 @@ public class CopyActivityMonitorTest
 
         public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
         {
-            assertEquals(stopWhenFindYoungerRelativeExpected, stopWhenFindYoungerRelative);
+            if (stopWhenFindYoungerRelative != 0)
+            {
+                assertEquals(stopWhenFindYoungerRelativeExpected, stopWhenFindYoungerRelative);
+            }
             return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2;
         }
     }
@@ -162,68 +160,32 @@ public class CopyActivityMonitorTest
         }
     }
 
-    private IFileStore asFileStore(File directory, final ILastChangedChecker checker)
+    private IFileStoreMonitor asFileStore(File directory, final ILastChangedChecker checker)
     {
         IFileSysOperationsFactory factory = FileOperationsUtil.createTestFatory();
         return asFileStore(directory, checker, factory);
     }
 
-    private IFileStore asFileStore(File directory, final ILastChangedChecker checker,
+    private IFileStoreMonitor asFileStore(final File directory, final ILastChangedChecker checker,
             IFileSysOperationsFactory factory)
     {
-        final FileWithHighwaterMark fileWithHighwaterMark = new FileWithHighwaterMark(directory);
-        final FileStoreLocal localImpl =
-                new FileStoreLocal(fileWithHighwaterMark, "input-test", factory);
-        return new FileStore(fileWithHighwaterMark, null, "input-test", factory)
+        return new IFileStoreMonitor()
             {
-                public Status delete(StoreItem item)
-                {
-                    return localImpl.delete(item);
-                }
-
                 public boolean exists(StoreItem item)
                 {
-                    return localImpl.exists(item);
-                }
-
-                public long lastChanged(StoreItem item, long stopWhenFindYounger)
-                {
-                    throw new UnsupportedOperationException("lastChanged");
+                    return true;
+                    // return StoreItem.asFile(directory, item).exists();
                 }
 
-                public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
+                public long lastChanged(StoreItem item, long stopWhenFindYoungerRelative)
                 {
                     return checker.lastChangedRelative(item, stopWhenFindYoungerRelative);
                 }
 
-                public String tryCheckDirectoryFullyAccessible(long timeOutMillis)
-                {
-                    return localImpl.tryCheckDirectoryFullyAccessible(timeOutMillis);
-                }
-
-                public IExtendedFileStore tryAsExtended()
-                {
-                    return localImpl.tryAsExtended();
-                }
-
-                public IStoreCopier getCopier(IFileStore destinationDirectory)
-                {
-                    return localImpl.getCopier(destinationDirectory);
-                }
-
-                public String getLocationDescription(StoreItem item)
-                {
-                    return localImpl.getLocationDescription(item);
-                }
-
-                public StoreItem[] tryListSortByLastModified(ISimpleLogger loggerOrNull)
-                {
-                    return localImpl.tryListSortByLastModified(loggerOrNull);
-                }
-
-                public final HighwaterMarkWatcher getHighwaterMarkWatcher()
+                @Override
+                public String toString()
                 {
-                    return localImpl.getHighwaterMarkWatcher();
+                    return "[test store] " + directory.getPath();
                 }
             };
     }
@@ -469,4 +431,127 @@ public class CopyActivityMonitorTest
 
     }
 
+    @Test(groups =
+        { "slow" })
+    // check if copy is terminated if destination file is never visible
+    public void testActivityFileNeverExistsFail() throws Throwable
+    {
+        final StoreItem dummyItem = createDummyItem();
+        final IFileStoreMonitor store = new IFileStoreMonitor()
+            {
+                public boolean exists(StoreItem item)
+                {
+                    assertEquals(dummyItem, item);
+                    return false;
+                }
+
+                public long lastChanged(StoreItem item, long stopWhenYoungerThan)
+                {
+                    throw new UnsupportedOperationException(); // should be never called
+                }
+
+            };
+        checkCopyTerminated(store, dummyItem);
+    }
+
+    @Test(groups =
+        { "slow" })
+    // check if copy is terminated if lastChange always fails
+    public void testActivityLastChangeUnavailableFail() throws Throwable
+    {
+        final StoreItem dummyItem = createDummyItem();
+        final IFileStoreMonitor store = new IFileStoreMonitor()
+            {
+                public boolean exists(StoreItem item)
+                {
+                    assertEquals(dummyItem, item);
+                    return true;
+                }
+
+                public long lastChanged(StoreItem item, long stopWhenYoungerThan)
+                {
+                    assertEquals(dummyItem, item);
+                    return 0; // signalizes error
+                }
+
+            };
+        checkCopyTerminated(store, dummyItem);
+    }
+
+    @Test(groups =
+        { "slow" })
+    // check if copy is terminated if lastChange fails on even calls and returns the unchanged value
+    // on odd calls
+    public void testActivityLastChangeUnavailableOftenFail() throws Throwable
+    {
+        final StoreItem dummyItem = createDummyItem();
+        final IFileStoreMonitor store = new IFileStoreMonitor()
+            {
+                private boolean oddCall = true;
+
+                public boolean exists(StoreItem item)
+                {
+                    assertEquals(dummyItem, item);
+                    return true;
+                }
+
+                public long lastChanged(StoreItem item, long stopWhenYoungerThan)
+                {
+                    assertEquals(dummyItem, item);
+                    oddCall = !oddCall;
+                    return oddCall ? 10 : 0; // error or unchanged value
+                }
+
+            };
+        checkCopyTerminated(store, dummyItem);
+    }
+
+    @Test(groups =
+        { "slow" })
+    // happy case - check if copy is not terminated if lastChange returns changing values
+    public void testActivityChangingCopyCompletes() throws Throwable
+    {
+        final StoreItem dummyItem = createDummyItem();
+        final IFileStoreMonitor store = new IFileStoreMonitor()
+            {
+                private int counter = 1;
+
+                public boolean exists(StoreItem item)
+                {
+                    assertEquals(dummyItem, item);
+                    return true;
+                }
+
+                public long lastChanged(StoreItem item, long stopWhenYoungerThan)
+                {
+                    return counter++;
+                }
+
+            };
+        checkCopyTerminationStatus(store, dummyItem, false);
+    }
+
+    private void checkCopyTerminated(final IFileStoreMonitor store, StoreItem dummyItem)
+            throws InterruptedException
+    {
+        checkCopyTerminationStatus(store, dummyItem, true);
+    }
+
+    private void checkCopyTerminationStatus(final IFileStoreMonitor store, StoreItem dummyItem,
+            boolean expectedIsTerminated) throws InterruptedException
+    {
+        final MockTerminable copyProcess = new MockTerminable();
+        final ITimingParameters parameters = new MyTimingParameters(0);
+        final CopyActivityMonitor monitor = new CopyActivityMonitor(store, copyProcess, parameters);
+        monitor.start(dummyItem);
+        Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
+        monitor.stop();
+        assertEquals(expectedIsTerminated, copyProcess.isTerminated());
+    }
+
+    private static StoreItem createDummyItem()
+    {
+        return new StoreItem("dummy-item");
+    }
+
 }
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java
index 8fd63a372f4..fb763944cc1 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java
@@ -117,11 +117,9 @@ public final class RsyncCopierTest
     public void testRsyncRetriableFailure() throws IOException, InterruptedException
     {
         final int exitValue = 11;
-        final File buggyRsyncBinary = createRsync(exitValue);
-        final RsyncCopier copier = new RsyncCopier(buggyRsyncBinary, null, false, false);
-        final Status status = copier.copy(sourceFile, destinationDirectory);
-        assertEquals(StatusFlag.RETRIABLE_ERROR, status.getFlag());
-        assertEquals(RsyncExitValueTranslator.getMessage(exitValue), status.getMessage());
+        StatusFlag expectedStatus = StatusFlag.RETRIABLE_ERROR;
+
+        testRsyncFailure(exitValue, expectedStatus);
     }
 
     @Test(groups =
@@ -129,10 +127,18 @@ public final class RsyncCopierTest
     public void testRsyncFatalFailure() throws IOException, InterruptedException
     {
         final int exitValue = 1;
+        StatusFlag expectedStatus = StatusFlag.FATAL_ERROR;
+
+        testRsyncFailure(exitValue, expectedStatus);
+    }
+
+    private void testRsyncFailure(final int exitValue, StatusFlag expectedStatus)
+            throws IOException, InterruptedException
+    {
         final File buggyRsyncBinary = createRsync(exitValue);
         final RsyncCopier copier = new RsyncCopier(buggyRsyncBinary, null, false, false);
         final Status status = copier.copy(sourceFile, destinationDirectory);
-        assertEquals(StatusFlag.FATAL_ERROR, status.getFlag());
+        assertEquals(expectedStatus, status.getFlag());
         assertEquals(RsyncExitValueTranslator.getMessage(exitValue), status.getMessage());
     }
 
@@ -185,7 +191,7 @@ public final class RsyncCopierTest
         { "requires_unix", "slow" })
     public void testRsyncTermination() throws IOException, InterruptedException
     {
-        final File sleepyRsyncBinary = createRsync("2.6.9", "/bin/sleep 100");
+        final File sleepyRsyncBinary = createSleepProcess(100);
         final RsyncCopier copier = new RsyncCopier(sleepyRsyncBinary, null, false, false);
         final Thread thread = new Thread(new Runnable()
             {
@@ -218,4 +224,22 @@ public final class RsyncCopierTest
         assertEquals(RsyncCopier.TERMINATED_STATUS, status);
     }
 
+    @Test(groups =
+        { "requires_unix" })
+    public void testRsyncTerminationBeforeCopy() throws IOException, InterruptedException
+    {
+        final File sleepyRsyncBinary = createSleepProcess(100);
+        final RsyncCopier copier = new RsyncCopier(sleepyRsyncBinary, null, false, false);
+        // copy monitor can call this method before the copy starts
+        boolean wasRunning = copier.terminate();
+        assertEquals(false, wasRunning);
+        final Status status = copier.copy(sourceFile, destinationDirectory);
+        assertEquals(RsyncCopier.TERMINATED_STATUS, status);
+    }
+
+    private File createSleepProcess(int seconds) throws IOException, InterruptedException
+    {
+        return createRsync("2.6.9", "/bin/sleep " + seconds);
+    }
+
 }
-- 
GitLab