From fe41a8e169622fdb2316857f7ffa8dbc899bc0b6 Mon Sep 17 00:00:00 2001 From: jakubs <jakubs> Date: Mon, 13 Feb 2012 08:56:08 +0000 Subject: [PATCH] LMS-2767 precommit works for jythin dropboxes SVN: 24415 --- .../ContainerDataSetStorageAlgorithm.java | 144 ++-------------- .../registrator/DataSetStorageAlgorithm.java | 159 +++++++++++++----- .../DataSetStorageAlgorithmRunner.java | 89 ++++++++-- .../api/v1/impl/AbstractTransactionState.java | 13 +- 4 files changed, 207 insertions(+), 198 deletions(-) diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/ContainerDataSetStorageAlgorithm.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/ContainerDataSetStorageAlgorithm.java index 62838bbf1a8..9d96f89bd87 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/ContainerDataSetStorageAlgorithm.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/ContainerDataSetStorageAlgorithm.java @@ -40,8 +40,6 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.NewExternalData; public class ContainerDataSetStorageAlgorithm<T extends DataSetInformation> extends DataSetStorageAlgorithm<T> { - // We use our own states, not the ones of the parent. - private DataSetStorageAlgorithmState<T> state; /** * @param incomingDataSetFile @@ -64,67 +62,43 @@ public class ContainerDataSetStorageAlgorithm<T extends DataSetInformation> exte super(incomingDataSetFile, registrationDetails, dataStoreStrategy, storageProcessor, dataSetValidator, dataStoreCode, fileOperations, mailClient, stagingDirectory, precommitDirectory); - - state = new InitializedState<T>(this); } @Override public IStorageProcessorTransaction prepare(IRollbackStack rollbackStack) { - InitializedState<T> initializedState = (InitializedState<T>) state; - initializedState.prepare(); - - state = new PreparedState<T>(initializedState); + //assert initialized - move to prepared state. return new NullStorageProcessorTransaction(); } @Override - public void runStorageProcessor() throws Throwable + public void preCommit() throws Throwable { - PreparedState<T> preparedState = (PreparedState<T>) state; - preparedState.storeData(); - - state = new StoredState<T>(preparedState); + //assert prepared state, leave in precommit } + @Override + public void moveToTheStore() throws Throwable + { + //assert commited state, leave in stored + } + @Override public void transitionToRolledbackState(Throwable throwable) { - // Rollback may be called on in the stored state or in the prepared state. - if (state instanceof PreparedState) - { - // Container data sets do not use the storage processer -- there is nothing to do - return; - } - - StoredState<T> storedState = (StoredState<T>) state; - - state = new RolledbackState<T>(storedState, UnstoreDataAction.LEAVE_UNTOUCHED, throwable); + //return in rolledback state } @Override public void transitionToUndoneState() { - // Rollback may be called on in the stored state or in the prepared state. In the prepared - // state, there is nothing to do. - if (state instanceof PreparedState) - { - state = new UndoneState<T>((PreparedState<T>) state); - return; - } - - RolledbackState<T> rolledbackState = (RolledbackState<T>) state; - - state = new UndoneState<T>(rolledbackState); + // return in undone state } @Override public void commitStorageProcessor() { - StoredState<T> storedState = (StoredState<T>) state; - storedState.commitStorageProcessor(); - - state = new CommittedState<T>(storedState); + //assert precommitted state - return in committed state } @Override @@ -149,98 +123,4 @@ public class ContainerDataSetStorageAlgorithm<T extends DataSetInformation> exte + "'."; } - private static abstract class DataSetStorageAlgorithmState<T extends DataSetInformation> - { - protected final DataSetStorageAlgorithm<T> storageAlgorithm; - - protected DataSetStorageAlgorithmState(DataSetStorageAlgorithm<T> storageAlgorithm) - { - this.storageAlgorithm = storageAlgorithm; - } - } - - private static class InitializedState<T extends DataSetInformation> extends - DataSetStorageAlgorithmState<T> - { - public InitializedState(DataSetStorageAlgorithm<T> storageAlgorithm) - { - super(storageAlgorithm); - } - - /** - * Prepare registration of a data set. - */ - public void prepare() - { - } - } - - private static class PreparedState<T extends DataSetInformation> extends - DataSetStorageAlgorithmState<T> - { - public PreparedState(InitializedState<T> oldState) - { - super(oldState.storageAlgorithm); - } - - public void storeData() - { - - } - } - - private static class StoredState<T extends DataSetInformation> extends - DataSetStorageAlgorithmState<T> - { - - public StoredState(PreparedState<T> oldState) - { - super(oldState.storageAlgorithm); - } - - /** - * Committed data sets don't use a storage process -- there is nothing to do. - */ - public void commitStorageProcessor() - { - } - } - - private static class CommittedState<T extends DataSetInformation> extends - DataSetStorageAlgorithmState<T> - { - - CommittedState(StoredState<T> oldState) - { - super(oldState.storageAlgorithm); - } - - } - - private static class RolledbackState<T extends DataSetInformation> extends - DataSetStorageAlgorithmState<T> - { - - public RolledbackState(StoredState<T> oldState, UnstoreDataAction action, - Throwable throwable) - { - super(oldState.storageAlgorithm); - } - } - - private static class UndoneState<T extends DataSetInformation> extends - DataSetStorageAlgorithmState<T> - { - - UndoneState(PreparedState<T> oldState) - { - super(oldState.storageAlgorithm); - } - - UndoneState(RolledbackState<T> oldState) - { - super(oldState.storageAlgorithm); - } - - } } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithm.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithm.java index 52fa37bcfca..58476b179eb 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithm.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithm.java @@ -180,14 +180,25 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> } /** - * Run the storage processor. Expects prepared state and leaves in stored state. + * Stores the data in precommit. Expects prepared state and leaves in precommited state. */ - public void runStorageProcessor() throws Throwable + public void preCommit() throws Throwable { PreparedState<T> preparedState = (PreparedState<T>) state; - preparedState.storeData(); + preparedState.storeDataInPrecommit(); - state = new StoredState<T>(preparedState); + state = new PrecommittedState<T>(preparedState); + } + + /** + * Moves the data from precommit to the store. Expects Commited State and leaves in stored + * state. + */ + public void moveToTheStore() throws Throwable + { + CommittedState<T> committed = (CommittedState<T>) state; + committed.moveToTheStore(); + state = new StoredState<T>(committed); } /** @@ -196,7 +207,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> */ public void transitionToRolledbackState(Throwable throwable) { - // Rollback may be called on in the stored state or in the prepared state. + // Rollback may be called on in the precommit state or in the prepared state. if (state instanceof PreparedState) { PreparedState<T> preparedState = (PreparedState<T>) state; @@ -204,7 +215,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> { // If a storeData() was attempted and failed, then we need to move to the stored // state in order to rollback - state = new StoredState<T>((PreparedState<T>) state); + state = new PrecommittedState<T>((PreparedState<T>) state); } else { // If no storeData() was invoked, there is nothing to do @@ -212,7 +223,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> } } - StoredState<T> storedState = (StoredState<T>) state; + PrecommittedState<T> storedState = (PrecommittedState<T>) state; storedState.cleanUpMarkerFile(); state = new RolledbackState<T>(storedState, UnstoreDataAction.LEAVE_UNTOUCHED, throwable); @@ -239,7 +250,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> */ public void commitStorageProcessor() { - StoredState<T> storedState = (StoredState<T>) state; + PrecommittedState<T> storedState = (PrecommittedState<T>) state; storedState.commitStorageProcessor(); state = new CommittedState<T>(storedState); @@ -262,7 +273,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> public NewExternalData createExternalData() { - File dataFile = ((StoredState<T>) state).getStoredDirectory(); + File dataFile = ((PrecommittedState<T>) state).getStoredDirectory(); String relativePath = FileUtilities.getRelativeFilePath(storeRoot, dataFile); String absolutePath = dataFile.getAbsolutePath(); assert relativePath != null : String.format( @@ -319,6 +330,11 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> return stagingDirectory; } + public File getPreCommitDirectory() + { + return preCommitDirectory; + } + private static abstract class DataSetStorageAlgorithmState<T extends DataSetInformation> { protected final DataSetStorageAlgorithm<T> storageAlgorithm; @@ -345,9 +361,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> private static class InitializedState<T extends DataSetInformation> extends DataSetStorageAlgorithmState<T> { - protected File stagingBaseDirectory; - - protected File storeBaseDirectory; + protected DataSetStoragePaths storagePaths; protected IStorageProcessorTransaction transaction; @@ -367,13 +381,13 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> IDataStoreStrategy dataStoreStrategy = storageAlgorithm.getDataStoreStrategy(); // Create the staging base directory - stagingBaseDirectory = + File stagingBaseDirectory = new File(storageAlgorithm.getStagingDirectory(), storageAlgorithm .getDataSetInformation().getDataSetCode() + "-storage"); this.rollbackStack.pushAndExecuteCommand(new MkdirsCommand(stagingBaseDirectory .getAbsolutePath())); - storeBaseDirectory = + File storeBaseDirectory = DataSetStorageAlgorithm.createBaseDirectory(dataStoreStrategy, storageAlgorithm.getStoreRoot(), getFileOperations(), storageAlgorithm.getDataSetInformation(), @@ -386,15 +400,20 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> transaction = storageAlgorithm.getStorageProcessor().createTransaction(transactionParameters); + File precommitBaseDirectory = + new File(storageAlgorithm.getPreCommitDirectory(), storageAlgorithm + .getDataSetInformation().getDataSetCode() + "-precommit"); + + storagePaths = + new DataSetStoragePaths(stagingBaseDirectory, storeBaseDirectory, + precommitBaseDirectory); + } } private static class PreparedState<T extends DataSetInformation> extends DataSetStorageAlgorithmState<T> { - protected final File stagingBaseDirectory; - - protected final File storeBaseDirectory; protected final DataSetInformation dataSetInformation; @@ -402,23 +421,24 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> protected final IStorageProcessorTransaction transaction; + protected final DataSetStoragePaths storagePaths; + protected File markerFile; public PreparedState(InitializedState<T> oldState) { super(oldState.storageAlgorithm); - this.stagingBaseDirectory = oldState.stagingBaseDirectory; - this.storeBaseDirectory = oldState.storeBaseDirectory; this.dataSetInformation = storageAlgorithm.getDataSetInformation(); this.transaction = oldState.transaction; this.rollbackStack = oldState.rollbackStack; + this.storagePaths = oldState.storagePaths; } - public void storeData() + public void storeDataInPrecommit() { - //will throw exception if marker file already exists + // will throw exception if marker file already exists markerFile = createProcessingMarkerFile(); - + transactionStoreData(); moveFilesFromStagingToStore(); @@ -437,24 +457,32 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> for (File stagedFile : stagedFiles) { - rollbackStack.pushAndExecuteCommand(new MoveFileCommand(stagedStoredDataDirectory - .getAbsolutePath(), stagedFile.getName(), - storeBaseDirectory.getAbsolutePath(), stagedFile.getName())); + rollbackStack + .pushAndExecuteCommand(new MoveFileCommand(stagedStoredDataDirectory + .getAbsolutePath(), stagedFile.getName(), + storagePaths.precommitBaseDirectory.getAbsolutePath(), stagedFile + .getName())); } } private void transactionStoreData() { + // human readable description for logging purposes String entityDescription = createEntityDescription(); if (getOperationLog().isInfoEnabled()) { getOperationLog().info("Start storing data set for " + entityDescription + "."); } + + // prepare watch for time calculation final StopWatch watch = new StopWatch(); watch.start(); + // actual call to transaction.storeData transaction.storeData(storageAlgorithm.getRegistrationDetails(), getMailClient(), incomingDataSetFile); + + // log time if (getOperationLog().isInfoEnabled()) { getOperationLog().info( @@ -469,14 +497,14 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> private final File createProcessingMarkerFile() { - final File baseDirectory = stagingBaseDirectory; + final File baseDirectory = storagePaths.stagingBaseDirectory; final File baseParentDirectory = baseDirectory.getParentFile(); final String processingDirName = baseDirectory.getName(); markerFile = new File(baseParentDirectory, Constants.PROCESSING_PREFIX + processingDirName); try { - //will throw exception if marker file already exists + // will throw exception if marker file already exists rollbackStack .pushAndExecuteCommand(new NewFileCommand(markerFile.getAbsolutePath())); } catch (final IOExceptionUnchecked ex) @@ -503,32 +531,31 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> } } - private static class StoredState<T extends DataSetInformation> extends + private static class PrecommittedState<T extends DataSetInformation> extends DataSetStorageAlgorithmState<T> { protected final IStorageProcessorTransaction transaction; protected final File markerFile; - protected final File stagingBaseDirectory; - - protected final File storeBaseDirectory; + protected final DataSetStoragePaths storagePaths; - public StoredState(PreparedState<T> oldState) + public PrecommittedState(PreparedState<T> oldState) { super(oldState.storageAlgorithm); this.transaction = oldState.transaction; - this.stagingBaseDirectory = oldState.stagingBaseDirectory; this.markerFile = oldState.markerFile; - this.storeBaseDirectory = oldState.storeBaseDirectory; + this.storagePaths = oldState.storagePaths; } /** * Ask the storage processor to commit. Used by clients of the algorithm. + * <p> */ public void commitStorageProcessor() { - transaction.setStoredDataDirectory(storeBaseDirectory); + // TODO: KUBA - tutaj pewnie powinno byc precommit + transaction.setStoredDataDirectory(storagePaths.precommitBaseDirectory); transaction.commit(); cleanUpMarkerFile(); } @@ -538,7 +565,7 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> */ public File getStoredDirectory() { - return storeBaseDirectory; + return storagePaths.storeBaseDirectory; } /** @@ -558,32 +585,59 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> DataSetStorageAlgorithmState<T> { - protected final File stagingDirectory; + protected final DataSetStoragePaths storagePaths; - CommittedState(StoredState<T> oldState) + CommittedState(PrecommittedState<T> oldState) { super(oldState.storageAlgorithm); - this.stagingDirectory = oldState.stagingBaseDirectory; + this.storagePaths = oldState.storagePaths; cleanUpStagingDirectory(); } private void cleanUpStagingDirectory() { - getFileOperations().delete(stagingDirectory); - if (stagingDirectory.exists()) + getFileOperations().delete(storagePaths.stagingBaseDirectory); + if (storagePaths.stagingBaseDirectory.exists()) { - operationLog.error("Staging directory '" + stagingDirectory + operationLog.error("Staging directory '" + storagePaths.stagingBaseDirectory + "' could not be deleted."); } } + /** + * Moves files from the precommited directory to the store + */ + private void moveToTheStore() + { + File[] stagedFiles = storagePaths.precommitBaseDirectory.listFiles(); + if (null == stagedFiles) + { + return; + } + + for (File stagedFile : stagedFiles) + { + new MoveFileCommand(storagePaths.precommitBaseDirectory.getAbsolutePath(), + stagedFile.getName(), storagePaths.storeBaseDirectory.getAbsolutePath(), + stagedFile.getName()).execute(); + } + } + } + + private static class StoredState<T extends DataSetInformation> extends + DataSetStorageAlgorithmState<T> + { + StoredState(CommittedState<T> oldState) + { + super(oldState.storageAlgorithm); + } } private static class RolledbackState<T extends DataSetInformation> extends DataSetStorageAlgorithmState<T> { - public RolledbackState(StoredState<T> oldState, UnstoreDataAction action, + public RolledbackState(PrecommittedState<T> oldState, UnstoreDataAction action, Throwable throwable) { super(oldState.storageAlgorithm); @@ -603,6 +657,25 @@ public class DataSetStorageAlgorithm<T extends DataSetInformation> { super(oldState.storageAlgorithm); } + } + private static class DataSetStoragePaths + { + protected final File stagingBaseDirectory; + + protected final File storeBaseDirectory; + + protected final File precommitBaseDirectory; + + public DataSetStoragePaths(File stagingBaseDirectory, File storeBaseDirectory, + File precommitBaseDirectory) + { + super(); + this.stagingBaseDirectory = stagingBaseDirectory; + this.storeBaseDirectory = storeBaseDirectory; + this.precommitBaseDirectory = precommitBaseDirectory; + } } + + } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java index fc21a835478..ee303d025fe 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/DataSetStorageAlgorithmRunner.java @@ -167,30 +167,41 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation> /** * Register the data sets. + * + * @return true if some data sets were registered */ - public List<DataSetInformation> runStorageAlgorithms() + public boolean runStorageAlgorithms() { + // all algorithms are now in + // PREPARED STATE + try { + // move data to precommited directory + // Runs or throws a throwable - runStorageProcessors(); + preCommitStorageAlgorithms(); } catch (final Throwable throwable) { rollbackDuringStorageProcessorRun(throwable); - return Collections.emptyList(); + return false; } - dssRegistrationLog.log("Data has been moved to the store."); + dssRegistrationLog.log("Data has been moved to the pre-commit directory."); + + // PRECOMMITED STATE try { + // registers data set with yet non-existing store path. + // Runs or throw a throwable registerDataSetsInApplicationServer(); } catch (final Throwable throwable) { rollbackDuringMetadataRegistration(throwable); - return Collections.emptyList(); + return false; } dssRegistrationLog.log("Data has been registered with the openBIS Application Server."); @@ -202,27 +213,56 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation> dssRegistrationLog.log("Storage processors have committed."); + } catch (final Throwable throwable) + { + // Something has gone really wrong + rollbackAfterStorageProcessorAndMetadataRegistration(throwable); + return false; + } + + // COMMITED + + try + { + storeCommitedDatasets(); + logSuccessfulRegistration(); + dssRegistrationLog.log("Data has been moved to the final store."); + } catch (final Throwable throwable) + { + // Something has gone really wrong + return false; + } + + // move files to the store - ArrayList<DataSetInformation> dataSetInformationCollection = - new ArrayList<DataSetInformation>(); + try + { for (DataSetStorageAlgorithm<T> storageAlgorithm : dataSetStorageAlgorithms) { - dataSetInformationCollection.add(storageAlgorithm.getDataSetInformation()); openBISService.setStorageConfirmed(storageAlgorithm.getDataSetInformation() .getDataSetCode()); } - return dataSetInformationCollection; + dssRegistrationLog.log("Storage has been confirmed in openBIS Application Server."); } catch (final Throwable throwable) { - // Something has gone really wrong - rollbackAfterStorageProcessorAndMetadataRegistration(throwable); - return Collections.emptyList(); + // nothing to rollback. + // Graceful recovery should (and will) take care of this case } + + return !dataSetStorageAlgorithms.isEmpty(); + + // confirm storage in AS + + // STORAGECONFIRMED + } - public List<DataSetInformation> prepareAndRunStorageAlgorithms() + /** + * @returns true if some datasets have been registered + */ + public boolean prepareAndRunStorageAlgorithms() { prepare(); return runStorageAlgorithms(); @@ -252,12 +292,26 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation> ErrorType.POST_REGISTRATION_ERROR); } + /** + * Committed => Stored + */ + private void storeCommitedDatasets() throws Throwable + { + for (DataSetStorageAlgorithm<T> storageAlgorithm : dataSetStorageAlgorithms) + { + System.out.println("Moving to the store"); + storageAlgorithm.moveToTheStore(); + } + } + + /** + * Precommitted => Committed + */ private void commitStorageProcessors() { for (DataSetStorageAlgorithm<T> storageAlgorithm : dataSetStorageAlgorithms) { storageAlgorithm.commitStorageProcessor(); - } } @@ -274,11 +328,14 @@ public class DataSetStorageAlgorithmRunner<T extends DataSetInformation> applicationServerRegistrator.registerDataSetsInApplicationServer(registrationData); } - private void runStorageProcessors() throws Throwable + /** + * Prepared => Precommit + */ + private void preCommitStorageAlgorithms() throws Throwable { for (DataSetStorageAlgorithm<T> storageAlgorithm : dataSetStorageAlgorithms) { - storageAlgorithm.runStorageProcessor(); + storageAlgorithm.preCommit(); } } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java index 47393877eb1..252177e257d 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java @@ -628,9 +628,8 @@ abstract class AbstractTransactionState<T extends DataSetInformation> DataSetStorageAlgorithmRunner<T> runner = new DataSetStorageAlgorithmRunner<T>(algorithms, parent, parent, rollbackStack, registrationService.getDssRegistrationLog(), openBisService); - List<DataSetInformation> datasets = runner.prepareAndRunStorageAlgorithms(); - - boolean noDataSetsRegistered = datasets.isEmpty(); + + boolean someDataSetsRegistered = runner.prepareAndRunStorageAlgorithms(); // The queries are optional parts of the commit; catch any errors and inform the // invoker @@ -640,12 +639,12 @@ abstract class AbstractTransactionState<T extends DataSetInformation> { try { - if (noDataSetsRegistered) + if (someDataSetsRegistered) { - query.rollback(); + query.commit(); } else { - query.commit(); + query.rollback(); } query.close(false); } catch (Throwable e) @@ -659,7 +658,7 @@ abstract class AbstractTransactionState<T extends DataSetInformation> parent.invokeDidEncounterSecondaryTransactionErrors(encounteredErrors); } - return datasets.isEmpty() == false; + return someDataSetsRegistered; } /** -- GitLab