From 61c906a21428fdb141b9cf968e038ae78a6efc71 Mon Sep 17 00:00:00 2001
From: jakubs <jakubs>
Date: Fri, 1 Jun 2012 11:15:06 +0000
Subject: [PATCH] SP-73, BIS-21 improve recovery robustness

SVN: 25441
---
 .../cisd/etlserver/DssRegistrationLogger.java |  8 ++
 .../DataSetStorageAlgorithmRunner.java        | 42 ++++++----
 .../IDataSetOnErrorActionDecision.java        |  6 +-
 .../impl/DataSetRegistrationTransaction.java  | 20 +++--
 .../v2/JythonTopLevelDataSetHandlerV2.java    | 36 ++++-----
 .../recovery/DataSetStorageRecoveryInfo.java  | 28 ++++---
 .../JythonDropboxRecoveryTest.java            | 77 ++++++++++++++++++-
 7 files changed, 155 insertions(+), 62 deletions(-)

diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/DssRegistrationLogger.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/DssRegistrationLogger.java
index 4d5454b03a0..e90c523ef71 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/DssRegistrationLogger.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/DssRegistrationLogger.java
@@ -83,6 +83,14 @@ public class DssRegistrationLogger
         FileUtilities.appendToFile(file, logMessage.toString(), false);
     }
 
+    /**
+     * Logs message, then class and message of the exception ("null" if ex is null).
+     */
+    public void log(Throwable ex, String message)
+    {
+        log(message + ": " + ex);
+    }
+
     /**
      * Logs a message, truncating the content if it exceeds the length limit.
      */
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java
index 8be39f89f93..3d4f1a198bc 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java
@@ -30,8 +30,6 @@ import ch.systemsx.cisd.etlserver.IStorageProcessorTransactional.IStorageProcess
 import ch.systemsx.cisd.etlserver.registrator.IDataSetOnErrorActionDecision.ErrorType;
 import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction;
 import ch.systemsx.cisd.etlserver.registrator.recovery.AutoRecoverySettings;
-import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStorageRecoveryInfo;
-import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStorageRecoveryInfo.RecoveryStage;
 import ch.systemsx.cisd.etlserver.registrator.recovery.IDataSetStorageRecoveryManager;
 import ch.systemsx.cisd.openbis.dss.generic.shared.IEncapsulatedOpenBISService;
 import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation;
@@ -56,6 +54,8 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
          */
         public void didRollbackStorageAlgorithmRunner(DataSetStorageAlgorithmRunner<T> algorithm,
                 Throwable ex, ErrorType errorType);
+
+        public void markReadyForRecovery(DataSetStorageAlgorithmRunner<T> algorithm, Throwable ex);
     }
 
     /**
@@ -233,8 +233,9 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
         {
             if (shouldUseAutoRecovery())
             {
-                rollbackAfterStorageConfirmation(ex);
+                rollbackDelegate.markReadyForRecovery(this, ex);
             }
+            dssRegistrationLog.log(ex, "Error during storage confirmation");
             return false;
             // There is nothing we can do without recovery
         }
@@ -263,6 +264,8 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
             postPreRegistrationHooks.executePreRegistration(persistentMapHolder);
         } catch (Throwable throwable)
         {
+            dssRegistrationLog.log(throwable, "Error in execution of pre registration hooks");
+
             rollbackDuringPreRegistration(throwable);
             return false;
         }
@@ -317,7 +320,7 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
             return false;
         }
 
-        return storeAfterRegistration();
+        return cleanPrecommitAndConfirmStorage();
 
         // confirm storage in AS
 
@@ -364,7 +367,7 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
     /**
      * Execute the post-registration part of the storage process
      */
-    public boolean storeAfterRegistration()
+    public boolean cleanPrecommitAndConfirmStorage()
     {
 
         cleanPrecommitDirectory();
@@ -442,13 +445,6 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
                 ErrorType.POST_REGISTRATION_ERROR);
     }
 
-    private void rollbackAfterStorageConfirmation(Throwable ex)
-    {
-        operationLog.error("Failed to confirm storage in as", ex);
-        rollbackDelegate.didRollbackStorageAlgorithmRunner(this, ex,
-                ErrorType.STORAGE_CONFIRMATION_ERROR);
-    }
-
     /**
      * Committed => Stored
      */
@@ -465,6 +461,9 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
             dssRegistrationLog.log("Data has been moved to the final store.");
         } catch (final Throwable throwable)
         {
+            rollbackDelegate.markReadyForRecovery(this, throwable);
+
+            dssRegistrationLog.log(throwable, "Error while storing committed datasets.");
             // Something has gone really wrong
             operationLog.error("Error while storing committed datasets", throwable);
             return false;
@@ -506,9 +505,17 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
 
         } catch (final Throwable throwable)
         {
+            dssRegistrationLog.log(throwable, "Error in commit of storage processors.");
             // Something has gone really wrong
             operationLog.error("Error while committing storage processors", throwable);
-            rollbackAfterStorageProcessorAndMetadataRegistration(throwable);
+
+            if (shouldUseAutoRecovery())
+            {
+                rollbackDelegate.markReadyForRecovery(this, throwable);
+            } else
+            {
+                rollbackAfterStorageProcessorAndMetadataRegistration(throwable);
+            }
             return false;
         }
         return true;
@@ -524,7 +531,14 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation>
 
         } catch (final Throwable throwable)
         {
-            rollbackDuringMetadataRegistration(throwable);
+            dssRegistrationLog.log(throwable, "Error in registering data in application server");
+            if (shouldUseAutoRecovery() && storageRecoveryManager.canRecoverFromError(throwable))
+            {
+                rollbackDelegate.markReadyForRecovery(this, throwable);
+            } else
+            {
+                rollbackDuringMetadataRegistration(throwable);
+            }
             return false;
         }
 
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/IDataSetOnErrorActionDecision.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/IDataSetOnErrorActionDecision.java
index f289b0fed5c..734e9d79806 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/IDataSetOnErrorActionDecision.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/IDataSetOnErrorActionDecision.java
@@ -27,9 +27,9 @@ public interface IDataSetOnErrorActionDecision
 {
     public enum ErrorType
     {
-        INVALID_DATA_SET, VALIDATION_SCRIPT_ERROR, OPENBIS_REGISTRATION_FAILURE,
-        REGISTRATION_SCRIPT_ERROR, STORAGE_PROCESSOR_ERROR, PRE_REGISTRATION_ERROR,
-        POST_REGISTRATION_ERROR, STORAGE_CONFIRMATION_ERROR
+        INVALID_DATA_SET, VALIDATION_SCRIPT_ERROR, REGISTRATION_SCRIPT_ERROR,
+        PRE_REGISTRATION_ERROR, OPENBIS_REGISTRATION_FAILURE, STORAGE_PROCESSOR_ERROR,
+        POST_REGISTRATION_ERROR;
     }
 
     UnstoreDataAction computeUndoAction(ErrorType errorType, Throwable failureOrNull);
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java
index 90f694e7aa9..1dadc843b45 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java
@@ -450,6 +450,7 @@ public class DataSetRegistrationTransaction<T extends DataSetInformation> implem
     /**
      * Delegate method called by the {@link DataSetStorageAlgorithmRunner}.
      */
+    @Override
     public void didRollbackStorageAlgorithmRunner(DataSetStorageAlgorithmRunner<T> algorithm,
             Throwable ex, ErrorType errorType)
     {
@@ -459,25 +460,22 @@ public class DataSetRegistrationTransaction<T extends DataSetInformation> implem
         IDataSetStorageRecoveryManager storageRecoveryManager =
                 registrationService.getRegistratorContext().getGlobalState()
                         .getStorageRecoveryManager();
-        boolean canRecover =
-                useAutoRecovery
-                        && (errorType == ErrorType.OPENBIS_REGISTRATION_FAILURE || errorType == ErrorType.STORAGE_CONFIRMATION_ERROR)
-                        && storageRecoveryManager.canRecoverFromError(ex);
-        if (useAutoRecovery && canRecover)
-        {
-            registrationService.registerNonFatalError(ex);
-            state = new RecoveryPendingTransactionState<T>(getStateAsLiveState());
-        } else
-        {
+        
             if (useAutoRecovery)
             {
                 storageRecoveryManager.removeCheckpoint(algorithm);
             }
             rollback();
             registrationService.didRollbackTransaction(this, algorithm, ex, errorType);
-        }
     }
 
+    @Override
+    public void markReadyForRecovery(DataSetStorageAlgorithmRunner<T> algorithm, Throwable ex)
+    {
+        registrationService.registerNonFatalError(ex);
+        state = new RecoveryPendingTransactionState<T>(getStateAsLiveState());
+    }
+    
     /**
      * Delegate method called by the {@link DataSetStorageAlgorithmRunner}. This implementation asks
      * the DataSetRegistrationService to register not just the data sets, but perform any creation
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java
index 4582d954819..eda4b013aa5 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java
@@ -17,10 +17,8 @@
 package ch.systemsx.cisd.etlserver.registrator.api.v2;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
-import java.util.List;
 
 import org.python.core.PyFunction;
 import org.python.util.PythonInterpreter;
@@ -190,17 +188,16 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend
 
         if (false == retryPeriodHasPassed(recoveryInfo))
         {
-            String message =
-                    "Found recovery information for " + incomingFileOriginal
-                            + ". The recovery won't happen as the retry period has not yet passed";
-            operationLog.info(message);
-
             return;
         }
 
         operationLog.info("Will recover from broken registration. Found marker file "
                 + recoveryMarkerFile + " and " + recoveryFile);
 
+        final DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state);
+
+        logger.log("Starting recovery at checkpoint " + recoveryInfo.getRecoveryStage());
+
         IDelegatedActionWithResult<Boolean> recoveryMarkerFileCleanupAction =
                 new IDelegatedActionWithResult<Boolean>()
                     {
@@ -223,8 +220,6 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend
                                 state.getFileOperations().move(recoveryMarkerFile,
                                         errorRecoveryMarkerFile);
 
-                                DssRegistrationLogger logger =
-                                        recoveryState.getRegistrationLogger(state);
                                 logger.log("Recovery failed. Giving up.");
                                 logger.registerFailure();
 
@@ -292,12 +287,7 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend
             final IDelegatedActionWithResult<Boolean> recoveryMarkerCleanup)
     {
 
-        DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state);
-
-        logger.log("The registration has been disturbed. Will try to recover...");
-
-        // rollback delegate
-        final List<Throwable> encounteredErrors = new ArrayList<Throwable>();
+        final DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state);
 
         // keeps track of whether we should keep or delete the recovery files.
         // we can delete if succesfully recovered, or rolledback.
@@ -310,7 +300,14 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend
                         DataSetStorageAlgorithmRunner<T> algorithm, Throwable ex,
                         ErrorType errorType)
                 {
-                    encounteredErrors.add(ex);
+                    // do nothing. recovery takes care of everything
+                }
+
+                @Override
+                public void markReadyForRecovery(DataSetStorageAlgorithmRunner<T> algorithm,
+                        Throwable ex)
+                {
+                    // don't have to do anything.
                 }
             };
 
@@ -414,7 +411,7 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend
 
                 if (success)
                 {
-                    success = runner.storeAfterRegistration();
+                    success = runner.cleanPrecommitAndConfirmStorage();
                 }
                 if (success)
                 {
@@ -436,11 +433,8 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend
                 throw (Error) error;
             }
 
-            System.err.println("Caught an error! " + error);
-            error.printStackTrace();
             // in this case we should ignore, and run the recovery again after some time
-            encounteredErrors.add(error);
-            logger.log("Error in recovery: " + error);
+            logger.log(error, "Uncaught error during recovery");
         }
 
         cleanAfterwardsAction.execute(registrationSuccessful);
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/recovery/DataSetStorageRecoveryInfo.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/recovery/DataSetStorageRecoveryInfo.java
index ca0a38c131d..c31fe51f345 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/recovery/DataSetStorageRecoveryInfo.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/recovery/DataSetStorageRecoveryInfo.java
@@ -35,25 +35,23 @@ public class DataSetStorageRecoveryInfo implements Serializable
         /**
          * After precommit, before the registration in as
          */
-        PRECOMMIT,
+        PRECOMMIT("Precommit"),
 
         /**
          * After the registration has succeeded and post-registration hook executed
          */
-        POST_REGISTRATION_HOOK_EXECUTED,
+        POST_REGISTRATION_HOOK_EXECUTED("After post-registration"),
         /**
          * All files have been moved to the store, but application server has not been informed yet
          */
-        STORAGE_COMPLETED,
-        /**
-         * Storage has been confirmed in the application server
-         */
-        STORAGE_CONFIRMED,
-        /**
-         * The post-storage hooks have completed - that is not a real recovery checkpoint as there
-         * is nothing to do afterwards!
-         */
-        POST_STORAGE_HOOK_COMPLETED;
+        STORAGE_COMPLETED("Storage completed");
+
+        private final String description; // human-readable label, returned by toString()
+
+        private RecoveryStage(String description)
+        {
+            this.description = description;
+        }
 
         /**
          * @return true if this stage is before or equal other
@@ -70,6 +68,12 @@ public class DataSetStorageRecoveryInfo implements Serializable
         {
             return this.ordinal() < other.ordinal();
         }
+
+        @Override
+        public String toString()
+        {
+            return description;
+        }
     }
 
     private static final long serialVersionUID = 1L;
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonDropboxRecoveryTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonDropboxRecoveryTest.java
index 2a8cd775a1d..b9f37da6f34 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonDropboxRecoveryTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonDropboxRecoveryTest.java
@@ -478,6 +478,53 @@ public class JythonDropboxRecoveryTest extends AbstractJythonDataSetHandlerTest
         return recoveryMarkerFile;
     }
 
+    @Test
+    public void testRecoveryFailureAtStorage()
+    {
+        RecoveryTestCase testCase = new RecoveryTestCase("No name");
+        setUpHomeDataBaseExpectations();
+
+        createData();
+
+        Properties properties =
+                createThreadPropertiesRelativeToScriptsFolder(testCase.dropboxScriptPath,
+                        testCase.overrideProperties);
+
+        createHandler(properties, true, false);
+
+        final RecordingMatcher<ch.systemsx.cisd.openbis.generic.shared.dto.AtomicEntityOperationDetails> atomicatOperationDetails =
+                new RecordingMatcher<ch.systemsx.cisd.openbis.generic.shared.dto.AtomicEntityOperationDetails>();
+
+        // create expectations
+        context.checking(new StorageErrorExpectations(atomicatOperationDetails));
+
+        handleAndMakeRecoverableImmediately(testCase);
+        
+        JythonHookTestTool.assertMessagesInWorkingDirectory(workingDirectory,
+                "pre_metadata_registration", "post_metadata_registration");
+
+        assertRecoveryFile(testCase.recoveryRertyCount, RecoveryInfoDateConstraint.ORIGINAL,
+                testCase.recoveryLastTry);
+        assertOriginalMarkerFileExists();
+
+        makeFileSystemAvailable(workingDirectory);
+        
+        // this recovery should succeed
+        handler.handle(markerFile);
+
+        assertStorageProcess(atomicatOperationDetails.recordedObject(), DATA_SET_CODE,
+                "sub_data_set_1", 0);
+        
+        assertNoOriginalMarkerFileExists();
+        assertNoRecoveryMarkerFile();
+
+        //
+        // // item in store
+        //
+        //
+        JythonHookTestTool.assertMessagesInWorkingDirectory(workingDirectory, "post_storage");
+    }
+    
     // INFO: test with recovery from error in storage confirmed
     @Test
     public void testRecoveryFailureAtStorageConfirmed()
@@ -524,7 +571,6 @@ public class JythonDropboxRecoveryTest extends AbstractJythonDataSetHandlerTest
         //
         //
         JythonHookTestTool.assertMessagesInWorkingDirectory(workingDirectory, "post_storage");
-
     }
 
     @DataProvider(name = "multipleCheckpointsDataProvider")
@@ -671,6 +717,27 @@ public class JythonDropboxRecoveryTest extends AbstractJythonDataSetHandlerTest
         }
     }
 
+    class StorageErrorExpectations extends AbstractExpectations
+    {
+        public StorageErrorExpectations(
+                final RecordingMatcher<AtomicEntityOperationDetails> atomicatOperationDetails)
+        {
+            super(atomicatOperationDetails);
+            prepareExpecatations();
+        }
+
+        private void prepareExpecatations()
+        {
+            initialExpectations();
+            registerDataSetsAndMakeFileSystemUnavailable();
+            
+            // the recovery should happen here
+
+            setStorageConfirmed(false);
+        }
+    }
+
+    
     class BasicRecoveryTestExpectations extends AbstractExpectations
     {
         final RecoveryTestCase testCase;
@@ -834,6 +901,14 @@ public class JythonDropboxRecoveryTest extends AbstractJythonDataSetHandlerTest
             will(returnValue(new AtomicEntityOperationResult()));
         }
 
+        protected void registerDataSetsAndMakeFileSystemUnavailable()
+        {
+            one(openBisService).drawANewUniqueID();
+            will(returnValue(Long.valueOf(1L)));
+            one(openBisService).performEntityOperations(with(atomicatOperationDetails));
+            will(doAll(makeFileSystemUnavailableAction(), returnValue(new AtomicEntityOperationResult())));
+        }
+
         /**
          * @param shouldFail - if true the call to as should throw an exception
          */
-- 
GitLab