Skip to content
Snippets Groups Projects
Commit 9566ef56 authored by cramakri's avatar cramakri
Browse files

LMS-2515 make data set registration transaction rollback more robust in case...

LMS-2515 make data set registration transaction rollback more robust in case unreliable file systems

SVN: 22985
parent ce591b53
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,8 @@ import java.util.List; ...@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import net.lemnik.eodsql.DynamicTransactionQuery; import net.lemnik.eodsql.DynamicTransactionQuery;
import ch.systemsx.cisd.base.exceptions.IOExceptionUnchecked;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
import ch.systemsx.cisd.common.exceptions.NotImplementedException; import ch.systemsx.cisd.common.exceptions.NotImplementedException;
import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.filesystem.FileUtilities;
import ch.systemsx.cisd.etlserver.DynamicTransactionQueryFactory; import ch.systemsx.cisd.etlserver.DynamicTransactionQueryFactory;
...@@ -83,8 +85,14 @@ abstract class AbstractTransactionState<T extends DataSetInformation> ...@@ -83,8 +85,14 @@ abstract class AbstractTransactionState<T extends DataSetInformation>
* @author Chandrasekhar Ramakrishnan * @author Chandrasekhar Ramakrishnan
*/ */
static class LiveTransactionState<T extends DataSetInformation> extends static class LiveTransactionState<T extends DataSetInformation> extends
AbstractTransactionState<T> AbstractTransactionState<T> implements RollbackStack.IRollbackStackDelegate
{ {
// Wait for up to 5 minutes for the file system to become available
private static final int MAX_DIRECTORY_AVAILABLE_WAIT_COUNT = 6 * 5;
// Poll every 10 seconds
private static final int STAGING_DIR_AVAILABILITY_POLLING_WAIT_TIME = 10 * 1000;
// Keeps track of steps that have been executed and may need to be reverted. Elements are // Keeps track of steps that have been executed and may need to be reverted. Elements are
// kept in the order they need to be reverted. // kept in the order they need to be reverted.
private final RollbackStack rollbackStack; private final RollbackStack rollbackStack;
...@@ -518,7 +526,7 @@ abstract class AbstractTransactionState<T extends DataSetInformation> ...@@ -518,7 +526,7 @@ abstract class AbstractTransactionState<T extends DataSetInformation>
*/ */
public void rollback() public void rollback()
{ {
rollbackStack.rollbackAll(); rollbackStack.rollbackAll(this);
registeredDataSets.clear(); registeredDataSets.clear();
for (DynamicTransactionQuery query : queriesToCommit.values()) for (DynamicTransactionQuery query : queriesToCommit.values())
{ {
...@@ -658,6 +666,43 @@ abstract class AbstractTransactionState<T extends DataSetInformation> ...@@ -658,6 +666,43 @@ abstract class AbstractTransactionState<T extends DataSetInformation>
{ {
return false; return false;
} }
@Override
public void willContinueRollbackAll(RollbackStack stack)
{
// Stop rolling back if the thread was interrupted
InterruptedExceptionUnchecked.check();
// Poll until the folder becomes accessible
if (null != FileUtilities.checkDirectoryFullyAccessible(stagingDirectory, "staging"))
{
boolean keepPolling = true;
for (int waitCount = 0; waitCount < MAX_DIRECTORY_AVAILABLE_WAIT_COUNT
&& keepPolling; ++waitCount)
{
try
{
Thread.sleep(STAGING_DIR_AVAILABILITY_POLLING_WAIT_TIME);
// If the directory is not accessible (i.e., return not null), wait again
keepPolling =
(null != FileUtilities.checkDirectoryFullyAccessible(
stagingDirectory, "staging"));
} catch (InterruptedException e)
{
throw new InterruptedExceptionUnchecked(e);
}
}
// The file never became available -- throw an exception
if (null != FileUtilities
.checkDirectoryFullyAccessible(stagingDirectory, "staging"))
{
throw new IOExceptionUnchecked("The staging directory "
+ stagingDirectory.getAbsolutePath()
+ " is not available. Could not rollback transaction.");
}
}
}
} }
private static abstract class TerminalTransactionState<T extends DataSetInformation> extends private static abstract class TerminalTransactionState<T extends DataSetInformation> extends
......
...@@ -37,6 +37,22 @@ import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecora ...@@ -37,6 +37,22 @@ import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecora
*/ */
class RollbackStack class RollbackStack
{ {
/**
* Delegate methods for the rollback stack, giving clients of the stack control over its
* behavior.
*
* @author Chandrasekhar Ramakrishnan
*/
public static interface IRollbackStackDelegate
{
/**
* Informs clients that the stack will rollback another item. Implementations may throw
* exceptions or block the thread until the stack can continue.
*/
void willContinueRollbackAll(RollbackStack stack);
}
// The files that store the persistent queue. Used for discarding the queues. // The files that store the persistent queue. Used for discarding the queues.
private final File queue1File; private final File queue1File;
...@@ -184,11 +200,29 @@ class RollbackStack ...@@ -184,11 +200,29 @@ class RollbackStack
* execution. * execution.
*/ */
public void rollbackAll() public void rollbackAll()
{
rollbackAll(new IRollbackStackDelegate()
{
@Override
public void willContinueRollbackAll(RollbackStack stack)
{
// Don't do anything
}
});
}
/**
* Rollback any commands that have been executed. Rollback is done in the reverse order of
* execution.
*/
public void rollbackAll(IRollbackStackDelegate delegate)
{ {
getOperationLog().info("Rolling back stack " + this); getOperationLog().info("Rolling back stack " + this);
// Pop and rollback all // Pop and rollback all
while (size() > 0) while (size() > 0)
{ {
delegate.willContinueRollbackAll(this);
rollbackAndPop(); rollbackAndPop();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment