diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java index e79cdb405cac584d0d6bee5d25e2ffd6ac74384f..e5893cbaebc77e7d03ed5e5ccdd592cc9754b8c9 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/AbstractTransactionState.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import net.lemnik.eodsql.DynamicTransactionQuery; +import ch.systemsx.cisd.base.exceptions.IOExceptionUnchecked; +import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.common.exceptions.NotImplementedException; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.etlserver.DynamicTransactionQueryFactory; @@ -83,8 +85,14 @@ abstract class AbstractTransactionState<T extends DataSetInformation> * @author Chandrasekhar Ramakrishnan */ static class LiveTransactionState<T extends DataSetInformation> extends - AbstractTransactionState<T> + AbstractTransactionState<T> implements RollbackStack.IRollbackStackDelegate { + // Wait for up to 5 minutes for the file system to become available + private static final int MAX_DIRECTORY_AVAILABLE_WAIT_COUNT = 6 * 5; + + // Poll every 10 seconds + private static final int STAGING_DIR_AVAILABILITY_POLLING_WAIT_TIME = 10 * 1000; + // Keeps track of steps that have been executed and may need to be reverted. Elements are // kept in the order they need to be reverted. private final RollbackStack rollbackStack; @@ -518,7 +526,7 @@ abstract class AbstractTransactionState<T extends DataSetInformation> */ public void rollback() { - rollbackStack.rollbackAll(); + rollbackStack.rollbackAll(this); registeredDataSets.clear(); for (DynamicTransactionQuery query : queriesToCommit.values()) { @@ -658,6 +666,43 @@ abstract class AbstractTransactionState<T extends DataSetInformation> { return false; } + + @Override + public void willContinueRollbackAll(RollbackStack stack) + { + // Stop rolling back if the thread was interrupted + InterruptedExceptionUnchecked.check(); + + // Poll until the folder becomes accessible + if (null != FileUtilities.checkDirectoryFullyAccessible(stagingDirectory, "staging")) + { + boolean keepPolling = true; + for (int waitCount = 0; waitCount < MAX_DIRECTORY_AVAILABLE_WAIT_COUNT + && keepPolling; ++waitCount) + { + try + { + Thread.sleep(STAGING_DIR_AVAILABILITY_POLLING_WAIT_TIME); + // If the directory is not accessible (i.e., return not null), wait again + keepPolling = + (null != FileUtilities.checkDirectoryFullyAccessible( + stagingDirectory, "staging")); + } catch (InterruptedException e) + { + throw new InterruptedExceptionUnchecked(e); + } + } + + // The file never became available -- throw an exception + if (null != FileUtilities + .checkDirectoryFullyAccessible(stagingDirectory, "staging")) + { + throw new IOExceptionUnchecked("The staging directory " + + stagingDirectory.getAbsolutePath() + + " is not available. Could not rollback transaction."); + } + } + } } private static abstract class TerminalTransactionState<T extends DataSetInformation> extends diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/RollbackStack.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/RollbackStack.java index b1f41fea78589f045af615657679dcd0b35dba1c..1788451118269d5c3ea2ea7d96fd7a9acaf4807d 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/RollbackStack.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/registrator/api/v1/impl/RollbackStack.java @@ -37,6 +37,22 @@ import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecora */ class RollbackStack { + /** + * Delegate methods for the rollback stack, giving clients of the stack control over its + * behavior. + * + * @author Chandrasekhar Ramakrishnan + */ + public static interface IRollbackStackDelegate + { + + /** + * Informs clients that the stack will rollback another item. Implementations may throw + * exceptions or block the thread until the stack can continue. + */ + void willContinueRollbackAll(RollbackStack stack); + } + // The files that store the persistent queue. Used for discarding the queues. private final File queue1File; @@ -184,11 +200,29 @@ class RollbackStack * execution. */ public void rollbackAll() + { + rollbackAll(new IRollbackStackDelegate() + { + + @Override + public void willContinueRollbackAll(RollbackStack stack) + { + // Don't do anything + } + }); + } + + /** + * Rollback any commands that have been executed. Rollback is done in the reverse order of + * execution. + */ + public void rollbackAll(IRollbackStackDelegate delegate) { getOperationLog().info("Rolling back stack " + this); // Pop and rollback all while (size() > 0) { + delegate.willContinueRollbackAll(this); rollbackAndPop(); } }