diff --git a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java index dbcc6f1ab72ed9ca0ae05b6bfb5c015977ba8194..659d9547209bf6af0d145d40dc1e08de261483eb 100644 --- a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java +++ b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java @@ -6,7 +6,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -84,6 +86,8 @@ public class TransactionCoordinator implements ITransactionCoordinator public void restoreTransactions() { + operationLog.info("Starting restore of unfinished transactions"); + Map<UUID, TransactionStatus> lastStatuses = transactionLog.getLastStatuses(); if (lastStatuses != null && !lastStatuses.isEmpty()) @@ -103,11 +107,12 @@ public class TransactionCoordinator implements ITransactionCoordinator { if (existingTransaction == null) { - operationLog.info("Restoring transaction '" + transactionId + "' with last status '" + lastStatus + "'"); + operationLog.info( + "Restoring transaction '" + transactionId + "' with last status '" + lastStatus + "' from the transaction log."); Transaction transaction = createTransaction(transactionId, lastStatus); - synchronized (transaction) + transaction.lockOrSkip(() -> { switch (lastStatus) { @@ -127,21 +132,22 @@ public class TransactionCoordinator implements ITransactionCoordinator throw new IllegalStateException( "Transaction '" + transactionId + "' has an unsupported last status '" + lastStatus + "'"); } - } + }); } else { - synchronized (existingTransaction) + operationLog.info("Another attempt to finish transaction '" + transactionId + "' with last status '" + lastStatus + "'"); + + existingTransaction.lockOrSkip(() -> { if (TransactionStatus.COMMIT_FAILED.equals(lastStatus)) { - operationLog.info("Another attempt to commit transaction '" + transactionId + "'"); + commitPreparedTransaction(existingTransaction, null, null, true); } else if (TransactionStatus.ROLLBACK_FAILED.equals(lastStatus)) { - operationLog.info("Another attempt to rollback transaction '" + transactionId + "'"); rollbackTransaction(existingTransaction, null, null, true); } - } + }); } } catch (Exception e) { @@ -155,25 +161,40 @@ public class TransactionCoordinator implements ITransactionCoordinator } } } + } else + { + operationLog.info("No unfinished transactions found in the transaction log"); } + + operationLog.info("Finished restore of unfinished transactions"); } public void cleanupTransactions() { + operationLog.info("Starting cleanup of abandoned transactions"); + for (Transaction transaction : transactionMap.values()) { - boolean timedOut = System.currentTimeMillis() - transaction.getLastAccessedDate().getTime() > transactionTimeoutInSeconds * 1000L; - - if (timedOut && TransactionStatus.BEGIN_FINISHED.equals(transaction.getTransactionStatus())) + transaction.lockOrSkip(() -> { - synchronized (transaction) + boolean timedOut = System.currentTimeMillis() - transaction.getLastAccessedDate().getTime() > transactionTimeoutInSeconds * 1000L; + + if (timedOut && TransactionStatus.BEGIN_FINISHED.equals(transaction.getTransactionStatus())) { - operationLog.info("Cleaning up abandoned transaction '" + transaction.getTransactionId() + "' with last status '" - + transaction.getTransactionStatus() + "' and last accessed date '" + transaction.getLastAccessedDate() + "'."); - rollbackTransaction(transaction, null, null, false); + try + { + operationLog.info("Cleaning up abandoned transaction '" + transaction.getTransactionId() + "' with last status '" + + transaction.getTransactionStatus() + "' and last accessed date '" + transaction.getLastAccessedDate() + "'."); + rollbackTransaction(transaction, null, null, false); + } catch (Exception e) + { + operationLog.warn("Cleanup of abandoned transaction '" + transaction.getTransactionId() + "' failed.", e); + } } - } + }); } + + operationLog.info("Finished cleanup of abandoned transactions"); } @Override public void beginTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey) @@ -184,7 +205,7 @@ public class TransactionCoordinator implements ITransactionCoordinator Transaction transaction = createTransaction(transactionId, TransactionStatus.NEW); - synchronized (transaction) + transaction.lockOrFail(() -> { transaction.setTransactionStatus(TransactionStatus.BEGIN_STARTED); @@ -216,7 +237,9 @@ public class TransactionCoordinator implements ITransactionCoordinator transaction.setTransactionStatus(TransactionStatus.BEGIN_FINISHED); operationLog.info("Begin transaction '" + transactionId + "' finished successfully."); - } + + return null; + }); } @Override public <T> T executeOperation(final UUID transactionId, final String sessionToken, final String interactiveSessionKey, @@ -233,7 +256,7 @@ public class TransactionCoordinator implements ITransactionCoordinator throw new IllegalStateException("Transaction '" + transactionId + "' does not exist."); } - synchronized (transaction) + return transaction.lockOrFail(() -> { checkTransactionStatus(transaction, TransactionStatus.BEGIN_FINISHED); @@ -253,7 +276,7 @@ public class TransactionCoordinator implements ITransactionCoordinator } throw new IllegalArgumentException("Unknown participant id: " + participantId); - } + }); } @Override public void commitTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey) @@ -269,17 +292,27 @@ public class TransactionCoordinator implements ITransactionCoordinator throw new IllegalStateException("Transaction '" + transactionId + "' does not exist."); } - synchronized (transaction) + transaction.lockOrFail(() -> { checkTransactionStatus(transaction, TransactionStatus.BEGIN_FINISHED); operationLog.info("Commit transaction '" + transactionId + "' started."); prepareTransaction(transaction, sessionToken, interactiveSessionKey); - commitPreparedTransaction(transaction, sessionToken, interactiveSessionKey, false); + + try + { + commitPreparedTransaction(transaction, sessionToken, interactiveSessionKey, false); + } catch (Exception ignore) + { + // do not throw the exception to the client as there is nothing it can do, + // the commit will be retried automatically by the coordinator + } operationLog.info("Commit transaction '" + transactionId + "' finished successfully."); - } + + return null; + }); } private void prepareTransaction(final Transaction transaction, final String sessionToken, final String interactiveSessionKey) @@ -326,7 +359,7 @@ public class TransactionCoordinator implements ITransactionCoordinator transaction.setTransactionStatus(TransactionStatus.COMMIT_STARTED); - boolean failed = false; + RuntimeException exception = null; for (ITransactionParticipant participant : participants) { @@ -362,19 +395,23 @@ public class TransactionCoordinator implements ITransactionCoordinator operationLog.error( "Commit prepared transaction '" + transaction.getTransactionId() + "' failed for participant '" + participant.getParticipantId() + "'.", e); - failed = true; + if (exception == null) + { + exception = e; + } } } - if (failed) - { - transaction.setTransactionStatus(TransactionStatus.COMMIT_FAILED); - operationLog.info("Commit prepared transaction '" + transaction.getTransactionId() + "' failed."); - } else + if (exception == null) { transaction.setTransactionStatus(TransactionStatus.COMMIT_FINISHED); transactionMap.remove(transaction.getTransactionId()); operationLog.info("Commit prepared transaction '" + transaction.getTransactionId() + "' finished successfully."); + } else + { + transaction.setTransactionStatus(TransactionStatus.COMMIT_FAILED); + operationLog.info("Commit prepared transaction '" + transaction.getTransactionId() + "' failed."); + throw exception; } } @@ -391,14 +428,23 @@ public class TransactionCoordinator implements ITransactionCoordinator return; } - synchronized (transaction) + transaction.lockOrFail(() -> { checkTransactionStatus(transaction, TransactionStatus.BEGIN_STARTED, TransactionStatus.BEGIN_FINISHED, - TransactionStatus.PREPARE_STARTED, - TransactionStatus.PREPARE_FINISHED, TransactionStatus.ROLLBACK_STARTED, TransactionStatus.ROLLBACK_FINISHED); + TransactionStatus.PREPARE_STARTED, TransactionStatus.PREPARE_FINISHED, TransactionStatus.ROLLBACK_STARTED, + TransactionStatus.ROLLBACK_FINISHED); - rollbackTransaction(transaction, sessionToken, interactiveSessionKey, false); - } + try + { + rollbackTransaction(transaction, sessionToken, interactiveSessionKey, false); + } catch (Exception ignore) + { + // do not throw the exception to the client as there is nothing it can do, + // the rollback will be retried automatically by the coordinator + } + + return null; + }); } private void rollbackTransaction(final Transaction transaction, final String sessionToken, final String interactiveSessionKey, @@ -408,7 +454,7 @@ public class TransactionCoordinator implements ITransactionCoordinator transaction.setTransactionStatus(TransactionStatus.ROLLBACK_STARTED); - boolean failed = false; + RuntimeException exception = null; for (ITransactionParticipant participant : participants) { @@ -443,19 +489,23 @@ public class TransactionCoordinator implements ITransactionCoordinator operationLog.info( "Rollback transaction '" + transaction.getTransactionId() + "' failed for participant '" + participant.getParticipantId() + "'.", e); - failed = true; + if (exception == null) + { + exception = e; + } } } - if (failed) - { - transaction.setTransactionStatus(TransactionStatus.ROLLBACK_FAILED); - operationLog.info("Rollback transaction '" + transaction.getTransactionId() + "' failed."); - } else + if (exception == null) { transaction.setTransactionStatus(TransactionStatus.ROLLBACK_FINISHED); transactionMap.remove(transaction.getTransactionId()); operationLog.info("Rollback transaction '" + transaction.getTransactionId() + "' finished successfully."); + } else + { + transaction.setTransactionStatus(TransactionStatus.ROLLBACK_FAILED); + operationLog.info("Rollback transaction '" + transaction.getTransactionId() + "' failed."); + throw exception; } } @@ -557,6 +607,8 @@ public class TransactionCoordinator implements ITransactionCoordinator private Date lastAccessedDate = new Date(); + private final ReentrantLock lock = new ReentrantLock(); + public Transaction(UUID transactionId, TransactionStatus initialTransactionStatus) { this.transactionId = transactionId; @@ -579,6 +631,48 @@ public class TransactionCoordinator implements ITransactionCoordinator this.transactionStatus = transactionStatus; } + public <T> T lockOrFail(Callable<T> action) + { + if (lock.tryLock()) + { + try + { + return action.call(); + } catch (RuntimeException e) + { + throw e; + } catch (Exception e) + { + throw new RuntimeException(e); + } finally + { + lock.unlock(); + } + } else + { + throw new RuntimeException( + "Cannot execute a new action on transaction '" + transactionId + "' as it is still busy executing a previous action."); + } + } + + public void lockOrSkip(Runnable action) + { + if (lock.tryLock()) + { + try + { + action.run(); + } finally + { + lock.unlock(); + } + } else + { + operationLog.info( + "Cannot execute a new action on transaction '" + transactionId + "' as it is still busy executing a previous action."); + } + } + public Date getLastAccessedDate() { return lastAccessedDate;