From 1ae0efba80bb2a868c39d98921d7bf5a1c5b0fca Mon Sep 17 00:00:00 2001
From: felmer <felmer>
Date: Thu, 15 May 2008 14:43:54 +0000
Subject: [PATCH] DMV-13 Introducing DataCompletedFilter and use it. New test
 for IncomingProcessorTest.

SVN: 6079
---
 .../cisd/datamover/IncomingProcessor.java     |  17 +-
 .../systemsx/cisd/datamover/Parameters.java   |  25 +++
 .../cisd/datamover/PropertyNames.java         |   4 +
 .../datamover/utils/DataCompletedFilter.java  | 168 ++++++++++++++++++
 .../cisd/datamover/IncomingProcessorTest.java |  76 +++++---
 5 files changed, 267 insertions(+), 23 deletions(-)
 create mode 100644 datamover/source/java/ch/systemsx/cisd/datamover/utils/DataCompletedFilter.java

diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java
index 5917df95293..4fd17c59f3f 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java
@@ -41,9 +41,11 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
 import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
 import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover;
 import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory;
+import ch.systemsx.cisd.datamover.utils.DataCompletedFilter;
 import ch.systemsx.cisd.datamover.utils.IStoreItemFilter;
 import ch.systemsx.cisd.datamover.utils.LocalBufferDirs;
 import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter;
+import ch.systemsx.cisd.datamover.utils.StoreItemFilterBank;
 
 /**
  * @author Tomasz Pylak
@@ -100,7 +102,20 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
         this.pathMover = factory.getMover();
         this.factory = factory;
         this.bufferDirs = bufferDirs;
-        this.storeItemFilter = new QuietPeriodFileFilter(incomingStore, parameters, timeProvider);
+        this.storeItemFilter = createFilter(timeProvider);
+    }
+
+    private IStoreItemFilter createFilter(ITimeProvider timeProvider)
+    {
+        StoreItemFilterBank filterBank = new StoreItemFilterBank();
+        filterBank.add(new QuietPeriodFileFilter(incomingStore, parameters, timeProvider));
+        String dataCompletedScript = parameters.getDataCompletedScript();
+        if (dataCompletedScript != null)
+        {
+            long timeout = parameters.getDataCompletedScriptTimeout();
+            filterBank.add(new DataCompletedFilter(incomingStore, dataCompletedScript, timeout));
+        }
+        return filterBank;
     }
     
     public TimerTask createRecoverableTimerTask()
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java
index e047c7da26c..b35d784422d 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java
@@ -55,11 +55,20 @@ import ch.systemsx.cisd.datamover.intf.ITimingParameters;
  */
 public final class Parameters implements ITimingParameters, IFileSysParameters
 {
+    private static final int DEFAULT_DATA_COMPLETED_SCRIPT_TIMEOUT = 600;
+
     private static final Logger operationLog =
             LogFactory.getLogger(LogCategory.OPERATION, Parameters.class);
 
     private static final Logger notificationLog =
             LogFactory.getLogger(LogCategory.NOTIFY, Parameters.class);
+    
+    @Option(longName = PropertyNames.DATA_COMPLETED_SCRIPT, metaVar = "EXEC", usage = "Optional script which checks whether incoming data is complete or not.")
+    private String dataCompletedScript;
+    
+    @Option(longName = PropertyNames.DATA_COMPLETED_SCRIPT_TIMEOUT, usage = "Timeout (in seconds) data completed script will be stopped "
+            + "[default: " + DEFAULT_DATA_COMPLETED_SCRIPT_TIMEOUT + "]", handler = MillisecondConversionOptionHandler.class)
+    private long dataCompletedScriptTimeout = toMillis(DEFAULT_DATA_COMPLETED_SCRIPT_TIMEOUT);
 
     /**
      * The name of the <code>rsync</code> executable to use for copy operations.
@@ -374,6 +383,12 @@ public final class Parameters implements ITimingParameters, IFileSysParameters
     private final void initParametersFromProperties()
     {
         final Properties serviceProperties = loadServiceProperties();
+        dataCompletedScript =
+                PropertyUtils.getProperty(serviceProperties,
+                        PropertyNames.DATA_COMPLETED_SCRIPT, dataCompletedScript);
+        dataCompletedScriptTimeout =
+                PropertyUtils.getPosLong(serviceProperties,
+                        PropertyNames.DATA_COMPLETED_SCRIPT_TIMEOUT, dataCompletedScriptTimeout);
         rsyncExecutable =
                 PropertyUtils.getProperty(serviceProperties, PropertyNames.RSYNC_EXECUTABLE,
                         rsyncExecutable);
@@ -485,6 +500,16 @@ public final class Parameters implements ITimingParameters, IFileSysParameters
         }
     }
 
+    public final String getDataCompletedScript()
+    {
+        return dataCompletedScript;
+    }
+
+    public final long getDataCompletedScriptTimeout()
+    {
+        return dataCompletedScriptTimeout;
+    }
+
     /**
      * @return The name of the <code>rsync</code> executable to use for copy operations.
      */
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/PropertyNames.java b/datamover/source/java/ch/systemsx/cisd/datamover/PropertyNames.java
index ecc3af5bc43..903ba005a3e 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/PropertyNames.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/PropertyNames.java
@@ -32,6 +32,10 @@ public final class PropertyNames
     static final String CHECK_INTERVAL = "check-interval";
 
     static final String CHECK_INTERVAL_INTERNAL = "check-interval-internal";
+    
+    static final String DATA_COMPLETED_SCRIPT = "data-completed-script";
+    
+    static final String DATA_COMPLETED_SCRIPT_TIMEOUT = "data-completed-script-timeout";
 
     static final String CLEANSING_REGEX = "cleansing-regex";
 
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/utils/DataCompletedFilter.java b/datamover/source/java/ch/systemsx/cisd/datamover/utils/DataCompletedFilter.java
new file mode 100644
index 00000000000..6264ba6aa63
--- /dev/null
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/utils/DataCompletedFilter.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2008 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.datamover.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import ch.systemsx.cisd.common.logging.LogCategory;
+import ch.systemsx.cisd.common.logging.LogFactory;
+import ch.systemsx.cisd.common.process.ProcessExecutionHelper;
+import ch.systemsx.cisd.common.process.ProcessResult;
+import ch.systemsx.cisd.common.utilities.AbstractHashable;
+import ch.systemsx.cisd.common.utilities.StoreItem;
+import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
+
+/**
+ * 
+ *
+ * @author Franz-Josef Elmer
+ */
+public class DataCompletedFilter implements IStoreItemFilter
+{
+    private static final class Status extends AbstractHashable
+    {
+        static final Status NULL = new Status();
+        
+        private final boolean ok;
+        private final boolean run;
+        private final boolean terminated;
+        private final int exitValue;
+        private final boolean blocked;
+        
+        private Status()
+        {
+            ok = true;
+            run = false;
+            terminated = false;
+            blocked = false;
+            exitValue = Integer.MAX_VALUE;
+        }
+
+        Status(ProcessResult processResult)
+        {
+            ok = processResult.isOK();
+            run = processResult.isRun();
+            terminated = processResult.isTerminated();
+            blocked = processResult.hasBlocked();
+            exitValue = processResult.exitValue();
+        }
+        
+        public final boolean isOk()
+        {
+            return ok;
+        }
+
+        public final boolean isRun()
+        {
+            return run;
+        }
+
+        public final boolean isTerminated()
+        {
+            return terminated;
+        }
+
+        public final int getExitValue()
+        {
+            return exitValue;
+        }
+
+        public final boolean isBlocked()
+        {
+            return blocked;
+        }
+    }
+    
+    private static final Logger operationLog =
+            LogFactory.getLogger(LogCategory.OPERATION, DataCompletedFilter.class);
+
+    private static final Logger machineLog =
+            LogFactory.getLogger(LogCategory.MACHINE, DataCompletedFilter.class);
+
+    private final static Logger notificationLog =
+            LogFactory.getLogger(LogCategory.NOTIFY, DataCompletedFilter.class);
+    
+    private final IFileStore fileStore;
+    private final String dataCompletedScript;
+    private final long dataCompletedScriptTimeout;
+    
+    private Status lastStatus = Status.NULL; 
+
+    public DataCompletedFilter(IFileStore fileStore, String dataCompletedScript,
+            long dataCompletedScriptTimeout)
+    {
+        if (dataCompletedScript == null)
+        {
+            throw new IllegalArgumentException("Data completed script not specified.");
+        }
+        this.dataCompletedScript = dataCompletedScript;
+        this.dataCompletedScriptTimeout = dataCompletedScriptTimeout;
+        if (fileStore == null)
+        {
+            throw new IllegalArgumentException("File store not specified.");
+        }
+        this.fileStore = fileStore;
+    }
+    
+    public boolean accept(StoreItem item)
+    {
+        List<String> commandLine = createCommand(item);
+        ProcessResult result =
+                ProcessExecutionHelper.run(commandLine, dataCompletedScriptTimeout, operationLog,
+                        machineLog);
+        Status status = new Status(result);
+        boolean ok = status.isOk();
+        if (status.equals(lastStatus) == false)
+        {
+            String message =
+                    "Processing status of data completed script has changed to " + status
+                            + ". Command line: " + commandLine;
+            if (ok)
+            {
+                if (notificationLog.isInfoEnabled())
+                {
+                    notificationLog.info(message);
+                }
+            } else
+            {
+                notificationLog.error(message);
+                result.log();
+            }
+            lastStatus = status;
+        }
+        return ok;
+    }
+
+    private List<String> createCommand(StoreItem item)
+    {
+        String absolutePath = StoreItem.asFile(fileStore.getPath(), item).getAbsolutePath();
+        String host = fileStore.tryGetHost();
+        List<String> command = new ArrayList<String>();
+        command.add("sh");
+        command.add(dataCompletedScript);
+        command.add(absolutePath);
+        if (host != null)
+        {
+            command.add(host);
+        }
+        return command;
+    }
+
+}
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java
index b57a0e7f14f..a4d973b1730 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark;
+import ch.systemsx.cisd.common.logging.LogInitializer;
 import ch.systemsx.cisd.common.utilities.FileUtilities;
 import ch.systemsx.cisd.common.utilities.IExitHandler;
 import ch.systemsx.cisd.common.utilities.MockTimeProvider;
@@ -48,20 +49,22 @@ public class IncomingProcessorTest
     private static final String COPY_COMPLETE_DIR = "copy-complete";
     private static final String READY_TO_MOVE_DIR = "ready-to-move";
     private static final String TEMP_DIR = "temp";
+    private static final String EXAMPLE_SCRIPT = "example-script";
 
     private Mockery context;
     private IFileSysOperationsFactory fileSysOpertationFactory;
     private IPathMover mover;
     private IPathRemover remover;
-    private DataMoverProcess process;
     private File incomingDir;
     private IExitHandler exitHandler;
     private File copyInProgressDir;
     private File copyCompleteDir;
+    private File exampleScript;
 
     @BeforeMethod
     public void setUp()
     {
+        LogInitializer.init();
         context = new Mockery();
         fileSysOpertationFactory = context.mock(IFileSysOperationsFactory.class);
         mover = context.mock(IPathMover.class);
@@ -70,6 +73,8 @@ public class IncomingProcessorTest
         
         FileUtilities.deleteRecursively(TEST_FOLDER);
         TEST_FOLDER.mkdirs();
+        exampleScript = new File(TEST_FOLDER, "example");
+        FileUtilities.writeToFile(exampleScript, "echo hello world");
         incomingDir = new File(TEST_FOLDER, INCOMING_DIR);
         incomingDir.mkdir();
         copyInProgressDir = new File(TEST_FOLDER, COPY_IN_PROGRESS_DIR);
@@ -78,26 +83,6 @@ public class IncomingProcessorTest
         copyCompleteDir.mkdir();
         new File(TEST_FOLDER, READY_TO_MOVE_DIR).mkdir();
         new File(TEST_FOLDER, TEMP_DIR).mkdir();
-        
-        String[] parameterArguments = new String[]
-            { "--" + PropertyNames.INCOMING_DIR, incomingDir.toString(), "-q", "1" };
-        Parameters parameters = new Parameters(parameterArguments, exitHandler);
-        LocalBufferDirs localBufferDirs =
-                new LocalBufferDirs(new FileWithHighwaterMark(TEST_FOLDER), COPY_IN_PROGRESS_DIR,
-                        COPY_COMPLETE_DIR, READY_TO_MOVE_DIR, TEMP_DIR);
-        context.checking(new Expectations()
-            {
-                {
-                    allowing(fileSysOpertationFactory).getMover();
-                    will(returnValue(mover));
-                    
-                    allowing(fileSysOpertationFactory).getRemover();
-                    will(returnValue(remover));
-                }
-            });
-        process =
-                IncomingProcessor.createMovingProcess(parameters, fileSysOpertationFactory,
-                        new MockTimeProvider(), localBufferDirs);
     }
     
     @AfterMethod
@@ -109,7 +94,7 @@ public class IncomingProcessorTest
     }
     
     @Test
-    public void testHappyCase() throws IOException
+    public void testHappyCaseWithoutDataCompletedScript() throws IOException
     {
         final File testDataFile = new File(incomingDir, "test-data.txt");
         testDataFile.createNewFile();
@@ -121,10 +106,57 @@ public class IncomingProcessorTest
                 }
             });
         
+        DataMoverProcess process =
+                createProcess("--" + PropertyNames.INCOMING_DIR, incomingDir.toString(), "-q", "1");
+        TimerTask dataMoverTimerTask = process.getDataMoverTimerTask();
+        dataMoverTimerTask.run(); // 1. round finds a file to process
+        dataMoverTimerTask.run(); // 2. round finds that quiet period is over
+        
+        context.assertIsSatisfied();
+    }
+    
+    
+    @Test
+    public void testHappyCaseWithDataCompletedScript() throws IOException
+    {
+        final File testDataFile = new File(incomingDir, "test-data.txt");
+        testDataFile.createNewFile();
+        context.checking(new Expectations()
+        {
+            {
+                one(mover).tryMove(testDataFile, copyCompleteDir, "");
+                will(returnValue(new File(copyCompleteDir, testDataFile.getName())));
+            }
+        });
+        
+        DataMoverProcess process =
+                createProcess("--" + PropertyNames.INCOMING_DIR, incomingDir.toString(), "-q", "1",
+                        "--" + PropertyNames.DATA_COMPLETED_SCRIPT, exampleScript.toString());
         TimerTask dataMoverTimerTask = process.getDataMoverTimerTask();
         dataMoverTimerTask.run(); // 1. round finds a file to process
         dataMoverTimerTask.run(); // 2. round finds that quiet period is over
         
         context.assertIsSatisfied();
     }
+    
+    private DataMoverProcess createProcess(String... args)
+    {
+        Parameters parameters = new Parameters(args, exitHandler);
+        LocalBufferDirs localBufferDirs =
+                new LocalBufferDirs(new FileWithHighwaterMark(TEST_FOLDER), COPY_IN_PROGRESS_DIR,
+                        COPY_COMPLETE_DIR, READY_TO_MOVE_DIR, TEMP_DIR);
+        context.checking(new Expectations()
+            {
+                {
+                    allowing(fileSysOpertationFactory).getMover();
+                    will(returnValue(mover));
+                    
+                    allowing(fileSysOpertationFactory).getRemover();
+                    will(returnValue(remover));
+                }
+            });
+        return IncomingProcessor.createMovingProcess(parameters, fileSysOpertationFactory,
+                        new MockTimeProvider(), localBufferDirs);
+        
+    }
 }
-- 
GitLab