From 6be425102e85d9ce6dd8fe8046c3876a5beae8fb Mon Sep 17 00:00:00 2001
From: tpylak <tpylak>
Date: Wed, 9 Jul 2008 11:45:47 +0000
Subject: [PATCH] DMV-32 FileStoreRemoteMounted.delete() should be monitored on
 a per-file basis

SVN: 7140
---
 .../ch/systemsx/cisd/datamover/DataMover.java |  18 ++--
 .../filesystem/RetryingPathRemover.java       | 100 +++++++++++++++++-
 .../store/FileStoreRemoteMounted.java         |  18 ++--
 .../filesystem/RetryingPathRemoverTest.java   |  79 ++++++++++++++
 4 files changed, 198 insertions(+), 17 deletions(-)
 create mode 100644 datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemoverTest.java

diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java
index caefb2d70ea..7d496c0fc04 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java
@@ -64,24 +64,30 @@ public final class DataMover
     @Private
     static final String PROCESS_MARKER_PREFIX = Constants.MARKER_PREFIX + "thread_";
 
-    private static final String TEMPLATE = PROCESS_MARKER_PREFIX + "%s_processing";
+    private static final String PROCESSING_MARKER_TEMPLATE =
+            PROCESS_MARKER_PREFIX + "%s_processing";
 
     @Private
-    static final String INCOMING_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "incoming");
+    static final String INCOMING_PROCESS_MARKER_FILENAME =
+            String.format(PROCESSING_MARKER_TEMPLATE, "incoming");
 
     @Private
-    static final String OUTGOING_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "outgoing");
+    static final String OUTGOING_PROCESS_MARKER_FILENAME =
+            String.format(PROCESSING_MARKER_TEMPLATE, "outgoing");
 
     @Private
-    static final String LOCAL_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "local");
+    static final String LOCAL_PROCESS_MARKER_FILENAME =
+            String.format(PROCESSING_MARKER_TEMPLATE, "local");
 
     @Private
-    static final String RECOVERY_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "recovery");
+    static final String RECOVERY_PROCESS_MARKER_FILENAME =
+            String.format(PROCESSING_MARKER_TEMPLATE, "recovery");
 
     /**
      * This marker file indicates that we are in a <i>shutdown</i> mode, started by the program.
      */
-    static final String SHUTDOWN_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "shutdown");
+    static final String SHUTDOWN_PROCESS_MARKER_FILENAME =
+            String.format(PROCESSING_MARKER_TEMPLATE, "shutdown");
 
     private static final String[] PROCESS_MARKER_FILENAMES =
                 { INCOMING_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME,
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemover.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemover.java
index 9e216548ea3..0a18ff99724 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemover.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemover.java
@@ -17,13 +17,26 @@
 package ch.systemsx.cisd.datamover.filesystem;
 
 import java.io.File;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
+import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.log4j.Logger;
 
+import ch.rinn.restrictions.Private;
+import ch.systemsx.cisd.common.Constants;
+import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
+import ch.systemsx.cisd.common.concurrent.ExecutionResult;
+import ch.systemsx.cisd.common.concurrent.InactivityMonitor;
+import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor;
+import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IActivitySensor;
+import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IInactivityObserver;
 import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogFactory;
 import ch.systemsx.cisd.common.utilities.FileUtilities;
+import ch.systemsx.cisd.common.utilities.FileUtilities.SimpleActivityObserver;
 import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover;
 
 /**
@@ -35,12 +48,18 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover;
  */
 final class RetryingPathRemover implements IPathRemover
 {
+    private static final long DELETE_ONE_FILE_TIMEOUT_MILLIS =
+            Constants.MILLIS_TO_WAIT_BEFORE_TIMEOUT;
+
     private static final Logger operationLog =
             LogFactory.getLogger(LogCategory.OPERATION, RetryingPathMover.class);
 
     private static final Logger notificationLog =
             LogFactory.getLogger(LogCategory.NOTIFY, RetryingPathMover.class);
 
+    private final static ExecutorService executor =
+            new NamingThreadPoolExecutor("Deletion Thread").daemonize();
+
     private final int maxRetriesOnFailure;
 
     private final long millisToSleepOnFailure;
@@ -66,7 +85,7 @@ final class RetryingPathRemover implements IPathRemover
         boolean deletionOK = false;
         while (true)
         {
-            deletionOK = FileUtilities.deleteRecursively(path);
+            deletionOK = deleteAndMonitor(path);
             if (deletionOK)
             {
                 break;
@@ -104,4 +123,83 @@ final class RetryingPathRemover implements IPathRemover
             return Status.OK;
         }
     }
+
+    @Private
+    static class DeleteActivityDetector implements IActivitySensor, SimpleActivityObserver
+    {
+        private volatile long lastActivityTime = System.currentTimeMillis();
+
+        private final File path;
+
+        public DeleteActivityDetector(File path)
+        {
+            this.path = path;
+        }
+
+        // called each time when one file gets deleted
+        synchronized public void update()
+        {
+            lastActivityTime = System.currentTimeMillis();
+        }
+
+        synchronized public String describeInactivity(long now)
+        {
+            return "No delete activity of path " + path.getPath() + " for "
+                    + DurationFormatUtils.formatDurationHMS(now - lastActivityTime);
+        }
+
+        synchronized public long getTimeOfLastActivityMoreRecentThan(long thresholdMillis)
+        {
+            return lastActivityTime;
+        }
+
+    }
+
+    // if there is no progress during deletion, it will be stopped
+    private boolean deleteAndMonitor(final File path)
+    {
+        final DeleteActivityDetector sensor = new DeleteActivityDetector(path);
+        Callable<Boolean> deleteCallable = new Callable<Boolean>()
+            {
+                public Boolean call() throws Exception
+                {
+                    return FileUtilities.deleteRecursively(path, null, sensor);
+                }
+            };
+        return executeAndMonitor(sensor, deleteCallable, DELETE_ONE_FILE_TIMEOUT_MILLIS);
+    }
+
+    @Private
+    Boolean executeAndMonitor(final IActivitySensor sensor, final Callable<Boolean> deleteCallable,
+            final long inactivityThresholdMillis)
+    {
+        final Future<Boolean> deleteFuture = executor.submit(deleteCallable);
+
+        IInactivityObserver inactivityObserver = new IInactivityObserver()
+            {
+                // called when inactivity took longer than a timeout
+                public void update(long inactiveSinceMillis, String descriptionOfInactivity)
+                {
+                    operationLog.error(descriptionOfInactivity);
+                    deleteFuture.cancel(true);
+                }
+            };
+        InactivityMonitor inactivityMonitor =
+                new InactivityMonitor(sensor, inactivityObserver, inactivityThresholdMillis, true);
+
+        ExecutionResult<Boolean> executionResult =
+                ConcurrencyUtilities.getResult(deleteFuture, ConcurrencyUtilities.NO_TIMEOUT);
+        inactivityMonitor.stop();
+
+        Boolean result = executionResult.tryGetResult();
+        if (result != null)
+        {
+            return result.booleanValue();
+        } else
+        {
+            operationLog.error("Operation terminated with an error status: "
+                    + executionResult.getStatus());
+            return false;
+        }
+    }
 }
\ No newline at end of file
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java
index 2340c5da751..eb2df120d03 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java
@@ -84,12 +84,8 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
 
     public final Status delete(final StoreItem item)
     {
-        final Status statusOrNull = localImplMonitored.delete(item);
-        if (statusOrNull == null)
-        {
-            return Status.createRetriableError("Could not delete '" + item + "': time out.");
-        }
-        return statusOrNull;
+        // we do not run delete with a timeout
+        return localImpl.delete(item);
     }
 
     public final BooleanStatus exists(final StoreItem item)
@@ -103,12 +99,14 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
         return statusOrNull;
     }
 
-    public final StatusWithResult<Long> lastChanged(final StoreItem item, final long stopWhenFindYounger)
+    public final StatusWithResult<Long> lastChanged(final StoreItem item,
+            final long stopWhenFindYounger)
     {
-        final StatusWithResult<Long> statusOrNull = localImplMonitored.lastChanged(item, stopWhenFindYounger);
+        final StatusWithResult<Long> statusOrNull =
+                localImplMonitored.lastChanged(item, stopWhenFindYounger);
         if (statusOrNull == null)
         {
-            return StatusWithResult.<Long>createError(String.format(
+            return StatusWithResult.<Long> createError(String.format(
                     "Could not determine \"last changed time\" of %s: time out.", item));
         }
         return statusOrNull;
@@ -121,7 +119,7 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
                 localImplMonitored.lastChangedRelative(item, stopWhenFindYoungerRelative);
         if (statusOrNull == null)
         {
-            return StatusWithResult.<Long>createError(String.format(
+            return StatusWithResult.<Long> createError(String.format(
                     "Could not determine \"last changed time\" of %s: time out.", item));
         }
         return statusOrNull;
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemoverTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemoverTest.java
new file mode 100644
index 00000000000..5e5ed93584d
--- /dev/null
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/RetryingPathRemoverTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2008 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.datamover.filesystem;
+
+import java.io.File;
+import java.util.concurrent.Callable;
+
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import ch.rinn.restrictions.Friend;
+import ch.systemsx.cisd.common.logging.LogInitializer;
+import ch.systemsx.cisd.datamover.filesystem.RetryingPathRemover.DeleteActivityDetector;
+
+/**
+ * @author Tomasz Pylak
+ */
+@Friend(toClasses = RetryingPathRemover.class)
+public class RetryingPathRemoverTest
+{
+    @BeforeClass
+    public void init()
+    {
+        LogInitializer.init();
+    }
+
+    @Test
+    public void testDeleteAndMonitorHappyCase()
+    {
+        testDeleteAndMonitor(false);
+    }
+
+    @Test
+    public void testDeleteAndMonitorBlockedFail()
+    {
+        testDeleteAndMonitor(true);
+    }
+
+    public void testDeleteAndMonitor(final boolean shouldNotBlock)
+    {
+        RetryingPathRemover remover = new RetryingPathRemover(0, 0);
+        final long timeout = 20;
+        final DeleteActivityDetector sensor = new DeleteActivityDetector(new File("."));
+        Callable<Boolean> callable = new Callable<Boolean>()
+            {
+                public Boolean call() throws Exception
+                {
+                    for (int i = 0; i < 20; i++)
+                    {
+                        Thread.sleep(timeout / 2); // simulates delete
+                        sensor.update();
+                    }
+                    if (shouldNotBlock == false)
+                    {
+                        Thread.sleep(timeout * 4); // simulates blocked delete
+                        AssertJUnit.fail("operation should be killed till this point");
+                    }
+                    return true;
+                }
+            };
+        Boolean result = remover.executeAndMonitor(sensor, callable, timeout);
+        AssertJUnit.assertEquals(shouldNotBlock, result.booleanValue());
+    }
+}
\ No newline at end of file
-- 
GitLab