diff --git a/common/source/java/ch/systemsx/cisd/common/interpreter/PythonInterpreter.java b/common/source/java/ch/systemsx/cisd/common/interpreter/PythonInterpreter.java index 705c773566e04ee53cee91549e95574615bb44c4..616ad2a4432415a2da3d24023b78ea6a561a3e7c 100644 --- a/common/source/java/ch/systemsx/cisd/common/interpreter/PythonInterpreter.java +++ b/common/source/java/ch/systemsx/cisd/common/interpreter/PythonInterpreter.java @@ -36,7 +36,7 @@ public class PythonInterpreter extends org.python.util.PythonInterpreter private Resources resources = new Resources(operationLog); - private PythonInterpreter() + protected PythonInterpreter() { } diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/JythonUtils.java b/common/source/java/ch/systemsx/cisd/common/utilities/JythonUtils.java index e7cb5adb5e0541ebb44d3b79032f2ba25fa6f4cf..02479f0bea2da643d98eb293842ee0f4193bff2b 100644 --- a/common/source/java/ch/systemsx/cisd/common/utilities/JythonUtils.java +++ b/common/source/java/ch/systemsx/cisd/common/utilities/JythonUtils.java @@ -19,9 +19,14 @@ package ch.systemsx.cisd.common.utilities; import java.util.HashMap; import java.util.Map; +import org.python.core.Py; import org.python.core.PyDictionary; +import org.python.core.PyFunction; +import org.python.core.PyObject; import org.python.core.PySequenceList; +import ch.systemsx.cisd.common.interpreter.PythonInterpreter; + /** * Jython utility methods. * @@ -45,4 +50,34 @@ public class JythonUtils } return javaMap; } + + /** + * Tries to get a function defined in jython script + * + * @return a Jython function object, or <code>null</code> if function doesn't exist. + */ + public static PyFunction tryJythonFunction(PythonInterpreter interpreter, String functionName) + { + try + { + PyFunction function = interpreter.get(functionName, PyFunction.class); + return function; + } catch (Exception e) + { + return null; + } + } + + /** + * Turn all arguments into a python objects, and calls the specified function. + */ + public static PyObject invokeFunction(PyFunction function, Object... args) + { + PyObject[] pyArgs = new PyObject[args.length]; + for (int i = 0; i < args.length; i++) + { + pyArgs[i] = Py.java2py(args[i]); + } + return function.__call__(pyArgs); + } } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractOmniscientTopLevelDataSetRegistrator.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractOmniscientTopLevelDataSetRegistrator.java index cb5d6290d9bff303c3ad0bf9978fe538394af67f..a04c599d7d3cdd9d79d9921b79e80baa5789fa49 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractOmniscientTopLevelDataSetRegistrator.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractOmniscientTopLevelDataSetRegistrator.java @@ -422,19 +422,17 @@ public abstract class AbstractOmniscientTopLevelDataSetRegistrator<T extends Dat IImmutableCopier hardlinkMaker = new AssertionCatchingImmutableCopierWrapper(FastRecursiveHardLinkMaker.tryCreate()); boolean linkWasMade = false; - if (null != hardlinkMaker) + + // Use the hardlink maker if we got one + Status status = hardlinkMaker.copyImmutably(incomingDataSetFile, preStagingDir, null); + linkWasMade = status.isOK(); + if (status.isError()) { - // Use the hardlink maker if we got one - Status status = hardlinkMaker.copyImmutably(incomingDataSetFile, preStagingDir, null); - linkWasMade = status.isOK(); - if (status.isError()) - { - final String msg = - status.tryGetErrorMessage() == null ? "Unknown error" : status - .tryGetErrorMessage(); - operationLog.warn("Failed to make a hard link copy of " + incomingDataSetFile - + " to " + preStagingDir + ": " + msg); - } + final String msg = + status.tryGetErrorMessage() == null ? "Unknown error" : status + .tryGetErrorMessage(); + operationLog.warn("Failed to make a hard link copy of " + incomingDataSetFile + " to " + + preStagingDir + ": " + msg); } if (false == linkWasMade) diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractProgrammableTopLevelDataSetHandler.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractProgrammableTopLevelDataSetHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..5f56e20965f33a0102e11b37524d2120849d4d9f --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/AbstractProgrammableTopLevelDataSetHandler.java @@ -0,0 +1,738 @@ +/* + * Copyright 2012 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.etlserver.registrator; + +import java.io.File; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.exceptions.NotImplementedException; +import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; +import ch.systemsx.cisd.etlserver.DssRegistrationLogger; +import ch.systemsx.cisd.etlserver.IStorageProcessorTransactional.UnstoreDataAction; +import ch.systemsx.cisd.etlserver.ITopLevelDataSetRegistratorDelegate; +import ch.systemsx.cisd.etlserver.TopLevelDataSetRegistratorGlobalState; +import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner.IPrePostRegistrationHook; +import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner.IRollbackDelegate; +import ch.systemsx.cisd.etlserver.registrator.IDataSetOnErrorActionDecision.ErrorType; +import ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler.ProgrammableDropboxObjectFactory; +import ch.systemsx.cisd.etlserver.registrator.api.IJavaDataSetRegistrationDropboxV1; +import ch.systemsx.cisd.etlserver.registrator.api.v1.IDataSetRegistrationTransaction; +import ch.systemsx.cisd.etlserver.registrator.api.v1.SecondaryTransactionFailure; +import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.AbstractTransactionState; +import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction; +import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.RollbackStack; +import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.RollbackStack.IRollbackStackDelegate; +import ch.systemsx.cisd.etlserver.registrator.api.v2.DataSetRegistrationTransactionV2Delegate; +import ch.systemsx.cisd.etlserver.registrator.api.v2.IDataSetRegistrationTransactionV2; +import ch.systemsx.cisd.etlserver.registrator.api.v2.IJavaDataSetRegistrationDropboxV2; +import ch.systemsx.cisd.etlserver.registrator.api.v2.JythonDataSetRegistrationServiceV2; +import ch.systemsx.cisd.etlserver.registrator.monitor.DssRegistrationHealthMonitor; +import ch.systemsx.cisd.etlserver.registrator.recovery.AbstractRecoveryState; +import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStoragePrecommitRecoveryState; +import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStorageRecoveryInfo; +import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStorageRecoveryInfo.RecoveryStage; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; +import ch.systemsx.cisd.openbis.generic.shared.basic.EntityOperationsState; +import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; + +/** + * @author Pawel Glyzewski + */ +public abstract class AbstractProgrammableTopLevelDataSetHandler<T extends DataSetInformation> + extends AbstractOmniscientTopLevelDataSetRegistrator<T> +{ + private final int processMaxRetryCount; + + private final int processRetryPauseInSec; + + /** + * @param globalState + */ + protected AbstractProgrammableTopLevelDataSetHandler( + TopLevelDataSetRegistratorGlobalState globalState) + { + super(globalState); + + this.processMaxRetryCount = globalState.getThreadParameters().getProcessMaxRetryCount(); + this.processRetryPauseInSec = globalState.getThreadParameters().getProcessRetryPauseInSec(); + } + + @Override + public abstract boolean shouldNotAddToFaultyPathsOrNull(File file); + + @Override + abstract protected void handleDataSet(DataSetFile dataSetFile, + DataSetRegistrationService<T> service) throws Throwable; + + protected void executeProcessFunctionWithRetries( + IJavaDataSetRegistrationDropboxV2<T> v2Programm, + JythonDataSetRegistrationServiceV2<T> service, DataSetFile incomingDataSetFile) + { + DistinctExceptionsCollection errors = new DistinctExceptionsCollection(); + + // create initial transaction + service.transaction(); + + while (true) + { + waitUntilApplicationIsReady(incomingDataSetFile); + + Exception problem; + try + { + v2Programm.process(wrapTransaction(service.getTransaction())); + // if function succeeded - than we are happy + return; + } catch (Exception ex) + { + problem = ex; + operationLog + .info("Exception occured during jython script processing. Will check if can retry.", + ex); + } + + int errorCount = errors.add(problem); + + if (errorCount > processMaxRetryCount) + { + operationLog + .error("The jython script processing has failed too many times. Rolling back."); + throw CheckedExceptionTunnel.wrapIfNecessary(problem); + } else + { + operationLog.debug("The same error happened for the " + errorCount + + " time (max allowed is " + processMaxRetryCount + ")"); + } + + DataSetRegistrationContext registrationContext = + service.getTransaction().getRegistrationContext(); + + boolean retryFunctionResult = false; + try + { + retryFunctionResult = + v2Programm.shouldRetryProcessing(registrationContext, problem); + } catch (Exception ex) + { + operationLog.error("The retry function has failed. Rolling back.", ex); + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + + if (false == retryFunctionResult) + { + operationLog + .error("The should_retry_processing function returned false. Will not retry."); + throw CheckedExceptionTunnel.wrapIfNecessary(problem); + } + + service.rollbackAndForgetTransaction(); + // TODO: now the transaction is rolled back and everything should be in place again. + // should we catch some exceptions here? can we recover if whatever went wrong in here + + // creates the new transaction and propagates the values in the persistent map + service.transaction().getRegistrationContext().getPersistentMap() + .putAll(registrationContext.getPersistentMap()); + + waitTheRetryPeriod(processRetryPauseInSec); + } + } + + /** + * Wraps the transaction - to hide methods which we don't want to expose in the api. + */ + protected IDataSetRegistrationTransactionV2 wrapTransaction( + IDataSetRegistrationTransaction transaction) + { + IDataSetRegistrationTransactionV2 v2transaction = + new DataSetRegistrationTransactionV2Delegate(transaction); + return v2transaction; + } + + protected void waitUntilApplicationIsReady(DataSetFile incomingDataSetFile) + { + while (false == DssRegistrationHealthMonitor.getInstance().isApplicationReady( + incomingDataSetFile.getRealIncomingFile().getParentFile())) + { + waitTheRetryPeriod(10); + // do nothing. just repeat until the application is ready + } + } + + protected void waitTheRetryPeriod(int retryPeriod) + { + ConcurrencyUtilities.sleep(retryPeriod * 1000); // in seconds + } + + @Override + public void didRollbackTransaction(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction, + DataSetStorageAlgorithmRunner<T> algorithmRunner, Throwable ex) + { + try + { + getV2DropboxProgram(service).rollbackPreRegistration( + transaction.getRegistrationContext(), ex); + } catch (NotImplementedException e) + { + if (shouldUseOldJythonHookFunctions()) + { + IJavaDataSetRegistrationDropboxV1<T> v1Dropbox = getV1DropboxProgram(); + + try + { + v1Dropbox.rollbackTransaction(service, transaction, algorithmRunner, ex); + } catch (NotImplementedException exc) + { + try + { + // No Rollback transaction function was called, see if the rollback service + // function + // was + // defined, and call it. + v1Dropbox.rollbackService(service, ex); + } catch (NotImplementedException exception) + { + // silently ignore if function is not implemented + } + } + } + } + + super.didRollbackTransaction(service, transaction, algorithmRunner, ex); + } + + @Override + public void didCommitTransaction(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction) + { + super.didCommitTransaction(service, transaction); + try + { + getV2DropboxProgram(service).postStorage(transaction.getRegistrationContext()); + } catch (NotImplementedException e) + { + try + { + if (shouldUseOldJythonHookFunctions()) + { + getV1DropboxProgram().commitTransaction(service, transaction); + } + } catch (NotImplementedException ex) + { + // silently ignore if function is not implemented + } + } + } + + @Override + public void didPreRegistration(DataSetRegistrationService<T> service, + DataSetRegistrationContext.IHolder registrationContextHolder) + { + super.didPreRegistration(service, registrationContextHolder); + try + { + getV2DropboxProgram(service).preMetadataRegistration( + registrationContextHolder.getRegistrationContext()); + } catch (NotImplementedException e) + { + // ignore + } + } + + @Override + public void didPostRegistration(DataSetRegistrationService<T> service, + DataSetRegistrationContext.IHolder registrationContextHolder) + { + super.didPostRegistration(service, registrationContextHolder); + try + { + getV2DropboxProgram(service).postMetadataRegistration( + registrationContextHolder.getRegistrationContext()); + } catch (NotImplementedException e) + { + // ignore + } + } + + @Override + public void didEncounterSecondaryTransactionErrors(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction, + List<SecondaryTransactionFailure> secondaryErrors) + { + super.didEncounterSecondaryTransactionErrors(service, transaction, secondaryErrors); + + if (shouldUseOldJythonHookFunctions()) + { + try + { + getV1DropboxProgram().didEncounterSecondaryTransactionErrors(service, transaction, + secondaryErrors); + } catch (NotImplementedException e) + { + // silently ignore if function is not implemented + } + } + } + + @Override + protected void handleRecovery(final File incomingFileOriginal) + { + if (shouldUseOldJythonHookFunctions()) + { + super.handleRecovery(incomingFileOriginal); + return; + } + + // get the marker file + final File recoveryMarkerFile = + state.getGlobalState().getStorageRecoveryManager() + .getProcessingMarkerFile(incomingFileOriginal); + + // deserialize recovery state + final AbstractRecoveryState<T> recoveryState = + state.getGlobalState().getStorageRecoveryManager() + .extractRecoveryCheckpoint(recoveryMarkerFile); + + // then we should ensure that the recovery will actually take place itself! + final DataSetStorageRecoveryInfo recoveryInfo = + state.getGlobalState().getStorageRecoveryManager() + .getRecoveryFileFromMarker(recoveryMarkerFile); + + final File recoveryFile = recoveryInfo.getRecoveryStateFile(); + + if (false == recoveryFile.exists()) + { + operationLog.error("Recovery file does not exist. " + recoveryFile); + + throw new IllegalStateException("Recovery file " + recoveryFile + " doesn't exist"); + } + + if (false == retryPeriodHasPassed(recoveryInfo)) + { + return; + } + + operationLog.info("Will recover from broken registration. Found marker file " + + recoveryMarkerFile + " and " + recoveryFile); + + final DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state); + + logger.log("Starting recovery at checkpoint " + recoveryInfo.getRecoveryStage()); + + IRecoveryCleanupDelegate recoveryMarkerFileCleanupAction = new IRecoveryCleanupDelegate() + { + @Override + public void execute(boolean shouldStopRecovery, boolean shouldIncreaseTryCount) + { + if (false == shouldStopRecovery + && recoveryInfo.getTryCount() >= state.getGlobalState() + .getStorageRecoveryManager().getMaximumRertyCount()) + { + notificationLog.error("The dataset " + + recoveryState.getIncomingDataSetFile().getRealIncomingFile() + + " has failed to register. Giving up."); + deleteMarkerFile(); + + File errorRecoveryMarkerFile = + new File(recoveryMarkerFile.getParent(), + recoveryMarkerFile.getName() + ".ERROR"); + state.getFileOperations().move(recoveryMarkerFile, errorRecoveryMarkerFile); + + logger.log("Recovery failed. Giving up."); + logger.registerFailure(); + + } else + { + if (shouldStopRecovery) + { + deleteMarkerFile(); + + recoveryMarkerFile.delete(); + recoveryFile.delete(); + } else + { + // this replaces the recovery file with a new one with increased + // count + // FIXME: is this safe operation (how to assure, that it won't + // corrupt the recoveryMarkerFile?) + DataSetStorageRecoveryInfo rInfo = + state.getGlobalState().getStorageRecoveryManager() + .getRecoveryFileFromMarker(recoveryMarkerFile); + if (shouldIncreaseTryCount) + { + rInfo.increaseTryCount(); + } + rInfo.setLastTry(new Date()); + rInfo.writeToFile(recoveryMarkerFile); + } + } + } + + private void deleteMarkerFile() + { + File incomingMarkerFile = + MarkerFileUtility.getMarkerFileFromIncoming(recoveryState + .getIncomingDataSetFile().getRealIncomingFile()); + if (incomingMarkerFile.exists()) + { + + incomingMarkerFile.delete(); + } + } + }; + + PostRegistrationCleanUpAction cleanupAction = + new PostRegistrationCleanUpAction(recoveryState.getIncomingDataSetFile(), + new DoNothingDelegatedAction()); + + handleRecoveryState(recoveryInfo.getRecoveryStage(), recoveryState, cleanupAction, + recoveryMarkerFileCleanupAction); + } + + /** + * Check wheter the last retry + retry period < date.now + */ + private boolean retryPeriodHasPassed(final DataSetStorageRecoveryInfo recoveryInfo) + { + Calendar c = Calendar.getInstance(); + c.setTime(recoveryInfo.getLastTry()); + c.add(Calendar.SECOND, state.getGlobalState().getStorageRecoveryManager() + .getRetryPeriodInSeconds()); + return c.getTime().before(new Date()); + } + + private void handleRecoveryState(RecoveryStage recoveryStage, + final AbstractRecoveryState<T> recoveryState, + final IDelegatedActionWithResult<Boolean> cleanAfterwardsAction, + final IRecoveryCleanupDelegate recoveryMarkerCleanup) + { + + final DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state); + + // keeps track of whether we should keep or delete the recovery files. + // we can delete if succesfully recovered, or rolledback. + // This code is not executed at all in case of a recovery give-up + boolean shouldStopRecovery = false; + + // by default in case of failure we increase try count + boolean shouldIncreaseTryCount = true; + + IRollbackDelegate<T> rollbackDelegate = new IRollbackDelegate<T>() + { + @Override + public void didRollbackStorageAlgorithmRunner( + DataSetStorageAlgorithmRunner<T> algorithm, Throwable ex, + ErrorType errorType) + { + // do nothing. recovery takes care of everything + } + + @Override + public void markReadyForRecovery(DataSetStorageAlgorithmRunner<T> algorithm, + Throwable ex) + { + // don't have to do nothing. + } + }; + + // hookAdaptor + RecoveryHookAdaptor hookAdaptor = + getRecoveryHookAdaptor(recoveryState.getIncomingDataSetFile() + .getLogicalIncomingFile()); + + DataSetRegistrationContext.IHolder registrationContextHolder = + new DataSetRegistrationContext.IHolder() + { + + @Override + public DataSetRegistrationContext getRegistrationContext() + { + return new DataSetRegistrationContext(recoveryState.getPersistentMap(), + state.getGlobalState()); + } + }; + + ArrayList<DataSetStorageAlgorithm<T>> dataSetStorageAlgorithms = + recoveryState.getDataSetStorageAlgorithms(state); + + RollbackStack rollbackStack = recoveryState.getRollbackStack(); + + DataSetStorageAlgorithmRunner<T> runner = + new DataSetStorageAlgorithmRunner<T>( + recoveryState.getIncomingDataSetFile(), // incoming + dataSetStorageAlgorithms, // algorithms + rollbackDelegate, // rollback delegate, + rollbackStack, // rollbackstack + logger, // registrationLogger + state.getGlobalState().getOpenBisService(), // openBisService + hookAdaptor, // the hooks + state.getGlobalState().getStorageRecoveryManager(), + registrationContextHolder, state.getGlobalState()); + + boolean registrationSuccessful = false; + + operationLog.info("Recovery succesfully deserialized the state of the registration"); + try + { + EntityOperationsState entityOperationsState; + + if (recoveryStage.beforeOrEqual(RecoveryStage.PRECOMMIT)) + { + TechId registrationId = + ((DataSetStoragePrecommitRecoveryState<T>) recoveryState) + .getRegistrationId(); + if (registrationId == null) + { + throw new IllegalStateException( + "Recovery state cannot have null registrationId at the precommit phase"); + } + entityOperationsState = + state.getGlobalState().getOpenBisService() + .didEntityOperationsSucceed(registrationId); + } else + { + // if we are at the later stage than precommit - it means that the entity operations + // have succeeded + entityOperationsState = EntityOperationsState.OPERATION_SUCCEEDED; + } + + if (EntityOperationsState.IN_PROGRESS == entityOperationsState) + { + shouldIncreaseTryCount = false; + } else if (EntityOperationsState.NO_OPERATION == entityOperationsState) + { + operationLog + .info("Recovery hasn't found registration artifacts in the application server. Registration of metadata was not successful."); + + IRollbackStackDelegate rollbackStackDelegate = + new AbstractTransactionState.LiveTransactionRollbackDelegate(state + .getGlobalState().getStagingDir()); + + rollbackStack.setLockedState(false); + + rollbackStack.rollbackAll(rollbackStackDelegate); + UnstoreDataAction action = + state.getOnErrorActionDecision().computeUndoAction( + ErrorType.OPENBIS_REGISTRATION_FAILURE, null); + DataSetStorageRollbacker rollbacker = + new DataSetStorageRollbacker(state, operationLog, action, recoveryState + .getIncomingDataSetFile().getRealIncomingFile(), null, null, + ErrorType.OPENBIS_REGISTRATION_FAILURE); + operationLog.info(rollbacker.getErrorMessageForLog()); + rollbacker.doRollback(logger); + + logger.log("Operations haven't been registered in AS - recovery rollback"); + logger.registerFailure(); + + shouldStopRecovery = true; + + hookAdaptor.executePreRegistrationRollback(registrationContextHolder, null); + + finishRegistration(dataSetStorageAlgorithms, rollbackStack); + } else + { + + operationLog + .info("Recovery has found datasets in the AS. The registration of metadata was successful."); + + if (recoveryStage.before(RecoveryStage.POST_REGISTRATION_HOOK_EXECUTED)) + { + runner.postRegistration(); + } + + boolean success = true; + if (recoveryStage.before(RecoveryStage.STORAGE_COMPLETED)) + { + success = runner.commitAndStore(); + } + + if (success) + { + success = runner.cleanPrecommitAndConfirmStorage(); + } + if (success) + { + hookAdaptor.executePostStorage(registrationContextHolder); + + registrationSuccessful = true; + shouldStopRecovery = true; + + logger.registerSuccess(); + + // do the actions performed when the registration comes into terminal state. + finishRegistration(dataSetStorageAlgorithms, rollbackStack); + + } + } + } catch (Throwable error) + { + if ("org.jmock.api.ExpectationError".equals(error.getClass().getCanonicalName())) + { + // this exception can by only thrown by tests. + // propagation of the exception is essential to test some functionalities + // implemented like this to avoid dependency to jmock in production + throw (Error) error; + } + operationLog.error("Uncaught error during recovery", error); + // in this case we should ignore, and run the recovery again after some time + logger.log(error, "Uncaught error during recovery"); + } + + cleanAfterwardsAction.execute(registrationSuccessful); + + recoveryMarkerCleanup.execute(shouldStopRecovery, shouldIncreaseTryCount); + } + + private void finishRegistration(ArrayList<DataSetStorageAlgorithm<T>> dataSetStorageAlgorithms, + RollbackStack rollbackStack) + { + for (DataSetStorageAlgorithm<T> algorithm : dataSetStorageAlgorithms) + { + algorithm.getStagingFile().delete(); + } + rollbackStack.discard(); + } + + /** + * Set the factory available to the python script. Subclasses may want to override. + */ + @SuppressWarnings("unchecked") + protected IDataSetRegistrationDetailsFactory<T> createObjectFactory( + DataSetInformation userProvidedDataSetInformationOrNull) + { + return (IDataSetRegistrationDetailsFactory<T>) new ProgrammableDropboxObjectFactory<DataSetInformation>( + getRegistratorState(), userProvidedDataSetInformationOrNull) + { + @Override + protected DataSetInformation createDataSetInformation() + { + return new DataSetInformation(); + } + }; + } + + /** + * Create a registration service. + */ + @Override + protected DataSetRegistrationService<T> createDataSetRegistrationService( + DataSetFile incomingDataSetFile, DataSetInformation callerDataSetInformationOrNull, + IDelegatedActionWithResult<Boolean> cleanAfterwardsAction, + ITopLevelDataSetRegistratorDelegate delegate) + { + return createJythonDataSetRegistrationService(incomingDataSetFile, + callerDataSetInformationOrNull, cleanAfterwardsAction, delegate); + } + + /** + * Create a registration service. + */ + protected DataSetRegistrationService<T> createJythonDataSetRegistrationService( + DataSetFile incomingDataSetFile, + DataSetInformation userProvidedDataSetInformationOrNull, + IDelegatedActionWithResult<Boolean> cleanAfterwardsAction, + ITopLevelDataSetRegistratorDelegate delegate) + { + return new DataSetRegistrationService<T>(this, incomingDataSetFile, + this.createObjectFactory(userProvidedDataSetInformationOrNull), + cleanAfterwardsAction, delegate); + } + + protected abstract RecoveryHookAdaptor getRecoveryHookAdaptor(File incoming); + + protected abstract boolean shouldUseOldJythonHookFunctions(); + + protected abstract IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgram( + DataSetRegistrationService<T> service); + + protected abstract IJavaDataSetRegistrationDropboxV1<T> getV1DropboxProgram(); + + interface IRecoveryCleanupDelegate + { + void execute(boolean shouldStopRecovery, boolean shouldIncreaseTryCount); + } + + /** + * Create an adaptor that offers access to the recovery hook functions. + */ + protected abstract class RecoveryHookAdaptor implements IPrePostRegistrationHook<T> + { + protected abstract IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgramInternal(); + + protected final File incoming; + + public RecoveryHookAdaptor(File incoming) + { + this.incoming = incoming; + } + + @Override + public void executePreRegistration( + DataSetRegistrationContext.IHolder registrationContextHolder) + { + throw new NotImplementedException("Recovery cannot execute pre-registration hook."); + } + + @Override + public void executePostRegistration( + DataSetRegistrationContext.IHolder registrationContextHolder) + { + try + { + getV2DropboxProgramInternal().postMetadataRegistration( + registrationContextHolder.getRegistrationContext()); + } catch (NotImplementedException e) + { + // ignore + } + } + + /** + * This method does not belong to the IPrePostRegistrationHook interface. Is called directly + * by recovery. + */ + public void executePostStorage(DataSetRegistrationContext.IHolder registrationContextHolder) + { + try + { + getV2DropboxProgramInternal().postStorage( + registrationContextHolder.getRegistrationContext()); + } catch (NotImplementedException e) + { + // ignore + } + } + + public void executePreRegistrationRollback( + DataSetRegistrationContext.IHolder registrationContextHolder, Throwable throwable) + { + try + { + getV2DropboxProgramInternal().rollbackPreRegistration( + registrationContextHolder.getRegistrationContext(), throwable); + } catch (NotImplementedException e) + { + // ignore + } + } + } +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java index c493f5ea062a37dbc75eea9b7afd0c50a23c30ad..00305a4b5a3da014a09da4a87b75a509f8c36c5f 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/JythonTopLevelDataSetHandler.java @@ -25,16 +25,19 @@ import org.python.core.PyException; import org.python.core.PyFunction; import org.python.core.PyObject; -import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; +import ch.systemsx.cisd.common.exceptions.NotImplementedException; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.interpreter.PythonInterpreter; import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; import ch.systemsx.cisd.common.utilities.PropertyUtils; import ch.systemsx.cisd.etlserver.ITopLevelDataSetRegistratorDelegate; import ch.systemsx.cisd.etlserver.TopLevelDataSetRegistratorGlobalState; +import ch.systemsx.cisd.etlserver.registrator.api.IJavaDataSetRegistrationDropboxV1; import ch.systemsx.cisd.etlserver.registrator.api.v1.SecondaryTransactionFailure; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction; +import ch.systemsx.cisd.etlserver.registrator.api.v2.IJavaDataSetRegistrationDropboxV2; +import ch.systemsx.cisd.etlserver.registrator.api.v2.JythonAsJavaDataSetRegistrationDropboxV2Wrapper; import ch.systemsx.cisd.etlserver.registrator.monitor.DssRegistrationHealthMonitor; import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; @@ -44,9 +47,9 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; * @author Chandrasekhar Ramakrishnan */ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends - AbstractOmniscientTopLevelDataSetRegistrator<T> + AbstractProgrammableTopLevelDataSetHandler<T> { - protected enum JythonHookFunction + public enum JythonHookFunction { /** * The name of the v2 process function, that is executed during the registration. A @@ -60,7 +63,8 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends ROLLBACK_SERVICE_FUNCTION_NAME("rollback_service", 2), /** - * The name of the function to define to hook into the transaction rollback mechanism. V1 only. + * The name of the function to define to hook into the transaction rollback mechanism. V1 + * only. */ ROLLBACK_TRANSACTION_FUNCTION_NAME("rollback_transaction", 4), @@ -103,7 +107,7 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends DID_ENCOUNTER_SECONDARY_TRANSACTION_ERRORS_FUNCTION_NAME( "did_encounter_secondary_transaction_errors", 3); - String name; + public final String name; int argCount; @@ -141,6 +145,83 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends // The key for the script in the properties file public static final String SCRIPT_PATH_KEY = "script-path"; + private IJavaDataSetRegistrationDropboxV1<T> v1 = new IJavaDataSetRegistrationDropboxV1<T>() + { + @Override + public void rollbackTransaction(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction, + DataSetStorageAlgorithmRunner<T> algorithmRunner, Throwable ex) + { + PythonInterpreter interpreter = getInterpreterFromService(service); + + PyFunction function = + tryJythonFunction(interpreter, + JythonHookFunction.ROLLBACK_TRANSACTION_FUNCTION_NAME); + if (null != function) + { + invokeRollbackTransactionFunction(function, service, transaction, + algorithmRunner, ex); + } else + { + throw new NotImplementedException(); + } + } + + @Override + public void rollbackService(DataSetRegistrationService<T> service, Throwable ex) + { + PythonInterpreter interpreter = getInterpreterFromService(service); + PyFunction function = + tryJythonFunction(interpreter, + JythonHookFunction.ROLLBACK_SERVICE_FUNCTION_NAME); + if (null != function) + { + invokeRollbackServiceFunction(function, service, ex); + } else + { + throw new NotImplementedException(); + } + } + + @Override + public void commitTransaction(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction) + { + PythonInterpreter interpreter = getInterpreterFromService(service); + PyFunction function = + tryJythonFunction(interpreter, + JythonHookFunction.COMMIT_TRANSACTION_FUNCTION_NAME); + if (null != function) + { + invokeServiceTransactionFunction(function, service, transaction); + } else + { + throw new NotImplementedException(); + } + } + + @Override + public void didEncounterSecondaryTransactionErrors( + DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction, + List<SecondaryTransactionFailure> secondaryErrors) + { + PythonInterpreter interpreter = getInterpreterFromService(service); + PyFunction function = + tryJythonFunction( + interpreter, + JythonHookFunction.DID_ENCOUNTER_SECONDARY_TRANSACTION_ERRORS_FUNCTION_NAME); + if (null != function) + { + invokeDidEncounterSecondaryTransactionErrorsFunction(function, service, + transaction, secondaryErrors); + } else + { + throw new NotImplementedException(); + } + } + }; + protected final File scriptFile; /** @@ -195,21 +276,6 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends verifyEvaluatorHookFunctions(interpreter); } - protected void waitTheRetryPeriod(int retryPeriod) - { - ConcurrencyUtilities.sleep(retryPeriod * 1000); // in seconds - } - - protected void waitUntilApplicationIsReady(DataSetFile incomingDataSetFile) - { - while (false == DssRegistrationHealthMonitor.getInstance().isApplicationReady( - incomingDataSetFile.getRealIncomingFile().getParentFile())) - { - waitTheRetryPeriod(10); - // do nothing. just repeat until the application is ready - } - } - protected void verifyEvaluatorHookFunctions(PythonInterpreter interpreter) { for (JythonHookFunction function : JythonHookFunction.values()) @@ -282,60 +348,17 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends @Override protected void rollback(DataSetRegistrationService<T> service, Throwable throwable) { - PythonInterpreter interpreter = getInterpreterFromService(service); - PyFunction function = - tryJythonFunction(interpreter, JythonHookFunction.ROLLBACK_SERVICE_FUNCTION_NAME); - if (null != function) + try + { + v1.rollbackService(service, throwable); + } catch (NotImplementedException ex) { - invokeRollbackServiceFunction(function, service, throwable); + // ignore } super.rollback(service, throwable); } - @Override - public void didRollbackTransaction(DataSetRegistrationService<T> service, - DataSetRegistrationTransaction<T> transaction, - DataSetStorageAlgorithmRunner<T> algorithmRunner, Throwable ex) - { - invokeRollbackTransactionFunction(service, transaction, algorithmRunner, ex); - super.didRollbackTransaction(service, transaction, algorithmRunner, ex); - } - - @Override - public void didCommitTransaction(DataSetRegistrationService<T> service, - DataSetRegistrationTransaction<T> transaction) - { - super.didCommitTransaction(service, transaction); - invokeCommitTransactionFunction(service, transaction); - } - - @Override - public void didPreRegistration(DataSetRegistrationService<T> service, - DataSetRegistrationContext.IHolder registrationContextHolder) - { - super.didPreRegistration(service, registrationContextHolder); - invokePreRegistrationFunction(service, registrationContextHolder); - } - - @Override - public void didPostRegistration(DataSetRegistrationService<T> service, - DataSetRegistrationContext.IHolder registrationContextHolder) - { - super.didPostRegistration(service, registrationContextHolder); - invokePostRegistrationFunction(service, registrationContextHolder); - } - - @Override - public void didEncounterSecondaryTransactionErrors(DataSetRegistrationService<T> service, - DataSetRegistrationTransaction<T> transaction, - List<SecondaryTransactionFailure> secondaryErrors) - { - super.didEncounterSecondaryTransactionErrors(service, transaction, secondaryErrors); - - invokeDidEncounterSecondaryTransactionErrorsFunction(service, transaction, secondaryErrors); - } - // getters for v2 hook functions required for auto-recovery public PyFunction tryGetPostRegistrationFunction(DataSetRegistrationService<T> service) { @@ -375,111 +398,12 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends * If true than the old methods of jython hook functions will also be used (as a fallbacks in * case of the new methods or missing, or normally) */ + @Override protected boolean shouldUseOldJythonHookFunctions() { return true; } - private void invokeRollbackTransactionFunction(DataSetRegistrationService<T> service, - DataSetRegistrationTransaction<T> transaction, - DataSetStorageAlgorithmRunner<T> algorithmRunner, Throwable ex) - { - PyFunction function = getRollbackPreRegistrationFunction(service); - - if (null != function) - { - invokeTransactionFunctionWithContext(function, transaction, ex); - } else if (shouldUseOldJythonHookFunctions()) - { - PythonInterpreter interpreter = getInterpreterFromService(service); - - function = - tryJythonFunction(interpreter, - JythonHookFunction.ROLLBACK_TRANSACTION_FUNCTION_NAME); - if (null != function) - { - invokeRollbackTransactionFunction(function, service, transaction, algorithmRunner, - ex); - } else - { - // No Rollback transaction function was called, see if the rollback service function - // was - // defined, and call it. - function = - tryJythonFunction(interpreter, - JythonHookFunction.ROLLBACK_SERVICE_FUNCTION_NAME); - if (null != function) - { - invokeRollbackServiceFunction(function, service, ex); - } - } - } - } - - private void invokeCommitTransactionFunction(DataSetRegistrationService<T> service, - DataSetRegistrationTransaction<T> transaction) - { - PythonInterpreter interpreter = getInterpreterFromService(service); - - PyFunction function = tryGetPostStorageFunction(service); - - if (null != function) - { - invokeTransactionFunctionWithContext(function, transaction); - } else if (shouldUseOldJythonHookFunctions()) - { - function = - tryJythonFunction(interpreter, - JythonHookFunction.COMMIT_TRANSACTION_FUNCTION_NAME); - if (null != function) - { - invokeServiceTransactionFunction(function, service, transaction); - } - } - } - - private void invokePreRegistrationFunction(DataSetRegistrationService<T> service, - DataSetRegistrationContext.IHolder registrationContextHolder) - { - PythonInterpreter interpreter = getInterpreterFromService(service); - PyFunction function = - tryJythonFunction(interpreter, JythonHookFunction.PRE_REGISTRATION_FUNCTION_NAME); - - if (null != function) - { - invokeTransactionFunctionWithContext(function, registrationContextHolder); - } - } - - private void invokePostRegistrationFunction(DataSetRegistrationService<T> service, - DataSetRegistrationContext.IHolder registrationContextHolder) - { - PyFunction function = tryGetPostRegistrationFunction(service); - if (null != function) - { - invokeTransactionFunctionWithContext(function, registrationContextHolder); - } - } - - private void invokeDidEncounterSecondaryTransactionErrorsFunction( - DataSetRegistrationService<T> service, DataSetRegistrationTransaction<T> transaction, - List<SecondaryTransactionFailure> secondaryErrors) - { - if (shouldUseOldJythonHookFunctions()) - { - PythonInterpreter interpreter = getInterpreterFromService(service); - PyFunction function = - tryJythonFunction( - interpreter, - JythonHookFunction.DID_ENCOUNTER_SECONDARY_TRANSACTION_ERRORS_FUNCTION_NAME); - if (null != function) - { - invokeDidEncounterSecondaryTransactionErrorsFunction(function, service, - transaction, secondaryErrors); - } - } - } - protected PyFunction tryJythonFunction(PythonInterpreter interpreter, JythonHookFunction functionDefinition) { @@ -512,19 +436,6 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends invokeFunction(function, service, transaction); } - private void invokeTransactionFunctionWithContext(PyFunction function, - DataSetRegistrationContext.IHolder registrationContextHolder, Object... additionalArgs) - { - if (additionalArgs.length > 0) - { - invokeFunction(function, registrationContextHolder.getRegistrationContext(), - additionalArgs); - } else - { - invokeFunction(function, registrationContextHolder.getRegistrationContext()); - } - } - private void invokeDidEncounterSecondaryTransactionErrorsFunction(PyFunction function, DataSetRegistrationService<T> service, DataSetRegistrationTransaction<T> transaction, List<SecondaryTransactionFailure> secondaryErrors) @@ -545,28 +456,11 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends return function.__call__(pyArgs); } - /** - * Set the factory available to the python script. Subclasses may want to override. - */ - @SuppressWarnings("unchecked") - protected IDataSetRegistrationDetailsFactory<T> createObjectFactory( - PythonInterpreter interpreter, DataSetInformation userProvidedDataSetInformationOrNull) + public abstract static class ProgrammableDropboxObjectFactory<T extends DataSetInformation> + extends AbstractDataSetRegistrationDetailsFactory<T> { - return (IDataSetRegistrationDetailsFactory<T>) new JythonObjectFactory<DataSetInformation>( - getRegistratorState(), userProvidedDataSetInformationOrNull) - { - @Override - protected DataSetInformation createDataSetInformation() - { - return new DataSetInformation(); - } - }; - } - - public abstract static class JythonObjectFactory<T extends DataSetInformation> extends - AbstractDataSetRegistrationDetailsFactory<T> - { - public JythonObjectFactory(OmniscientTopLevelDataSetRegistratorState registratorState, + public ProgrammableDropboxObjectFactory( + OmniscientTopLevelDataSetRegistratorState registratorState, DataSetInformation userProvidedDataSetInformationOrNull) { super(registratorState, userProvidedDataSetInformationOrNull); @@ -600,15 +494,17 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends { private final PythonInterpreter interpreter; - public JythonDataSetRegistrationService(JythonTopLevelDataSetHandler<T> registrator, + public JythonDataSetRegistrationService( + AbstractProgrammableTopLevelDataSetHandler<T> registrator, DataSetFile incomingDataSetFile, DataSetInformation userProvidedDataSetInformationOrNull, IDelegatedActionWithResult<Boolean> globalCleanAfterwardsAction, ITopLevelDataSetRegistratorDelegate delegate, PythonInterpreter interpreter, TopLevelDataSetRegistratorGlobalState globalState) { - super(registrator, incomingDataSetFile, registrator.createObjectFactory(interpreter, - userProvidedDataSetInformationOrNull), globalCleanAfterwardsAction, delegate); + super(registrator, incomingDataSetFile, registrator + .createObjectFactory(userProvidedDataSetInformationOrNull), + globalCleanAfterwardsAction, delegate); interpreter.set(STATE_VARIABLE_NAME, globalState); this.interpreter = interpreter; } @@ -656,4 +552,24 @@ public class JythonTopLevelDataSetHandler<T extends DataSetInformation> extends return false; } + @Override + protected IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgram( + DataSetRegistrationService<T> service) + { + return new JythonAsJavaDataSetRegistrationDropboxV2Wrapper<T>( + getInterpreterFromService(service)); + } + + @Override + protected IJavaDataSetRegistrationDropboxV1<T> getV1DropboxProgram() + { + return v1; + } + + @Override + protected ch.systemsx.cisd.etlserver.registrator.AbstractProgrammableTopLevelDataSetHandler<T>.RecoveryHookAdaptor getRecoveryHookAdaptor( + File incoming) + { + throw new NotImplementedException(); + } } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/IJavaDataSetRegistrationDropboxV1.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/IJavaDataSetRegistrationDropboxV1.java new file mode 100644 index 0000000000000000000000000000000000000000..36e20473818c1fdb1c5f170e54135f23e42bddb6 --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/IJavaDataSetRegistrationDropboxV1.java @@ -0,0 +1,44 @@ +/* + * Copyright 2012 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.etlserver.registrator.api; + +import java.util.List; + +import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationService; +import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner; +import ch.systemsx.cisd.etlserver.registrator.api.v1.SecondaryTransactionFailure; +import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; + +/** + * @author Pawel Glyzewski + */ +public interface IJavaDataSetRegistrationDropboxV1<T extends DataSetInformation> +{ + public void rollbackTransaction(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction, + DataSetStorageAlgorithmRunner<T> algorithmRunner, Throwable ex); + + public void rollbackService(DataSetRegistrationService<T> service, Throwable ex); + + public void commitTransaction(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction); + + public void didEncounterSecondaryTransactionErrors(DataSetRegistrationService<T> service, + DataSetRegistrationTransaction<T> transaction, + List<SecondaryTransactionFailure> secondaryErrors); +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationServiceV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationServiceV2.java new file mode 100644 index 0000000000000000000000000000000000000000..d6ba93711a02c00fd236205e7068e81c473bfc2f --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationServiceV2.java @@ -0,0 +1,63 @@ +/* + * Copyright 2012 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.etlserver.registrator.api.v2; + +import org.python.core.PyObject; + +import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; +import ch.systemsx.cisd.etlserver.ITopLevelDataSetRegistratorDelegate; +import ch.systemsx.cisd.etlserver.registrator.AbstractProgrammableTopLevelDataSetHandler; +import ch.systemsx.cisd.etlserver.registrator.DataSetFile; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; + +/** + * @author Pawel Glyzewski + */ +/* + * TODO: gpawel: this class shouldn't extend V2 jython service, it should be the other way around. + */ +public class DataSetRegistrationServiceV2<T extends DataSetInformation> extends + JythonDataSetRegistrationServiceV2<T> +{ + public DataSetRegistrationServiceV2(AbstractProgrammableTopLevelDataSetHandler<T> registrator, + DataSetFile incomingDataSetFile, + DataSetInformation userProvidedDataSetInformationOrNull, + IDelegatedActionWithResult<Boolean> globalCleanAfterwardsAction, + ITopLevelDataSetRegistratorDelegate delegate) + { + super(registrator, incomingDataSetFile, userProvidedDataSetInformationOrNull, + globalCleanAfterwardsAction, delegate, + new ch.systemsx.cisd.common.interpreter.PythonInterpreter() + { + @Override + public void set(String name, Object value) + { + } + + @Override + public void set(String name, PyObject value) + { + } + + @Override + public void releaseResources() + { + } + }, null); + } + +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationTransactionV2Delegate.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationTransactionV2Delegate.java index 93400bba286a4483319b968036cbafb36489fbd5..24da50e4e04c56283bfd159507d73718af5e605d 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationTransactionV2Delegate.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/DataSetRegistrationTransactionV2Delegate.java @@ -43,7 +43,7 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.api.internal.v1.ISpaceImmutab /** * @author Jakub Straszewski */ -class DataSetRegistrationTransactionV2Delegate implements IDataSetRegistrationTransactionV2 +public class DataSetRegistrationTransactionV2Delegate implements IDataSetRegistrationTransactionV2 { private IDataSetRegistrationTransaction transaction; diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/IJavaDataSetRegistrationDropboxV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/IJavaDataSetRegistrationDropboxV2.java new file mode 100644 index 0000000000000000000000000000000000000000..9c8174f20a4b5fa29c36fd1c439f3ef28e5a83b2 --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/IJavaDataSetRegistrationDropboxV2.java @@ -0,0 +1,43 @@ +/* + * Copyright 2012 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.etlserver.registrator.api.v2; + +import ch.systemsx.cisd.common.exceptions.NotImplementedException; +import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationContext; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; + +/** + * @author Pawel Glyzewski + */ +public interface IJavaDataSetRegistrationDropboxV2<T extends DataSetInformation> +{ + public void process(IDataSetRegistrationTransactionV2 transaction); + + public void postStorage(DataSetRegistrationContext context); + + public void preMetadataRegistration(DataSetRegistrationContext context); + + public void postMetadataRegistration(DataSetRegistrationContext context); + + public void rollbackPreRegistration(DataSetRegistrationContext context, Throwable throwable); + + public boolean isRetryFunctionDefined(); + + public boolean shouldRetryProcessing(DataSetRegistrationContext context, Exception problem) + throws NotImplementedException; + +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JavaTopLevelDataSetHandlerV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JavaTopLevelDataSetHandlerV2.java new file mode 100644 index 0000000000000000000000000000000000000000..e0d88e5f41883287db706215aebd324c87f66816 --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JavaTopLevelDataSetHandlerV2.java @@ -0,0 +1,167 @@ +/* + * Copyright 2012 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.etlserver.registrator.api.v2; + +import java.io.File; + +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; +import ch.systemsx.cisd.common.exceptions.NotImplementedException; +import ch.systemsx.cisd.common.interpreter.PythonInterpreter; +import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; +import ch.systemsx.cisd.common.utilities.PropertyUtils; +import ch.systemsx.cisd.etlserver.ITopLevelDataSetRegistratorDelegate; +import ch.systemsx.cisd.etlserver.TopLevelDataSetRegistratorGlobalState; +import ch.systemsx.cisd.etlserver.registrator.AbstractProgrammableTopLevelDataSetHandler; +import ch.systemsx.cisd.etlserver.registrator.DataSetFile; +import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationService; +import ch.systemsx.cisd.etlserver.registrator.api.IJavaDataSetRegistrationDropboxV1; +import ch.systemsx.cisd.etlserver.registrator.monitor.DssRegistrationHealthMonitor; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; + +/** + * @author Pawel Glyzewski + */ +public class JavaTopLevelDataSetHandlerV2<T extends DataSetInformation> extends + AbstractProgrammableTopLevelDataSetHandler<T> +{ + // The key for the script in the properties file + public static final String PROGRAM_CLASS_KEY = "program-class"; + + private Class<? extends IJavaDataSetRegistrationDropboxV2<T>> programClass; + + /** + * @param globalState + */ + @SuppressWarnings("unchecked") + protected JavaTopLevelDataSetHandlerV2(TopLevelDataSetRegistratorGlobalState globalState) + { + super(globalState); + + String className = + PropertyUtils.getMandatoryProperty(globalState.getThreadParameters() + .getThreadProperties(), PROGRAM_CLASS_KEY); + try + { + programClass = + (Class<? extends IJavaDataSetRegistrationDropboxV2<T>>) Class + .forName(className); + } catch (ClassNotFoundException ex) + { + throw ConfigurationFailureException.fromTemplate("Class '%s' does not exist!", + className); + } + + DssRegistrationHealthMonitor.getInstance(globalState.getOpenBisService(), + globalState.getRecoveryStateDir()); + } + + /** + * V2 registration framework -- do not put files that are scheduled for recovery into the faulty + * paths. + */ + @Override + public boolean shouldNotAddToFaultyPathsOrNull(File file) + { + // If there is a recovery marker file, do not add the file to faulty paths. + return hasRecoveryMarkerFile(file); + } + + @Override + protected void handleDataSet(DataSetFile dataSetFile, DataSetRegistrationService<T> service) + throws Throwable + { + IJavaDataSetRegistrationDropboxV2<T> v2Programm = getV2DropboxProgram(service); + + if (v2Programm.isRetryFunctionDefined()) + { + executeProcessFunctionWithRetries(v2Programm, + (JythonDataSetRegistrationServiceV2<T>) service, dataSetFile); + } else + { + // in case when there is no retry function defined we just call the process and don't + // try to catch any kind of exceptions + v2Programm.process(wrapTransaction(service.transaction())); + } + } + + @Override + protected RecoveryHookAdaptor getRecoveryHookAdaptor(File incoming) + { + return new RecoveryHookAdaptor(incoming) + { + IJavaDataSetRegistrationDropboxV2<T> v2ProgramInternal; + + @Override + protected IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgramInternal() + { + if (v2ProgramInternal == null) + { + v2ProgramInternal = getV2DropboxProgram(null); + } + + return v2ProgramInternal; + } + }; + + } + + /** + * Create a Jython registration service that includes access to the interpreter. + * + * @param pythonInterpreter + */ + protected DataSetRegistrationService<T> createJythonDataSetRegistrationServiceV2( + DataSetFile incomingDataSetFile, + DataSetInformation userProvidedDataSetInformationOrNull, + IDelegatedActionWithResult<Boolean> cleanAfterwardsAction, + ITopLevelDataSetRegistratorDelegate delegate, PythonInterpreter pythonInterpreter, + TopLevelDataSetRegistratorGlobalState globalState) + { + return new DataSetRegistrationServiceV2<T>(this, incomingDataSetFile, + userProvidedDataSetInformationOrNull, cleanAfterwardsAction, delegate); + } + + @Override + protected boolean shouldUseOldJythonHookFunctions() + { + return false; + } + + @Override + protected IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgram( + DataSetRegistrationService<T> service) + { + try + { + return programClass.newInstance(); + } catch (InstantiationException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } catch (IllegalAccessException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + @Override + protected IJavaDataSetRegistrationDropboxV1<T> getV1DropboxProgram() + { + throw new NotImplementedException( + "getV1DropboxProgram in not implemented, as this is V2 only handler."); + } +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonAsJavaDataSetRegistrationDropboxV2Wrapper.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonAsJavaDataSetRegistrationDropboxV2Wrapper.java new file mode 100644 index 0000000000000000000000000000000000000000..987f644647a6ef2a8cdc25ddcb27942e0d935db4 --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonAsJavaDataSetRegistrationDropboxV2Wrapper.java @@ -0,0 +1,199 @@ +/* + * Copyright 2012 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.etlserver.registrator.api.v2; + +import org.apache.log4j.Logger; +import org.python.core.PyFunction; +import org.python.core.PyInteger; +import org.python.core.PyObject; + +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.exceptions.NotImplementedException; +import ch.systemsx.cisd.common.interpreter.PythonInterpreter; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.utilities.JythonUtils; +import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationContext; +import ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler.JythonHookFunction; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; + +/** + * @author Pawel Glyzewski + */ +public class JythonAsJavaDataSetRegistrationDropboxV2Wrapper<T extends DataSetInformation> + implements IJavaDataSetRegistrationDropboxV2<T> +{ + protected static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + JythonAsJavaDataSetRegistrationDropboxV2Wrapper.class); + + private PythonInterpreter interpreter; + + private Boolean retryDefined; + + public JythonAsJavaDataSetRegistrationDropboxV2Wrapper(PythonInterpreter interpreter) + { + this.interpreter = interpreter; + } + + @Override + public void process(IDataSetRegistrationTransactionV2 transaction) + { + try + { + PyFunction function = + JythonUtils.tryJythonFunction(interpreter, + JythonHookFunction.PROCESS_FUNCTION.name); + if (function == null) + { + throw new IllegalStateException("Undefined process() function"); + } + JythonUtils.invokeFunction(function, transaction); + } catch (Exception e) + { + throw CheckedExceptionTunnel.wrapIfNecessary(e); + } + } + + @Override + public void postStorage(DataSetRegistrationContext context) + { + PyFunction function = + JythonUtils.tryJythonFunction(getInterpreter(), + JythonHookFunction.POST_STORAGE_FUNCTION_NAME.name); + if (function != null) + { + JythonUtils.invokeFunction(function, context); + } else + { + throw new NotImplementedException("postStorage is not implemented."); + } + } + + @Override + public void preMetadataRegistration(DataSetRegistrationContext context) + { + PyFunction function = + JythonUtils.tryJythonFunction(getInterpreter(), + JythonHookFunction.PRE_REGISTRATION_FUNCTION_NAME.name); + + if (null != function) + { + JythonUtils.invokeFunction(function, context); + } else + { + throw new NotImplementedException("preMetadataRegistration is not implemented."); + } + } + + @Override + public void postMetadataRegistration(DataSetRegistrationContext context) + { + PyFunction function = + JythonUtils.tryJythonFunction(getInterpreter(), + JythonHookFunction.POST_REGISTRATION_FUNCTION_NAME.name); + if (function != null) + { + JythonUtils.invokeFunction(function, context); + } else + { + throw new NotImplementedException("postMetadataRegistration is not implemented."); + } + } + + @Override + public void rollbackPreRegistration(DataSetRegistrationContext context, Throwable throwable) + { + PyFunction function = + JythonUtils.tryJythonFunction(getInterpreter(), + JythonHookFunction.ROLLBACK_PRE_REGISTRATION_FUNCTION_NAME.name); + if (function != null) + { + JythonUtils.invokeFunction(function, context, throwable); + } else + { + throw new NotImplementedException("rollbackPreRegistration is not implemented."); + } + } + + @Override + public boolean isRetryFunctionDefined() + { + if (retryDefined == null) + { + PyFunction function = + JythonUtils.tryJythonFunction(getInterpreter(), + JythonHookFunction.SHOULD_RETRY_PROCESS_FUNCTION_NAME.name); + if (function == null) + { + retryDefined = false; + } else + { + retryDefined = true; + } + } + return retryDefined; + } + + @Override + public boolean shouldRetryProcessing(DataSetRegistrationContext context, Exception problem) + { + if (false == isRetryFunctionDefined()) + { + throw new NotImplementedException("shouldRetryProcessing is not implemented."); + } + + PyFunction retryFunction = + JythonUtils.tryJythonFunction(getInterpreter(), + JythonHookFunction.SHOULD_RETRY_PROCESS_FUNCTION_NAME.name); + PyObject retryFunctionResult = null; + try + { + retryFunctionResult = JythonUtils.invokeFunction(retryFunction, context, problem); + } catch (Exception ex) + { + operationLog.error("The retry function has failed. Rolling back.", ex); + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + + if (retryFunctionResult == null) + { + operationLog + .error("The should_retry_processing function did not return anything. Will not retry."); + throw CheckedExceptionTunnel.wrapIfNecessary(problem); + } + + if (false == retryFunctionResult instanceof PyInteger) + { // the python booleans are returned as PyIntegers + operationLog + .error("The should_retry_processing function returned object of non-boolean type " + + retryFunctionResult.getClass() + ". Will not retry."); + throw CheckedExceptionTunnel.wrapIfNecessary(problem); + } + + if (((PyInteger) retryFunctionResult).asInt() != 0) + { + return true; + } + + return false; + } + + private PythonInterpreter getInterpreter() + { + return interpreter; + } +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java index ec19ef5296716171dc7be9de752306981af9f585..2acacc59622b47dd3ef3ef16571d80283622eac4 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonDataSetRegistrationServiceV2.java @@ -22,6 +22,7 @@ import ch.systemsx.cisd.common.interpreter.PythonInterpreter; import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; import ch.systemsx.cisd.etlserver.ITopLevelDataSetRegistratorDelegate; import ch.systemsx.cisd.etlserver.TopLevelDataSetRegistratorGlobalState; +import ch.systemsx.cisd.etlserver.registrator.AbstractProgrammableTopLevelDataSetHandler; import ch.systemsx.cisd.etlserver.registrator.DataSetFile; import ch.systemsx.cisd.etlserver.registrator.IDataSetRegistrationDetailsFactory; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction; @@ -36,7 +37,8 @@ public class JythonDataSetRegistrationServiceV2<T extends DataSetInformation> ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler.JythonDataSetRegistrationService<T> { - public JythonDataSetRegistrationServiceV2(JythonTopLevelDataSetHandlerV2<T> registrator, + public JythonDataSetRegistrationServiceV2( + AbstractProgrammableTopLevelDataSetHandler<T> registrator, DataSetFile incomingDataSetFile, DataSetInformation userProvidedDataSetInformationOrNull, IDelegatedActionWithResult<Boolean> globalCleanAfterwardsAction, @@ -92,20 +94,19 @@ public class JythonDataSetRegistrationServiceV2<T extends DataSetInformation> return transactions.get(0); } } - - + /** * rolls back the existing transaction */ public void rollbackAndForgetTransaction() { DataSetRegistrationTransaction<T> transaction = getTransaction(); - + transaction.rollback(); - + transactions.remove(transaction); } - + /** * Commit any scheduled changes. */ diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java index ab48d42746b4e456d45fd2d8f68e1a9c9f8d3eff..19387ce972ddea307818b49d98b825a442dc4ce0 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v2/JythonTopLevelDataSetHandlerV2.java @@ -17,45 +17,15 @@ package ch.systemsx.cisd.etlserver.registrator.api.v2; import java.io.File; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import org.python.core.PyFunction; -import org.python.core.PyInteger; -import org.python.core.PyObject; - -import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; -import ch.systemsx.cisd.common.exceptions.NotImplementedException; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.interpreter.PythonInterpreter; import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; -import ch.systemsx.cisd.etlserver.DssRegistrationLogger; -import ch.systemsx.cisd.etlserver.IStorageProcessorTransactional.UnstoreDataAction; import ch.systemsx.cisd.etlserver.ITopLevelDataSetRegistratorDelegate; import ch.systemsx.cisd.etlserver.TopLevelDataSetRegistratorGlobalState; import ch.systemsx.cisd.etlserver.registrator.DataSetFile; -import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationContext; import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationService; -import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithm; -import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner; -import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner.IPrePostRegistrationHook; -import ch.systemsx.cisd.etlserver.registrator.DataSetStorageAlgorithmRunner.IRollbackDelegate; -import ch.systemsx.cisd.etlserver.registrator.DataSetStorageRollbacker; -import ch.systemsx.cisd.etlserver.registrator.DistinctExceptionsCollection; -import ch.systemsx.cisd.etlserver.registrator.IDataSetOnErrorActionDecision.ErrorType; -import ch.systemsx.cisd.etlserver.registrator.MarkerFileUtility; -import ch.systemsx.cisd.etlserver.registrator.api.v1.IDataSetRegistrationTransaction; -import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.AbstractTransactionState; -import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.RollbackStack; -import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.RollbackStack.IRollbackStackDelegate; -import ch.systemsx.cisd.etlserver.registrator.recovery.AbstractRecoveryState; -import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStoragePrecommitRecoveryState; -import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStorageRecoveryInfo; -import ch.systemsx.cisd.etlserver.registrator.recovery.DataSetStorageRecoveryInfo.RecoveryStage; import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; -import ch.systemsx.cisd.openbis.generic.shared.basic.EntityOperationsState; -import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; /** * A top-level data set handler that runs a python (jython) script to register data sets. @@ -65,11 +35,6 @@ import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extends ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler<T> { - - private final int processMaxRetryCount; - - private final int processRetryPauseInSec; - /** * Constructor. * @@ -78,8 +43,6 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend public JythonTopLevelDataSetHandlerV2(TopLevelDataSetRegistratorGlobalState globalState) { super(globalState); - this.processMaxRetryCount = globalState.getThreadParameters().getProcessMaxRetryCount(); - this.processRetryPauseInSec = globalState.getThreadParameters().getProcessRetryPauseInSec(); } /** @@ -132,147 +95,29 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend protected void executeJythonScript(DataSetFile dataSetFile, String scriptString, JythonDataSetRegistrationService<T> service) { - // Configure the evaluator PythonInterpreter interpreter = service.getInterpreter(); + IJavaDataSetRegistrationDropboxV2<T> v2Programm = + new JythonAsJavaDataSetRegistrationDropboxV2Wrapper<T>(interpreter); + // Invoke the evaluator interpreter.exec(scriptString); verifyEvaluatorHookFunctions(interpreter); - PyFunction retryFunction = getShouldRetryProcessFunction(service); - - if (retryFunction == null) + if (false == v2Programm.isRetryFunctionDefined()) { - // in case when there is no retry function defined we just call the process and don't // try to catch any kind of exceptions - executeJythonProcessFunction(service.getInterpreter(), service.transaction()); + v2Programm.process(wrapTransaction(service.transaction())); } else { - executeJythonProcessFunctionWithRetries(interpreter, - (JythonDataSetRegistrationServiceV2<T>) service, retryFunction, dataSetFile); + executeProcessFunctionWithRetries(v2Programm, + (JythonDataSetRegistrationServiceV2<T>) service, dataSetFile); } } - private void executeJythonProcessFunctionWithRetries(PythonInterpreter interpreter, - JythonDataSetRegistrationServiceV2<T> service, PyFunction retryFunction, - DataSetFile incomingDataSetFile) - { - DistinctExceptionsCollection errors = new DistinctExceptionsCollection(); - - // create initial transaction - service.transaction(); - - while (true) - { - waitUntilApplicationIsReady(incomingDataSetFile); - - Exception problem; - try - { - executeJythonProcessFunction(interpreter, service.getTransaction()); - // if function succeeded - than we are happy - return; - } catch (Exception ex) - { - problem = ex; - operationLog - .info("Exception occured during jython script processing. Will check if can retry.", - ex); - } - - int errorCount = errors.add(problem); - - if (errorCount > processMaxRetryCount) - { - operationLog - .error("The jython script processing has failed too many times. Rolling back."); - throw CheckedExceptionTunnel.wrapIfNecessary(problem); - } else - { - operationLog.debug("The same error happened for the " + errorCount - + " time (max allowed is " + processMaxRetryCount + ")"); - } - - DataSetRegistrationContext registrationContext = - service.getTransaction().getRegistrationContext(); - - PyObject retryFunctionResult = null; - try - { - retryFunctionResult = invokeFunction(retryFunction, registrationContext, problem); - } catch (Exception ex) - { - operationLog.error("The retry function has failed. Rolling back.", ex); - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - - if (retryFunctionResult == null) - { - operationLog - .error("The should_retry_processing function did not return anything. Will not retry."); - throw CheckedExceptionTunnel.wrapIfNecessary(problem); - } - - if (retryFunctionResult instanceof PyInteger == false) - { // the python booleans are returned as PyIntegers - operationLog - .error("The should_retry_processing function returned object of non-boolean type " - + retryFunctionResult.getClass() + ". Will not retry."); - throw CheckedExceptionTunnel.wrapIfNecessary(problem); - } - - if (((PyInteger) retryFunctionResult).asInt() == 0) - { - operationLog - .error("The should_retry_processing function returned false. Will not retry."); - throw CheckedExceptionTunnel.wrapIfNecessary(problem); - } - - service.rollbackAndForgetTransaction(); - // TODO: now the transaction is rolled back and everything should be in place again. - // should we catch some exceptions here? can we recover if whatever went wrong in here - - // creates the new transaction and propagates the values in the persistent map - service.transaction().getRegistrationContext().getPersistentMap() - .putAll(registrationContext.getPersistentMap()); - - waitTheRetryPeriod(processRetryPauseInSec); - } - } - - protected void executeJythonProcessFunction(PythonInterpreter interpreter, - IDataSetRegistrationTransaction transaction) - { - try - { - PyFunction function = - tryJythonFunction(interpreter, JythonHookFunction.PROCESS_FUNCTION); - if (function == null) - { - throw new IllegalStateException("Undefined process() function"); - } - IDataSetRegistrationTransactionV2 v2transaction = wrapTransaction(transaction); - invokeFunction(function, v2transaction); - } catch (Exception e) - { - throw CheckedExceptionTunnel.wrapIfNecessary(e); - } - } - - /** - * Wraps the transaction - to hide methods which we don't want to expose in the api. - */ - protected IDataSetRegistrationTransactionV2 wrapTransaction( - IDataSetRegistrationTransaction transaction) - { - IDataSetRegistrationTransactionV2 v2transaction = - new DataSetRegistrationTransactionV2Delegate(transaction); - return v2transaction; - } - @Override protected boolean shouldUseOldJythonHookFunctions() { @@ -298,408 +143,46 @@ public class JythonTopLevelDataSetHandlerV2<T extends DataSetInformation> extend } @Override - protected void handleRecovery(final File incomingFileOriginal) + protected IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgram( + DataSetRegistrationService<T> service) { - // get the marker file - final File recoveryMarkerFile = - state.getGlobalState().getStorageRecoveryManager() - .getProcessingMarkerFile(incomingFileOriginal); - - // deserialize recovery state - final AbstractRecoveryState<T> recoveryState = - state.getGlobalState().getStorageRecoveryManager() - .extractRecoveryCheckpoint(recoveryMarkerFile); - - // then we should ensure that the recovery will actually take place itself! - final DataSetStorageRecoveryInfo recoveryInfo = - state.getGlobalState().getStorageRecoveryManager() - .getRecoveryFileFromMarker(recoveryMarkerFile); - - final File recoveryFile = recoveryInfo.getRecoveryStateFile(); - - if (false == recoveryFile.exists()) - { - operationLog.error("Recovery file does not exist. " + recoveryFile); - - throw new IllegalStateException("Recovery file " + recoveryFile + " doesn't exist"); - } - - if (false == retryPeriodHasPassed(recoveryInfo)) - { - return; - } - - operationLog.info("Will recover from broken registration. Found marker file " - + recoveryMarkerFile + " and " + recoveryFile); - - final DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state); - - logger.log("Starting recovery at checkpoint " + recoveryInfo.getRecoveryStage()); - - IRecoveryCleanupDelegate recoveryMarkerFileCleanupAction = new IRecoveryCleanupDelegate() - { - @Override - public void execute(boolean shouldStopRecovery, boolean shouldIncreaseTryCount) - { - if (false == shouldStopRecovery - && recoveryInfo.getTryCount() >= state.getGlobalState() - .getStorageRecoveryManager().getMaximumRertyCount()) - { - notificationLog.error("The dataset " - + recoveryState.getIncomingDataSetFile().getRealIncomingFile() - + " has failed to register. Giving up."); - deleteMarkerFile(); - - File errorRecoveryMarkerFile = - new File(recoveryMarkerFile.getParent(), - recoveryMarkerFile.getName() + ".ERROR"); - state.getFileOperations().move(recoveryMarkerFile, errorRecoveryMarkerFile); - - logger.log("Recovery failed. Giving up."); - logger.registerFailure(); - - } else - { - if (shouldStopRecovery) - { - deleteMarkerFile(); - - recoveryMarkerFile.delete(); - recoveryFile.delete(); - } else - { - // this replaces the recovery file with a new one with increased - // count - // FIXME: is this safe operation (how to assure, that it won't - // corrupt the recoveryMarkerFile?) - DataSetStorageRecoveryInfo rInfo = - state.getGlobalState().getStorageRecoveryManager() - .getRecoveryFileFromMarker(recoveryMarkerFile); - if (shouldIncreaseTryCount) - { - rInfo.increaseTryCount(); - } - rInfo.setLastTry(new Date()); - rInfo.writeToFile(recoveryMarkerFile); - } - } - } - - private void deleteMarkerFile() - { - File incomingMarkerFile = - MarkerFileUtility.getMarkerFileFromIncoming(recoveryState - .getIncomingDataSetFile().getRealIncomingFile()); - if (incomingMarkerFile.exists()) - { - - incomingMarkerFile.delete(); - } - } - }; - - PostRegistrationCleanUpAction cleanupAction = - new PostRegistrationCleanUpAction(recoveryState.getIncomingDataSetFile(), - new DoNothingDelegatedAction()); - - handleRecoveryState(recoveryInfo.getRecoveryStage(), recoveryState, cleanupAction, - recoveryMarkerFileCleanupAction); - } - - interface IRecoveryCleanupDelegate - { - void execute(boolean shouldStopRecovery, boolean shouldIncreaseTryCount); + return new JythonAsJavaDataSetRegistrationDropboxV2Wrapper<T>( + getInterpreterFromService(service)); } - /** - * Check wheter the last retry + retry period < date.now - */ - private boolean retryPeriodHasPassed(final DataSetStorageRecoveryInfo recoveryInfo) - { - Calendar c = Calendar.getInstance(); - c.setTime(recoveryInfo.getLastTry()); - c.add(Calendar.SECOND, state.getGlobalState().getStorageRecoveryManager() - .getRetryPeriodInSeconds()); - return c.getTime().before(new Date()); - } - - private void handleRecoveryState(RecoveryStage recoveryStage, - final AbstractRecoveryState<T> recoveryState, - final IDelegatedActionWithResult<Boolean> cleanAfterwardsAction, - final IRecoveryCleanupDelegate recoveryMarkerCleanup) + @Override + protected ch.systemsx.cisd.etlserver.registrator.AbstractProgrammableTopLevelDataSetHandler<T>.RecoveryHookAdaptor getRecoveryHookAdaptor( + File incoming) { - - final DssRegistrationLogger logger = recoveryState.getRegistrationLogger(state); - - // keeps track of whether we should keep or delete the recovery files. - // we can delete if succesfully recovered, or rolledback. - // This code is not executed at all in case of a recovery give-up - boolean shouldStopRecovery = false; - - // by default in case of failure we increase try count - boolean shouldIncreaseTryCount = true; - - IRollbackDelegate<T> rollbackDelegate = new IRollbackDelegate<T>() + return new RecoveryHookAdaptor(incoming) { - @Override - public void didRollbackStorageAlgorithmRunner( - DataSetStorageAlgorithmRunner<T> algorithm, Throwable ex, - ErrorType errorType) - { - // do nothing. recovery takes care of everything - } + IJavaDataSetRegistrationDropboxV2<T> v2ProgramInternal; @Override - public void markReadyForRecovery(DataSetStorageAlgorithmRunner<T> algorithm, - Throwable ex) + protected IJavaDataSetRegistrationDropboxV2<T> getV2DropboxProgramInternal() { - // don't have to do nothing. - } - }; - - // hookAdaptor - RecoveryHookAdaptor hookAdaptor = - new RecoveryHookAdaptor(recoveryState.getIncomingDataSetFile() - .getLogicalIncomingFile()); - - DataSetRegistrationContext.IHolder registrationContextHolder = - new DataSetRegistrationContext.IHolder() + if (v2ProgramInternal == null) { + PythonInterpreter internalInterpreter = + PythonInterpreter.createIsolatedPythonInterpreter(); + // interpreter.execute script - @Override - public DataSetRegistrationContext getRegistrationContext() - { - return new DataSetRegistrationContext(recoveryState.getPersistentMap(), - state.getGlobalState()); - } - }; - - ArrayList<DataSetStorageAlgorithm<T>> dataSetStorageAlgorithms = - recoveryState.getDataSetStorageAlgorithms(state); - - RollbackStack rollbackStack = recoveryState.getRollbackStack(); + configureEvaluator(incoming, null, internalInterpreter); - DataSetStorageAlgorithmRunner<T> runner = - new DataSetStorageAlgorithmRunner<T>( - recoveryState.getIncomingDataSetFile(), // incoming - dataSetStorageAlgorithms, // algorithms - rollbackDelegate, // rollback delegate, - rollbackStack, // rollbackstack - logger, // registrationLogger - state.getGlobalState().getOpenBisService(), // openBisService - hookAdaptor, // the hooks - state.getGlobalState().getStorageRecoveryManager(), - registrationContextHolder, - state.getGlobalState()); - - boolean registrationSuccessful = false; - - operationLog.info("Recovery succesfully deserialized the state of the registration"); - try - { - EntityOperationsState entityOperationsState; - - if (recoveryStage.beforeOrEqual(RecoveryStage.PRECOMMIT)) - { - TechId registrationId = - ((DataSetStoragePrecommitRecoveryState<T>) recoveryState) - .getRegistrationId(); - if (registrationId == null) - { - throw new IllegalStateException( - "Recovery state cannot have null registrationId at the precommit phase"); - } - entityOperationsState = - state.getGlobalState().getOpenBisService() - .didEntityOperationsSucceed(registrationId); - } else - { - // if we are at the later stage than precommit - it means that the entity operations - // have succeeded - entityOperationsState = EntityOperationsState.OPERATION_SUCCEEDED; - } - - if (EntityOperationsState.IN_PROGRESS == entityOperationsState) - { - shouldIncreaseTryCount = false; - } else if (EntityOperationsState.NO_OPERATION == entityOperationsState) - { - operationLog - .info("Recovery hasn't found registration artifacts in the application server. Registration of metadata was not successful."); + // Load the script + String scriptString = FileUtilities.loadToString(scriptFile); - IRollbackStackDelegate rollbackStackDelegate = - new AbstractTransactionState.LiveTransactionRollbackDelegate(state - .getGlobalState().getStagingDir()); + // Invoke the evaluator + internalInterpreter.exec(scriptString); - rollbackStack.setLockedState(false); - - rollbackStack.rollbackAll(rollbackStackDelegate); - UnstoreDataAction action = - state.getOnErrorActionDecision().computeUndoAction( - ErrorType.OPENBIS_REGISTRATION_FAILURE, null); - DataSetStorageRollbacker rollbacker = - new DataSetStorageRollbacker(state, operationLog, action, recoveryState - .getIncomingDataSetFile().getRealIncomingFile(), null, null, - ErrorType.OPENBIS_REGISTRATION_FAILURE); - operationLog.info(rollbacker.getErrorMessageForLog()); - rollbacker.doRollback(logger); - - logger.log("Operations haven't been registered in AS - recovery rollback"); - logger.registerFailure(); - - shouldStopRecovery = true; - - hookAdaptor.executePreRegistrationRollback(registrationContextHolder, null); - - finishRegistration(dataSetStorageAlgorithms, rollbackStack); - } else - { - - operationLog - .info("Recovery has found datasets in the AS. The registration of metadata was successful."); - - if (recoveryStage.before(RecoveryStage.POST_REGISTRATION_HOOK_EXECUTED)) - { - runner.postRegistration(); - } - - boolean success = true; - if (recoveryStage.before(RecoveryStage.STORAGE_COMPLETED)) - { - success = runner.commitAndStore(); - } - - if (success) - { - success = runner.cleanPrecommitAndConfirmStorage(); - } - if (success) - { - hookAdaptor.executePostStorage(registrationContextHolder); - - registrationSuccessful = true; - shouldStopRecovery = true; - - logger.registerSuccess(); - - // do the actions performed when the registration comes into terminal state. - finishRegistration(dataSetStorageAlgorithms, rollbackStack); + verifyEvaluatorHookFunctions(internalInterpreter); + v2ProgramInternal = + new JythonAsJavaDataSetRegistrationDropboxV2Wrapper<T>( + internalInterpreter); + } + return v2ProgramInternal; } - } - } catch (Throwable error) - { - if ("org.jmock.api.ExpectationError".equals(error.getClass().getCanonicalName())) - { - // this exception can by only thrown by tests. - // propagation of the exception is essential to test some functionalities - // implemented like this to avoid dependency to jmock in production - throw (Error) error; - } - operationLog.error("Uncaught error during recovery", error); - // in this case we should ignore, and run the recovery again after some time - logger.log(error, "Uncaught error during recovery"); - } - - cleanAfterwardsAction.execute(registrationSuccessful); - - recoveryMarkerCleanup.execute(shouldStopRecovery, shouldIncreaseTryCount); - - } - - private void finishRegistration(ArrayList<DataSetStorageAlgorithm<T>> dataSetStorageAlgorithms, - RollbackStack rollbackStack) - { - for (DataSetStorageAlgorithm<T> algorithm : dataSetStorageAlgorithms) - { - algorithm.getStagingFile().delete(); - } - rollbackStack.discard(); - } - - /** - * Create an adaptor that offers access to the recovery hook functions. - */ - protected class RecoveryHookAdaptor implements IPrePostRegistrationHook<T> - { - /** - * internally use only with getInterpreter - */ - private PythonInterpreter internalInterpreter; - - private final File incoming; - - public RecoveryHookAdaptor(File incoming) - { - this.incoming = incoming; - } - - private PythonInterpreter getInterpreter() - { - if (internalInterpreter == null) - { - internalInterpreter = PythonInterpreter.createIsolatedPythonInterpreter(); - // interpreter.execute script - - configureEvaluator(incoming, null, internalInterpreter); - - // Load the script - String scriptString = FileUtilities.loadToString(scriptFile); - - // Invoke the evaluator - internalInterpreter.exec(scriptString); - - verifyEvaluatorHookFunctions(internalInterpreter); - } - return internalInterpreter; - } - - @Override - public void executePreRegistration( - DataSetRegistrationContext.IHolder registrationContextHolder) - { - throw new NotImplementedException("Recovery cannot execute pre-registration hook."); - } - - @Override - public void executePostRegistration( - DataSetRegistrationContext.IHolder registrationContextHolder) - { - PyFunction function = - tryJythonFunction(getInterpreter(), - JythonHookFunction.POST_REGISTRATION_FUNCTION_NAME); - if (function != null) - { - invokeFunction(function, registrationContextHolder.getRegistrationContext()); - } - } - - /** - * This method does not belong to the IPrePostRegistrationHook interface. Is called directly - * by recovery. - */ - public void executePostStorage(DataSetRegistrationContext.IHolder registrationContextHolder) - { - PyFunction function = - tryJythonFunction(getInterpreter(), - JythonHookFunction.POST_STORAGE_FUNCTION_NAME); - if (function != null) - { - invokeFunction(function, registrationContextHolder.getRegistrationContext()); - } - } - - public void executePreRegistrationRollback( - DataSetRegistrationContext.IHolder registrationContextHolder, Throwable t) - { - PyFunction function = - tryJythonFunction(getInterpreter(), - JythonHookFunction.ROLLBACK_PRE_REGISTRATION_FUNCTION_NAME); - if (function != null) - { - invokeFunction(function, registrationContextHolder.getRegistrationContext(), t); - } - } - + }; } } diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDataSetHandler.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDataSetHandler.java index d6a7a99d451222981806db7a62b950d957dc4783..045573b617bbd2ecc857e51db7beff94d415f5b5 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDataSetHandler.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDataSetHandler.java @@ -37,7 +37,7 @@ public class JythonPlateDataSetHandler extends JythonTopLevelDataSetHandler<Data */ @Override protected IDataSetRegistrationDetailsFactory<DataSetInformation> createObjectFactory( - PythonInterpreter interpreter, DataSetInformation userProvidedDataSetInformationOrNull) + DataSetInformation userProvidedDataSetInformationOrNull) { return new JythonPlateDatasetFactory(getRegistratorState(), userProvidedDataSetInformationOrNull); diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDatasetFactory.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDatasetFactory.java index 3db55c9b669526f819be3d9ce6288f37f642d448..8b535eae0bfe43918cae754f8384f51a5d4d1646 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDatasetFactory.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/JythonPlateDatasetFactory.java @@ -26,7 +26,7 @@ import ch.systemsx.cisd.etlserver.registrator.AbstractOmniscientTopLevelDataSetR import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationDetails; import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationService; import ch.systemsx.cisd.etlserver.registrator.IDataSetRegistrationDetailsFactory; -import ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler.JythonObjectFactory; +import ch.systemsx.cisd.etlserver.registrator.JythonTopLevelDataSetHandler.ProgrammableDropboxObjectFactory; import ch.systemsx.cisd.etlserver.registrator.api.v1.IDataSet; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.DataSetRegistrationTransaction; import ch.systemsx.cisd.openbis.dss.etl.PlateGeometryOracle; @@ -46,7 +46,7 @@ import ch.systemsx.cisd.openbis.plugin.screening.shared.basic.dto.ScreeningConst * @author jakubs */ -public class JythonPlateDatasetFactory extends JythonObjectFactory<DataSetInformation> implements +public class JythonPlateDatasetFactory extends ProgrammableDropboxObjectFactory<DataSetInformation> implements IImagingDatasetFactory { final IDataSetRegistrationDetailsFactory<ImageDataSetInformation> imageDatasetFactory; @@ -66,7 +66,7 @@ public class JythonPlateDatasetFactory extends JythonObjectFactory<DataSetInform new JythonImageDataSetRegistrationFactory(this.registratorState, this.userProvidedDataSetInformationOrNull); this.featureVectorDatasetFactory = - new JythonObjectFactory<FeatureVectorDataSetInformation>(this.registratorState, + new ProgrammableDropboxObjectFactory<FeatureVectorDataSetInformation>(this.registratorState, this.userProvidedDataSetInformationOrNull) { @Override diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/v2/JythonPlateDataSetHandlerV2.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/v2/JythonPlateDataSetHandlerV2.java index c4f4c8621cf4f694ac2a6d4e3de9f2f76045135c..b843cf021319f8405d99cda47d5f7425ed6e68d8 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/v2/JythonPlateDataSetHandlerV2.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/jython/v2/JythonPlateDataSetHandlerV2.java @@ -58,8 +58,8 @@ public class JythonPlateDataSetHandlerV2 extends JythonTopLevelDataSetHandlerV2< * Create a screening specific factory available to the python script. */ @Override - protected IDataSetRegistrationDetailsFactory<DataSetInformation> createObjectFactory( - PythonInterpreter interpreter, DataSetInformation userProvidedDataSetInformationOrNull) + public IDataSetRegistrationDetailsFactory<DataSetInformation> createObjectFactory( + DataSetInformation userProvidedDataSetInformationOrNull) { return new JythonPlateDatasetFactory(getRegistratorState(), userProvidedDataSetInformationOrNull);