From d912a9b28bf7f4f79287d99ed739d68066c0912b Mon Sep 17 00:00:00 2001 From: jakubs <jakubs> Date: Tue, 12 Jun 2012 13:50:17 +0000 Subject: [PATCH] BIS-21 SP-108 first version of the jython processing retry SVN: 25658 --- .../DataSetRegistrationPersistentMap.java | 12 ++ .../JythonTopLevelDataSetHandler.java | 29 +++-- .../impl/DataSetRegistrationTransaction.java | 25 ++-- .../JythonDataSetRegistrationServiceV2.java | 15 ++- .../v2/JythonTopLevelDataSetHandlerV2.java | 111 +++++++++++++++++- .../JythonTopLevelDataSetRegistratorTest.java | 17 ++- 6 files changed, 174 insertions(+), 35 deletions(-) diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetRegistrationPersistentMap.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetRegistrationPersistentMap.java index 20d7aa5e3a7..d2d013f5faf 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetRegistrationPersistentMap.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetRegistrationPersistentMap.java @@ -3,6 +3,7 @@ package ch.systemsx.cisd.etlserver.registrator; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; +import java.util.Map.Entry; import java.util.Set; /** @@ -64,6 +65,17 @@ public class DataSetRegistrationPersistentMap implements Serializable } + /** + * Add all entries from other persistent map. + */ + public void putAll(DataSetRegistrationPersistentMap other) + { + for (Entry<String, Serializable> item : other.persistentMap.entrySet()) + { + this.persistentMap.put(item.getKey(), item.getValue()); + } + } + public Serializable remove(Object key) { return persistentMap.remove(key); diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java index 976dc0918e7..007c498b724 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java @@ -84,6 +84,12 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends */ ROLLBACK_PRE_REGISTRATION_FUNCTION_NAME("rollback_pre_registration", 2), + /** + * The name of the function to define to hook into the transaction rollback mechanism. + */ + SHOULD_RETRY_PROCESS_FUNCTION_NAME("should_retry_processing", 2), + + /** * The name of the function called when secondary transactions, DynamicTransactionQuery * objects, fail. @@ -165,7 +171,7 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends executeJythonScript(dataSetFile, scriptString, service); } - private void executeJythonScript(File dataSetFile, String scriptString, + protected void executeJythonScript(File dataSetFile, String scriptString, JythonDataSetRegistrationService<T> service) { // Configure the evaluator @@ -175,19 +181,9 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends // Invoke the evaluator interpreter.exec(scriptString); - executeJythonProcessFunction(service.interpreter); - verifyEvaluatorHookFunctions(interpreter); } - /** - * Execute the function that processes the data set. Subclasses may override. - */ - protected void executeJythonProcessFunction(PythonInterpreter interpreter) - { - - } - protected void verifyEvaluatorHookFunctions(PythonInterpreter interpreter) { for (JythonHookFunction function : JythonHookFunction.values()) @@ -215,7 +211,7 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends } } - protected void configureEvaluator(File dataSetFile, + private void configureEvaluator(File dataSetFile, JythonDataSetRegistrationService<T> service, PythonInterpreter interpreter) { interpreter.set(SERVICE_VARIABLE_NAME, service); @@ -340,6 +336,15 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends return function; } + public PyFunction getShouldRetryProcessFunction (DataSetRegistrationService<T> service) + { + PythonInterpreter interpreter = getInterpreterFromService(service); + PyFunction function = + tryJythonFunction(interpreter, + JythonHookFunction.SHOULD_RETRY_PROCESS_FUNCTION_NAME); + return function; + } + /** * If true than the old methods of jython hook functions will also be used (as a fallbacks in * case of the new methods or missing, or normally) diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java index ed745cab440..68395660b64 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/DataSetRegistrationTransaction.java @@ -401,12 +401,7 @@ public class DataSetRegistrationTransaction<T extends DataSetInformation> implem getStateAsLiveState().deleteFile(src); } - /** - * Marked as deprecated, to prevent using this method directly. Instead it should only be used - * implicitly as an implementation of the persistent map holder interface. - */ @Override - @Deprecated public DataSetRegistrationPersistentMap getPersistentMap() { return registrationContext; @@ -486,13 +481,13 @@ public class DataSetRegistrationTransaction<T extends DataSetInformation> implem IDataSetStorageRecoveryManager storageRecoveryManager = registrationService.getRegistratorContext().getGlobalState() .getStorageRecoveryManager(); - - if (useAutoRecovery) - { - storageRecoveryManager.removeCheckpoint(algorithm); - } - rollback(); - registrationService.didRollbackTransaction(this, algorithm, ex, errorType); + + if (useAutoRecovery) + { + storageRecoveryManager.removeCheckpoint(algorithm); + } + rollback(); + registrationService.didRollbackTransaction(this, algorithm, ex, errorType); } @Override @@ -501,7 +496,7 @@ public class DataSetRegistrationTransaction<T extends DataSetInformation> implem registrationService.registerNonFatalError(ex); state = new RecoveryPendingTransactionState<T>(getStateAsLiveState()); } - + /** * Delegate method called by the {@link DataSetStorageAlgorithmRunner}. This implementation asks * the DataSetRegistrationService to register not just the data sets, but perform any creation @@ -523,9 +518,9 @@ public class DataSetRegistrationTransaction<T extends DataSetInformation> implem @Override public EntityOperationsState didEntityOperationsSucceeded(TechId registrationId) { - return openBisService.didEntityOperationsSucceed(registrationId); + return openBisService.didEntityOperationsSucceed(registrationId); } - + public boolean isCommittedOrRolledback() { return isCommitted() || isRolledback(); diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java index a661607a6e6..d3ee3c4cf34 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java @@ -93,7 +93,20 @@ public class JythonDataSetRegistrationServiceV2<T extends DataSetInformation> return transactions.get(0); } } - + + + /** + * rolls back the existing transaction + */ + public void rollbackAndForgetTransaction() + { + DataSetRegistrationTransaction<T> transaction = getTransaction(); + + transaction.rollback(); + + transactions.remove(transaction); + } + /** * Commit any scheduled changes. */ diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java index 71e62f8d158..a89c7482116 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java @@ -41,9 +41,12 @@ import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner; import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner.IPrePostRegistrationHook; import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner.IRollbackDelegate; import ch.systemsx.cisd.etlserver.registrator.DataSetStorageRollbacker; +import ch.systemsx.cisd.etlserver.registrator.DistinctExceptionsCollection; import ch.systemsx.cisd.etlserver.registrator.IDataSetOnErrorActionDecision.ErrorType; import ch.systemsx.cisd.etlserver.registrator.MarkerFileUtility; +import ch.systemsx.cisd.etlserver.registrator.api.v1.IDataSetRegistrationTransaction; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.AbstractTransactionState; +import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.RollbackStack; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.RollbackStack.IRollbackStackDelegate; import ch.systemsx.cisd.etlserver.registrator.recovery.AbstractRecoveryState; @@ -105,8 +108,7 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend pythonInterpreter, globalState); } - @Override - protected void configureEvaluator( + private void configureEvaluator( File dataSetFile, ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler.JythonDataSetRegistrationService<T> service, PythonInterpreter interpreter) @@ -116,14 +118,109 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend if (service != null) { - interpreter.set(TRANSACTION_VARIABLE_NAME, service.transaction()); interpreter.set(FACTORY_VARIABLE_NAME, service.getDataSetRegistrationDetailsFactory()); } } @Override - protected void executeJythonProcessFunction(PythonInterpreter interpreter) + protected void executeJythonScript(File dataSetFile, String scriptString, + JythonDataSetRegistrationService<T> service) + { + + // Configure the evaluator + PythonInterpreter interpreter = service.getInterpreter(); + configureEvaluator(dataSetFile, service, interpreter); + + // Invoke the evaluator + interpreter.exec(scriptString); + + verifyEvaluatorHookFunctions(interpreter); + + PyFunction retryFunction = getShouldRetryProcessFunction(service); + + if (retryFunction == null) + { + + // in case when there is no retry function defined we just call the process and don't + // try to catch any kind of exceptions + executeJythonProcessFunction(service.getInterpreter(), service.transaction()); + } else + { + executeJythonProcessFunctionWithRetries(interpreter, + (JythonDataSetRegistrationServiceV2<T>) service, retryFunction); + } + } + + private static final int MAX_RETRY_COUNT = 3; + + private static final int RETRY_SLEEP = 100; + + private void executeJythonProcessFunctionWithRetries(PythonInterpreter interpreter, + JythonDataSetRegistrationServiceV2<T> service, PyFunction retryFunction) + { + DistinctExceptionsCollection errors = new DistinctExceptionsCollection(); + + // create initial transaction + service.transaction(); + + while (true) + { + Exception problem; + try + { + executeJythonProcessFunction(interpreter, service.getTransaction()); + // if function succeeded - than we are happy + return; + } catch (Exception ex) + { + problem = ex; + operationLog + .info("Exception occured during jython script processing. Will check if can retry.", + ex); + } + + int errorCount = errors.add(problem); + + // TODO: if the max retry count has happened - then we finish + // This actually will likely be removed if we would like to give the control about + // retries 100% to the user only + if (errorCount > MAX_RETRY_COUNT) + { + operationLog + .error("The jython script processing has failed too many times. Rolling back."); + throw CheckedExceptionTunnel.wrapIfNecessary(problem); + } + + DataSetRegistrationPersistentMap persistentMap = + service.getTransaction().getPersistentMap(); + + try + { + invokeFunction(retryFunction, persistentMap, problem); + } catch (Exception ex) + { + operationLog.error("The retry function has failed. Rolling back.", ex); + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + + // TODO: we dont have a way to check the result of the jython function. Thus we assume + // that the user agreed to do te retry. If that's an object to change we should check + // the result and only proceed if this was true + + service.rollbackAndForgetTransaction(); + // TODO: now the transaction is rolled back and everything should be in place again. + // should we catch some exceptions here? can we recover if whatever went wrong in here + + // creates the new transaction and propagates the values in the persistent map + service.transaction().getPersistentMap().putAll(persistentMap); + } + } + + protected void executeJythonProcessFunction(PythonInterpreter interpreter, + IDataSetRegistrationTransaction transaction) { + interpreter.set(TRANSACTION_VARIABLE_NAME, transaction); + String PROCESS_FUNCTION_NAME = "process"; try { @@ -345,14 +442,16 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend RollbackStack rollbackStack = recoveryState.getRollbackStack(); DataSetStorageAlgorithmRunner<T> runner = - new DataSetStorageAlgorithmRunner<T>(recoveryState.getIncomingDataSetFile(), // incoming + new DataSetStorageAlgorithmRunner<T>( + recoveryState.getIncomingDataSetFile(), // incoming dataSetStorageAlgorithms, // algorithms rollbackDelegate, // rollback delegate, rollbackStack, // rollbackstack logger, // registrationLogger state.getGlobalState().getOpenBisService(), // openBisService hookAdaptor, // the hooks - state.getGlobalState().getStorageRecoveryManager(), persistentMapHolder, state.getGlobalState()); + state.getGlobalState().getStorageRecoveryManager(), persistentMapHolder, + state.getGlobalState()); boolean registrationSuccessful = false; diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetRegistratorTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetRegistratorTest.java index 06e5f154fd2..654a9903235 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetRegistratorTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetRegistratorTest.java @@ -266,8 +266,23 @@ public class JythonTopLevelDataSetRegistratorTest extends AbstractJythonDataSetH } }; testCase.failurePoint = TestCaseParameters.FailurePoint.AFTER_GET_EXPERIMENT; - testCases.addAll(multipleVersionsOfTestCase(testCase)); + testCases.add(testCase); + + testCase = new TestCaseParameters("Postregistration hook has wrong signature."); + testCase.dropboxScriptPath = "testcase-postregistration-hook-wrong-signature.py"; + testCase.shouldThrowExceptionDuringRegistration = true; + testCase.exceptionAcceptor = new IPredicate<Exception>() + { + @Override + public boolean execute(Exception arg) + { + return arg.getMessage().contains("wrong number of arguments"); + } + }; + testCase.failurePoint = TestCaseParameters.FailurePoint.AT_THE_BEGINNING; + testCases.add(versionV2(testCase)); + testCase = new TestCaseParameters("Simple transaction explicit rollback"); testCase.dropboxScriptPath = "testcase-rollback.py"; testCase.failurePoint = TestCaseParameters.FailurePoint.AFTER_GET_EXPERIMENT; -- GitLab