diff --git a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java index ee4b88a3210336591b0c9162cd959ab5ec01d36a..dbcc6f1ab72ed9ca0ae05b6bfb5c015977ba8194 100644 --- a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java +++ b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionCoordinator.java @@ -1,9 +1,12 @@ package ch.ethz.sis.transaction; +import java.util.Arrays; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; @@ -23,10 +26,17 @@ public class TransactionCoordinator implements ITransactionCoordinator private final List<ITransactionParticipant> participants; + private final Map<UUID, Transaction> transactionMap = new ConcurrentHashMap<>(); + private final ITransactionLog transactionLog; + private final int transactionTimeoutInSeconds; + + private final int transactionCountLimit; + public TransactionCoordinator(final String transactionCoordinatorKey, final String interactiveSessionKey, - final ISessionTokenProvider sessionTokenProvider, final List<ITransactionParticipant> participants, final ITransactionLog transactionLog) + final ISessionTokenProvider sessionTokenProvider, final List<ITransactionParticipant> participants, final ITransactionLog transactionLog, + int transactionTimeoutInSeconds, int transactionCountLimit) { if (transactionCoordinatorKey == null) { @@ -53,11 +63,23 @@ public class TransactionCoordinator implements ITransactionCoordinator throw new IllegalArgumentException("Transaction log cannot be null"); } + if (transactionTimeoutInSeconds <= 0) + { + throw new IllegalArgumentException("Transaction timeout cannot be <= 0"); + } + + if (transactionCountLimit <= 0) + { + throw new IllegalArgumentException("Transaction count cannot be <= 0"); + } + this.transactionCoordinatorKey = transactionCoordinatorKey; this.interactiveSessionKey = interactiveSessionKey; this.sessionTokenProvider = sessionTokenProvider; this.participants = participants; this.transactionLog = transactionLog; + this.transactionTimeoutInSeconds = transactionTimeoutInSeconds; + this.transactionCountLimit = transactionCountLimit; } public void restoreTransactions() @@ -70,34 +92,85 @@ public class TransactionCoordinator implements ITransactionCoordinator { TransactionStatus lastStatus = lastStatuses.get(transactionId); - operationLog.info("Restoring transaction '" + transactionId + "' with last status '" + lastStatus + "'"); + if (TransactionStatus.COMMIT_FINISHED.equals(lastStatus) || TransactionStatus.ROLLBACK_FINISHED.equals(lastStatus)) + { + continue; + } + + Transaction existingTransaction = getTransaction(transactionId); try { - switch (lastStatus) + if (existingTransaction == null) + { + operationLog.info("Restoring transaction '" + transactionId + "' with last status '" + lastStatus + "'"); + + Transaction transaction = createTransaction(transactionId, lastStatus); + + synchronized (transaction) + { + switch (lastStatus) + { + case BEGIN_STARTED: + case BEGIN_FINISHED: + case PREPARE_STARTED: + case ROLLBACK_STARTED: + case ROLLBACK_FAILED: + rollbackTransaction(transaction, null, null, true); + break; + case PREPARE_FINISHED: + case COMMIT_STARTED: + case COMMIT_FAILED: + commitPreparedTransaction(transaction, null, null, true); + break; + default: + throw new IllegalStateException( + "Transaction '" + transactionId + "' has an unsupported last status '" + lastStatus + "'"); + } + } + } else { - case BEGIN_STARTED: - case BEGIN_FINISHED: - case PREPARE_STARTED: - case ROLLBACK_STARTED: - rollbackTransaction(transactionId, null, null, true); - break; - case PREPARE_FINISHED: - case COMMIT_STARTED: - commitPreparedTransaction(transactionId, null, null, true); - break; - case COMMIT_FINISHED: - case ROLLBACK_FINISHED: - // nothing to do - break; - default: - operationLog.error( - "Transaction '" + transactionId + "' restore failed because of an unknown transaction last status '" + lastStatus - + "'"); + synchronized (existingTransaction) + { + if (TransactionStatus.COMMIT_FAILED.equals(lastStatus)) + { + operationLog.info("Another attempt to commit transaction '" + transactionId + "'"); + commitPreparedTransaction(existingTransaction, null, null, true); + } else if (TransactionStatus.ROLLBACK_FAILED.equals(lastStatus)) + { + operationLog.info("Another attempt to rollback transaction '" + transactionId + "'"); + rollbackTransaction(existingTransaction, null, null, true); + } + } } } catch (Exception e) { - operationLog.info("Restore of transaction '" + transactionId + "' with last status '" + lastStatus + "' failed.", e); + if (existingTransaction == null) + { + operationLog.warn("Restore of transaction '" + transactionId + "' with last status '" + lastStatus + "' failed.", e); + } else + { + operationLog.warn( + "Another attempt to finish a transaction '" + transactionId + "' with last status '" + lastStatus + "' failed."); + } + } + } + } + } + + public void cleanupTransactions() + { + for (Transaction transaction : transactionMap.values()) + { + boolean timedOut = System.currentTimeMillis() - transaction.getLastAccessedDate().getTime() > transactionTimeoutInSeconds * 1000L; + + if (timedOut && TransactionStatus.BEGIN_FINISHED.equals(transaction.getTransactionStatus())) + { + synchronized (transaction) + { + operationLog.info("Cleaning up abandoned transaction '" + transaction.getTransactionId() + "' with last status '" + + transaction.getTransactionStatus() + "' and last accessed date '" + transaction.getLastAccessedDate() + "'."); + rollbackTransaction(transaction, null, null, false); } } } @@ -109,36 +182,41 @@ public class TransactionCoordinator implements ITransactionCoordinator checkSessionToken(sessionToken); checkInteractiveSessionKey(interactiveSessionKey); - operationLog.info("Begin transaction '" + transactionId + "' started."); - - transactionLog.logStatus(transactionId, TransactionStatus.BEGIN_STARTED); + Transaction transaction = createTransaction(transactionId, TransactionStatus.NEW); - for (ITransactionParticipant participant : participants) + synchronized (transaction) { - try - { - operationLog.info("Begin transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); + transaction.setTransactionStatus(TransactionStatus.BEGIN_STARTED); - participant.beginTransaction(transactionId, sessionToken, interactiveSessionKey, transactionCoordinatorKey); - } catch (Exception e) - { - operationLog.info( - "Begin transaction '" + transactionId + "' failed for participant '" + participant.getParticipantId() + "'.", e); + operationLog.info("Begin transaction '" + transactionId + "' started."); + for (ITransactionParticipant participant : participants) + { try { - rollbackTransaction(transactionId, sessionToken, interactiveSessionKey); - } catch (Exception ignore) + operationLog.info("Begin transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); + + participant.beginTransaction(transactionId, sessionToken, interactiveSessionKey, transactionCoordinatorKey); + } catch (Exception e) { - } + operationLog.info( + "Begin transaction '" + transactionId + "' failed for participant '" + participant.getParticipantId() + "'.", e); - throw e; + try + { + rollbackTransaction(transactionId, sessionToken, interactiveSessionKey); + } catch (Exception ignore) + { + } + + throw e; + } } - } - transactionLog.logStatus(transactionId, TransactionStatus.BEGIN_FINISHED); + transaction.setTransactionStatus(TransactionStatus.BEGIN_FINISHED); - operationLog.info("Begin transaction '" + transactionId + "' finished successfully."); + operationLog.info("Begin transaction '" + transactionId + "' finished successfully."); + } } @Override public <T> T executeOperation(final UUID transactionId, final String sessionToken, final String interactiveSessionKey, @@ -148,21 +226,34 @@ public class TransactionCoordinator implements ITransactionCoordinator checkSessionToken(sessionToken); checkInteractiveSessionKey(interactiveSessionKey); - operationLog.info("Transaction '" + transactionId + "' execute operation '" + operationName + "' started."); + Transaction transaction = getTransaction(transactionId); - for (ITransactionParticipant participant : participants) + if (transaction == null) + { + throw new IllegalStateException("Transaction '" + transactionId + "' does not exist."); + } + + synchronized (transaction) { - if (Objects.equals(participant.getParticipantId(), participantId)) + checkTransactionStatus(transaction, TransactionStatus.BEGIN_FINISHED); + + operationLog.info("Transaction '" + transactionId + "' execute operation '" + operationName + "' started."); + + for (ITransactionParticipant participant : participants) { - T result = participant.executeOperation(transactionId, sessionToken, interactiveSessionKey, operationName, operationArguments); + if (Objects.equals(participant.getParticipantId(), participantId)) + { + T result = + participant.executeOperation(transactionId, sessionToken, interactiveSessionKey, operationName, operationArguments); - operationLog.info("Transaction '" + transactionId + "' execute operation '" + operationName + "' finished successfully."); + operationLog.info("Transaction '" + transactionId + "' execute operation '" + operationName + "' finished successfully."); - return result; + return result; + } } - } - throw new IllegalArgumentException("Unknown participant id: " + participantId); + throw new IllegalArgumentException("Unknown participant id: " + participantId); + } } @Override public void commitTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey) @@ -171,56 +262,71 @@ public class TransactionCoordinator implements ITransactionCoordinator checkSessionToken(sessionToken); checkInteractiveSessionKey(interactiveSessionKey); - operationLog.info("Commit transaction '" + transactionId + "' started."); + Transaction transaction = getTransaction(transactionId); - prepareTransaction(transactionId, sessionToken, interactiveSessionKey); - commitPreparedTransaction(transactionId, sessionToken, interactiveSessionKey, false); + if (transaction == null) + { + throw new IllegalStateException("Transaction '" + transactionId + "' does not exist."); + } - operationLog.info("Commit transaction '" + transactionId + "' finished successfully."); + synchronized (transaction) + { + checkTransactionStatus(transaction, TransactionStatus.BEGIN_FINISHED); + + operationLog.info("Commit transaction '" + transactionId + "' started."); + + prepareTransaction(transaction, sessionToken, interactiveSessionKey); + commitPreparedTransaction(transaction, sessionToken, interactiveSessionKey, false); + + operationLog.info("Commit transaction '" + transactionId + "' finished successfully."); + } } - private void prepareTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey) + private void prepareTransaction(final Transaction transaction, final String sessionToken, final String interactiveSessionKey) { - operationLog.info("Prepare transaction '" + transactionId + "' started."); + operationLog.info("Prepare transaction '" + transaction.getTransactionId() + "' started."); - transactionLog.logStatus(transactionId, TransactionStatus.PREPARE_STARTED); + transaction.setTransactionStatus(TransactionStatus.PREPARE_STARTED); for (ITransactionParticipant participant : participants) { try { - operationLog.info("Prepare transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); - participant.prepareTransaction(transactionId, sessionToken, interactiveSessionKey, transactionCoordinatorKey); + operationLog.info( + "Prepare transaction '" + transaction.getTransactionId() + "' for participant '" + participant.getParticipantId() + "'."); + participant.prepareTransaction(transaction.getTransactionId(), sessionToken, interactiveSessionKey, transactionCoordinatorKey); } catch (Exception e) { operationLog.info( - "Prepare transaction '" + transactionId + "' failed for participant '" + participant.getParticipantId() + "'.", e); + "Prepare transaction '" + transaction.getTransactionId() + "' failed for participant '" + participant.getParticipantId() + + "'.", e); try { - rollbackTransaction(transactionId, sessionToken, interactiveSessionKey); + rollbackTransaction(transaction.getTransactionId(), sessionToken, interactiveSessionKey); } catch (Exception ignore) { } - operationLog.info("Prepare transaction '" + transactionId + "' failed."); + operationLog.info("Prepare transaction '" + transaction.getTransactionId() + "' failed."); throw e; } } - transactionLog.logStatus(transactionId, TransactionStatus.PREPARE_FINISHED); + transaction.setTransactionStatus(TransactionStatus.PREPARE_FINISHED); - operationLog.info("Prepare transaction '" + transactionId + "' finished successfully."); + operationLog.info("Prepare transaction '" + transaction.getTransactionId() + "' finished successfully."); } - private void commitPreparedTransaction(UUID transactionId, final String sessionToken, final String interactiveSessionKey, boolean restore) + private void commitPreparedTransaction(Transaction transaction, final String sessionToken, final String interactiveSessionKey, + final boolean restore) { - operationLog.info("Commit prepared transaction '" + transactionId + "' started."); + operationLog.info("Commit prepared transaction '" + transaction.getTransactionId() + "' started."); - transactionLog.logStatus(transactionId, TransactionStatus.COMMIT_STARTED); + transaction.setTransactionStatus(TransactionStatus.COMMIT_STARTED); - RuntimeException exception = null; + boolean failed = false; for (ITransactionParticipant participant : participants) { @@ -230,42 +336,45 @@ public class TransactionCoordinator implements ITransactionCoordinator { List<UUID> transactions = participant.getTransactions(transactionCoordinatorKey); - if (transactions != null && transactions.contains(transactionId)) + if (transactions != null && transactions.contains(transaction.getTransactionId())) { operationLog.info( - "Commit prepared transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); - participant.commitTransaction(transactionId, transactionCoordinatorKey); + "Commit prepared transaction '" + transaction.getTransactionId() + "' for participant '" + + participant.getParticipantId() + "'."); + participant.commitTransaction(transaction.getTransactionId(), transactionCoordinatorKey); } else { operationLog.info( - "Skipping commit of prepared transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "Skipping commit of prepared transaction '" + transaction.getTransactionId() + "' for participant '" + + participant.getParticipantId() + "'. The transaction has been already committed at that participant before."); } } else { operationLog.info( - "Commit prepared transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); - participant.commitTransaction(transactionId, sessionToken, interactiveSessionKey); + "Commit prepared transaction '" + transaction.getTransactionId() + "' for participant '" + + participant.getParticipantId() + + "'."); + participant.commitTransaction(transaction.getTransactionId(), sessionToken, interactiveSessionKey); } } catch (RuntimeException e) { operationLog.error( - "Commit prepared transaction '" + transactionId + "' failed for participant '" + participant.getParticipantId() + "'.", e); - if (exception == null) - { - exception = e; - } + "Commit prepared transaction '" + transaction.getTransactionId() + "' failed for participant '" + + participant.getParticipantId() + "'.", e); + failed = true; } } - if (exception == null) + if (failed) { - transactionLog.logStatus(transactionId, TransactionStatus.COMMIT_FINISHED); - operationLog.info("Commit prepared transaction '" + transactionId + "' finished successfully."); + transaction.setTransactionStatus(TransactionStatus.COMMIT_FAILED); + operationLog.info("Commit prepared transaction '" + transaction.getTransactionId() + "' failed."); } else { - operationLog.info("Commit prepared transaction '" + transactionId + "' failed."); - throw exception; + transaction.setTransactionStatus(TransactionStatus.COMMIT_FINISHED); + transactionMap.remove(transaction.getTransactionId()); + operationLog.info("Commit prepared transaction '" + transaction.getTransactionId() + "' finished successfully."); } } @@ -275,16 +384,31 @@ public class TransactionCoordinator implements ITransactionCoordinator checkSessionToken(sessionToken); checkInteractiveSessionKey(interactiveSessionKey); - rollbackTransaction(transactionId, sessionToken, interactiveSessionKey, false); + Transaction transaction = getTransaction(transactionId); + + if (transaction == null) + { + return; + } + + synchronized (transaction) + { + checkTransactionStatus(transaction, TransactionStatus.BEGIN_STARTED, TransactionStatus.BEGIN_FINISHED, + TransactionStatus.PREPARE_STARTED, + TransactionStatus.PREPARE_FINISHED, TransactionStatus.ROLLBACK_STARTED, TransactionStatus.ROLLBACK_FINISHED); + + rollbackTransaction(transaction, sessionToken, interactiveSessionKey, false); + } } - private void rollbackTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey, final boolean restore) + private void rollbackTransaction(final Transaction transaction, final String sessionToken, final String interactiveSessionKey, + final boolean restore) { - operationLog.info("Rollback transaction '" + transactionId + "' started."); + operationLog.info("Rollback transaction '" + transaction.getTransactionId() + "' started."); - transactionLog.logStatus(transactionId, TransactionStatus.ROLLBACK_STARTED); + transaction.setTransactionStatus(TransactionStatus.ROLLBACK_STARTED); - RuntimeException exception = null; + boolean failed = false; for (ITransactionParticipant participant : participants) { @@ -294,40 +418,44 @@ public class TransactionCoordinator implements ITransactionCoordinator { List<UUID> transactions = participant.getTransactions(transactionCoordinatorKey); - if (transactions != null && transactions.contains(transactionId)) + if (transactions != null && transactions.contains(transaction.getTransactionId())) { - operationLog.info("Rollback transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); - participant.rollbackTransaction(transactionId, transactionCoordinatorKey); + operationLog.info( + "Rollback transaction '" + transaction.getTransactionId() + "' for participant '" + participant.getParticipantId() + + "'."); + participant.rollbackTransaction(transaction.getTransactionId(), transactionCoordinatorKey); } else { operationLog.info( - "Skipping rollback of transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "Skipping rollback of transaction '" + transaction.getTransactionId() + "' for participant '" + + participant.getParticipantId() + "'. The transaction has been already rolled back at that participant before."); } } else { - operationLog.info("Rollback transaction '" + transactionId + "' for participant '" + participant.getParticipantId() + "'."); - participant.rollbackTransaction(transactionId, sessionToken, interactiveSessionKey); + operationLog.info( + "Rollback transaction '" + transaction.getTransactionId() + "' for participant '" + participant.getParticipantId() + + "'."); + participant.rollbackTransaction(transaction.getTransactionId(), sessionToken, interactiveSessionKey); } } catch (RuntimeException e) { - operationLog.info("Rollback transaction '" + transactionId + "' failed for participant '" + participant.getParticipantId() + "'.", e); - - if (exception == null) - { - exception = e; - } + operationLog.info( + "Rollback transaction '" + transaction.getTransactionId() + "' failed for participant '" + participant.getParticipantId() + + "'.", e); + failed = true; } } - if (exception == null) + if (failed) { - transactionLog.logStatus(transactionId, TransactionStatus.ROLLBACK_FINISHED); - operationLog.info("Rollback transaction '" + transactionId + "' finished successfully."); + transaction.setTransactionStatus(TransactionStatus.ROLLBACK_FAILED); + operationLog.info("Rollback transaction '" + transaction.getTransactionId() + "' failed."); } else { - operationLog.info("Rollback transaction '" + transactionId + "' failed."); - throw exception; + transaction.setTransactionStatus(TransactionStatus.ROLLBACK_FINISHED); + transactionMap.remove(transaction.getTransactionId()); + operationLog.info("Rollback transaction '" + transaction.getTransactionId() + "' finished successfully."); } } @@ -360,4 +488,107 @@ public class TransactionCoordinator implements ITransactionCoordinator } } + private void checkTransactionStatus(final Transaction transaction, final TransactionStatus... expectedStatuses) + { + for (final TransactionStatus expectedStatus : expectedStatuses) + { + if (transaction.getTransactionStatus() == expectedStatus) + { + return; + } + } + + throw new IllegalStateException( + "Transaction '" + transaction.getTransactionId() + "' unexpected status '" + transaction.getTransactionStatus() + + "'. Expected statuses '" + + Arrays.toString(expectedStatuses) + "'."); + } + + private Transaction createTransaction(UUID transactionId, TransactionStatus initialTransactionStatus) + { + synchronized (transactionMap) + { + Transaction transaction = transactionMap.get(transactionId); + + if (transaction == null) + { + if (transactionMap.size() < transactionCountLimit) + { + transaction = new Transaction(transactionId, initialTransactionStatus); + transactionMap.put(transactionId, transaction); + return transaction; + } else + { + throw new IllegalStateException( + "Cannot create transaction '" + transactionId + "' because transaction count limit (" + transactionCountLimit + + ") has been reached."); + } + } else + { + throw new IllegalStateException("Transaction '" + transactionId + "' already exists."); + } + } + } + + private Transaction getTransaction(UUID transactionId) + { + synchronized (transactionMap) + { + Transaction transaction = transactionMap.get(transactionId); + + if (transaction == null) + { + return null; + } else + { + transaction.setLastAccessedDate(new Date()); + return transaction; + } + + } + } + + private class Transaction + { + + private final UUID transactionId; + + private TransactionStatus transactionStatus; + + private Date lastAccessedDate = new Date(); + + public Transaction(UUID transactionId, TransactionStatus initialTransactionStatus) + { + this.transactionId = transactionId; + this.transactionStatus = initialTransactionStatus; + } + + public UUID getTransactionId() + { + return transactionId; + } + + public TransactionStatus getTransactionStatus() + { + return transactionStatus; + } + + public void setTransactionStatus(final TransactionStatus transactionStatus) + { + transactionLog.logStatus(transactionId, transactionStatus); + this.transactionStatus = transactionStatus; + } + + public Date getLastAccessedDate() + { + return lastAccessedDate; + } + + public void setLastAccessedDate(final Date lastAccessedDate) + { + this.lastAccessedDate = lastAccessedDate; + } + + } + } diff --git a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionParticipant.java b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionParticipant.java index b54201b1a107623a1531c743d2de07565511a1e2..060a14283d5759109b5282da1e0df6b7efe7bc94 100644 --- a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionParticipant.java +++ b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionParticipant.java @@ -2,6 +2,8 @@ package ch.ethz.sis.transaction; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,11 +43,14 @@ public class TransactionParticipant implements ITransactionParticipant private final ITransactionLog transactionLog; + private final int transactionTimeoutInSeconds; + private final int transactionCountLimit; public TransactionParticipant(String participantId, String transactionCoordinatorKey, String interactiveSessionKey, ISessionTokenProvider sessionTokenProvider, IDatabaseTransactionProvider databaseTransactionProvider, - ITransactionOperationExecutor operationExecutor, ITransactionLog transactionLog, int transactionCountLimit) + ITransactionOperationExecutor operationExecutor, ITransactionLog transactionLog, int transactionTimeoutInSeconds, + int transactionCountLimit) { if (participantId == null) { @@ -82,6 +87,11 @@ public class TransactionParticipant implements ITransactionParticipant throw new IllegalArgumentException("Transaction log cannot be null"); } + if (transactionTimeoutInSeconds <= 0) + { + throw new IllegalArgumentException("Transaction timeout cannot be <= 0"); + } + if (transactionCountLimit <= 0) { throw new IllegalArgumentException("Transaction count cannot be <= 0"); @@ -94,6 +104,7 @@ public class TransactionParticipant implements ITransactionParticipant this.databaseTransactionProvider = databaseTransactionProvider; this.operationExecutor = operationExecutor; this.transactionLog = transactionLog; + this.transactionTimeoutInSeconds = transactionTimeoutInSeconds; this.transactionCountLimit = transactionCountLimit; } @@ -116,6 +127,18 @@ public class TransactionParticipant implements ITransactionParticipant try { + synchronized (threadMap) + { + TransactionThread thread = threadMap.get(transactionId); + + if (thread == null) + { + thread = new TransactionThread(transactionId, lastStatus, transactionCoordinatorKey); + threadMap.put(transactionId, thread); + thread.startThread(); + } + } + switch (lastStatus) { case BEGIN_STARTED: @@ -147,6 +170,21 @@ public class TransactionParticipant implements ITransactionParticipant } } + public void cleanupTransactions() + { + Collection<TransactionThread> threads; + + synchronized (threadMap) + { + threads = threadMap.values(); + } + + for (TransactionThread thread : threads) + { + thread.finishIfIdle(transactionTimeoutInSeconds); + } + } + @Override public void beginTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey, final String transactionCoordinatorKey) { @@ -263,7 +301,7 @@ public class TransactionParticipant implements ITransactionParticipant operationLog.info("Creating a new thread for transaction '" + transactionId + "'."); - thread = new TransactionThread(transactionId, interactiveSessionKey, transactionCoordinatorKey); + thread = new TransactionThread(transactionId, TransactionStatus.NEW, transactionCoordinatorKey); threadMap.put(transactionId, thread); thread.startThread(); } else @@ -314,12 +352,10 @@ public class TransactionParticipant implements ITransactionParticipant private Object transactionObject; - private TransactionStatus transactionStatus = TransactionStatus.NEW; + private TransactionStatus transactionStatus; private final UUID transactionId; - private final String interactiveSessionKey; - private final String transactionCoordinatorKey; private String sessionToken; @@ -332,10 +368,12 @@ public class TransactionParticipant implements ITransactionParticipant private Throwable operationException; - public TransactionThread(UUID transactionId, String interactiveSessionKey, String transactionCoordinatorKey) + private Date lastAccessedDate = new Date(); + + public TransactionThread(UUID transactionId, TransactionStatus initialTransactionStatus, String transactionCoordinatorKey) { this.transactionId = transactionId; - this.interactiveSessionKey = interactiveSessionKey; + this.transactionStatus = initialTransactionStatus; this.transactionCoordinatorKey = transactionCoordinatorKey; this.thread = new Thread(new Runnable() { @@ -492,6 +530,8 @@ public class TransactionParticipant implements ITransactionParticipant operationArguments = newOperationArguments; operationResult = null; operationException = null; + lastAccessedDate = new Date(); + lock.notifyAll(); operationLog.info("Transaction '" + transactionId + "' thread scheduled operation '" + newOperationName + "' execution."); @@ -504,7 +544,7 @@ public class TransactionParticipant implements ITransactionParticipant } catch (Exception e) { operationLog.error( - "Scheduling of transaction '" + operationName + "' operation '" + operationName + "Scheduling of transaction '" + transactionId + "' operation '" + operationName + "' execution in thread failed or got interrupted.", e); throw new RuntimeException(e); } @@ -530,14 +570,30 @@ public class TransactionParticipant implements ITransactionParticipant return threadFinished; } - public String getInteractiveSessionKey() + private void finishIfIdle(long timeoutInSeconds) { - return interactiveSessionKey; - } + if (transactionCoordinatorKey != null) + { + return; + } - public String getTransactionCoordinatorKey() - { - return transactionCoordinatorKey; + Date timeoutDate = new Date(lastAccessedDate.getTime() + timeoutInSeconds * 1000); + + if (timeoutDate.before(new Date())) + { + return; + } + + synchronized (lock) + { + if (!threadFinished) + { + operationLog.info("Transaction '" + transactionId + "' thread has been idle since + " + lastAccessedDate + + " which is longer than the configured transaction timeout of " + timeoutInSeconds + + " seconds. The transaction is not a two-phase-commit transaction therefore it will be rolled back."); + executeOperation(null, ROLLBACK_TRANSACTION_METHOD, null); + } + } } } diff --git a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionStatus.java b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionStatus.java index 5153e90fd36f91edca96bb93faf44db2687f7b2a..7da704dd98f2c090869ec9762e43371203259bde 100644 --- a/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionStatus.java +++ b/lib-commonbase/source/java/ch/ethz/sis/transaction/TransactionStatus.java @@ -8,9 +8,11 @@ public enum TransactionStatus PREPARE_STARTED(BEGIN_FINISHED), PREPARE_FINISHED(PREPARE_STARTED), COMMIT_STARTED(PREPARE_FINISHED), + COMMIT_FAILED(COMMIT_STARTED), COMMIT_FINISHED(COMMIT_STARTED), ROLLBACK_STARTED(BEGIN_STARTED, BEGIN_FINISHED, PREPARE_STARTED, PREPARE_FINISHED, COMMIT_STARTED), - ROLLBACK_FINISHED(ROLLBACK_STARTED); + ROLLBACK_FINISHED(ROLLBACK_STARTED), + ROLLBACK_FAILED(ROLLBACK_STARTED); private final TransactionStatus[] previousStatuses; diff --git a/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionCoordinatorTest.java b/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionCoordinatorTest.java index 4b2bad2a3c17906a53a3ef70de3c1304dbe63b61..9d2121b40f9f602ea9183dcd184197fa0bd6782b 100644 --- a/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionCoordinatorTest.java +++ b/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionCoordinatorTest.java @@ -44,6 +44,10 @@ public class TransactionCoordinatorTest public static final Object[] TEST_OPERATION_ARGUMENTS = new Object[] { 1, "abc" }; + public static final int TEST_TIMEOUT = 60; + + public static final int TEST_COUNT_LIMIT = 10; + private Mockery mockery; private ITransactionParticipant participant1; @@ -78,7 +82,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -106,7 +110,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Exception beginException = new RuntimeException(); Exception rollbackException = new RuntimeException(); @@ -132,6 +136,8 @@ public class TransactionCoordinatorTest // test that a failing rollback won't prevent other rollbacks from being called will(throwException(rollbackException)); one(participant2).rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); + + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.ROLLBACK_FAILED); } }); @@ -149,7 +155,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -184,7 +190,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Exception executeOperationException = new RuntimeException(); @@ -231,7 +237,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -271,7 +277,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -314,7 +320,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2, participant3), transactionLog); + List.of(participant1, participant2, participant3), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Exception prepareException = new RuntimeException(); Exception rollbackException = new RuntimeException(); @@ -351,6 +357,8 @@ public class TransactionCoordinatorTest will(throwException(rollbackException)); one(participant2).rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); one(participant3).rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); + + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.ROLLBACK_FAILED); } }); @@ -371,7 +379,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2, participant3), transactionLog); + List.of(participant1, participant2, participant3), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Exception commitException1 = new RuntimeException(); Exception commitException2 = new RuntimeException(); @@ -411,20 +419,13 @@ public class TransactionCoordinatorTest one(participant2).commitTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); will(throwException(commitException2)); one(participant3).commitTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); + + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.COMMIT_FAILED); } }); coordinator.beginTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); - - try - { - coordinator.commitTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); - Assert.fail(); - } catch (Exception e) - { - // first commit exception is exposed - assertEquals(e, commitException1); - } + coordinator.commitTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); } @Test @@ -432,7 +433,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -467,7 +468,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2, participant3), transactionLog); + List.of(participant1, participant2, participant3), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Exception rollbackException1 = new RuntimeException(); Exception rollbackException2 = new RuntimeException(); @@ -497,20 +498,13 @@ public class TransactionCoordinatorTest one(participant2).rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); will(throwException(rollbackException2)); one(participant3).rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); + + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.ROLLBACK_FAILED); } }); coordinator.beginTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); - - try - { - coordinator.rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); - fail(); - } catch (Exception e) - { - // first rollback exception is exposed - assertEquals(e, rollbackException1); - } + coordinator.rollbackTransaction(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, TEST_INTERACTIVE_SESSION_KEY); } @Test(dataProvider = "provideTestRestoreTransactionWithStatus") @@ -518,7 +512,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2, participant3), transactionLog); + List.of(participant1, participant2, participant3), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Map<UUID, TransactionStatus> lastStatuses = new HashMap<>(); lastStatuses.put(TEST_TRANSACTION_ID, transactionStatus); @@ -541,6 +535,7 @@ public class TransactionCoordinatorTest case BEGIN_FINISHED: case PREPARE_STARTED: case ROLLBACK_STARTED: + case ROLLBACK_FAILED: // only participant 1 and 2 know the transaction one(participant1).getTransactions(TEST_TRANSACTION_COORDINATOR_KEY); will(returnValue(Collections.singletonList(TEST_TRANSACTION_ID))); @@ -556,6 +551,7 @@ public class TransactionCoordinatorTest { // test that a failing rollback won't prevent other rollbacks from being called will(throwException(exception)); + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.ROLLBACK_FAILED); } one(participant2).rollbackTransaction(TEST_TRANSACTION_ID, TEST_TRANSACTION_COORDINATOR_KEY); @@ -567,6 +563,7 @@ public class TransactionCoordinatorTest break; case PREPARE_FINISHED: case COMMIT_STARTED: + case COMMIT_FAILED: // only participant 1 and 2 know the transaction one(participant1).getTransactions(TEST_TRANSACTION_COORDINATOR_KEY); will(returnValue(Collections.singletonList(TEST_TRANSACTION_ID))); @@ -582,6 +579,7 @@ public class TransactionCoordinatorTest { // test that a failing commit won't prevent other commits from being called will(throwException(exception)); + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.COMMIT_FAILED); } one(participant2).commitTransaction(TEST_TRANSACTION_ID, TEST_TRANSACTION_COORDINATOR_KEY); @@ -605,7 +603,7 @@ public class TransactionCoordinatorTest { TransactionCoordinator coordinator = new TransactionCoordinator(TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - List.of(participant1, participant2), transactionLog); + List.of(participant1, participant2), transactionLog, TEST_TIMEOUT, TEST_COUNT_LIMIT); Map<UUID, TransactionStatus> lastStatuses = new HashMap<>(); lastStatuses.put(TEST_TRANSACTION_ID, TransactionStatus.PREPARE_FINISHED); @@ -635,6 +633,7 @@ public class TransactionCoordinatorTest one(participant1).commitTransaction(TEST_TRANSACTION_ID, TEST_TRANSACTION_COORDINATOR_KEY); will(throwException(exception)); one(participant2).commitTransaction(TEST_TRANSACTION_ID, TEST_TRANSACTION_COORDINATOR_KEY); + one(transactionLog).logStatus(TEST_TRANSACTION_ID, TransactionStatus.COMMIT_FAILED); // restore transaction 2 (only participant 1) one(transactionLog).logStatus(TEST_TRANSACTION_ID_2, TransactionStatus.COMMIT_STARTED); diff --git a/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionParticipantTest.java b/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionParticipantTest.java index 001f90dd0201cf3a980d4fa8cc9c211b21294413..43768c60e1e735b9dd54c9c8a9717603c5678e7a 100644 --- a/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionParticipantTest.java +++ b/lib-commonbase/sourceTest/java/ch/ethz/sis/transaction/TransactionParticipantTest.java @@ -64,6 +64,8 @@ public class TransactionParticipantTest public static final Error TEST_ERROR = new Error("Test error"); + public static final int TEST_TRANSACTION_TIMEOUT = 60; + public static final int TEST_THREAD_COUNT_LIMIT = 5; private Mockery mockery; @@ -97,7 +99,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); MutableObject<String> transaction1BeginThreadName = new MutableObject<>(); MutableObject<String> transaction1PrepareThreadName = new MutableObject<>(); @@ -208,11 +210,11 @@ public class TransactionParticipantTest participant.beginTransaction(TEST_TRANSACTION_ID_2, TEST_SESSION_TOKEN, interactiveSessionKey, transactionCoordinatorKey); // execute 1 String transaction1OperationThreadName = - (String) participant.executeOperation(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, interactiveSessionKey, TEST_OPERATION_NAME, + participant.executeOperation(TEST_TRANSACTION_ID, TEST_SESSION_TOKEN, interactiveSessionKey, TEST_OPERATION_NAME, TEST_OPERATION_ARGUMENTS); // execute 2 String transaction2OperationThreadName = - (String) participant.executeOperation(TEST_TRANSACTION_ID_2, TEST_SESSION_TOKEN, interactiveSessionKey, TEST_OPERATION_NAME_2, + participant.executeOperation(TEST_TRANSACTION_ID_2, TEST_SESSION_TOKEN, interactiveSessionKey, TEST_OPERATION_NAME_2, TEST_OPERATION_ARGUMENTS_2); if (transactionCoordinatorKey != null) @@ -268,7 +270,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -312,7 +314,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -377,7 +379,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -430,7 +432,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -483,7 +485,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -580,7 +582,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -629,7 +631,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -733,7 +735,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -804,7 +806,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -925,7 +927,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -998,7 +1000,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1047,7 +1049,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1071,7 +1073,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1098,7 +1100,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1126,7 +1128,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1145,7 +1147,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1164,7 +1166,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1199,7 +1201,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1242,7 +1244,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1295,7 +1297,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1329,7 +1331,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1364,7 +1366,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1408,7 +1410,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1453,7 +1455,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1499,7 +1501,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1540,7 +1542,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { @@ -1581,7 +1583,7 @@ public class TransactionParticipantTest { TransactionParticipant participant = new TransactionParticipant(TEST_PARTICIPANT_ID, TEST_TRANSACTION_COORDINATOR_KEY, TEST_INTERACTIVE_SESSION_KEY, sessionTokenProvider, - databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_THREAD_COUNT_LIMIT); + databaseTransactionProvider, transactionOperationExecutor, transactionLog, TEST_TRANSACTION_TIMEOUT, TEST_THREAD_COUNT_LIMIT); mockery.checking(new Expectations() { diff --git a/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionCoordinatorApi.java b/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionCoordinatorApi.java index c74f16a26c8d360c1350b4baf7ee629fd0e5068a..6668832720c8932ccadeb6e8fdc529a5d149f905 100644 --- a/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionCoordinatorApi.java +++ b/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionCoordinatorApi.java @@ -42,7 +42,9 @@ public class TransactionCoordinatorApi implements ITransactionCoordinatorApi transactionConfiguration.getInteractiveSessionKey(), new ApplicationServerSessionTokenProvider(applicationServerApi), participants, - new TransactionLog(new File(transactionConfiguration.getTransactionLogFolderPath()), "coordinator")); + new TransactionLog(new File(transactionConfiguration.getTransactionLogFolderPath()), "coordinator"), + transactionConfiguration.getTransactionTimeoutInSeconds(), + transactionConfiguration.getTransactionCountLimit()); } @Override public void beginTransaction(final UUID transactionId, final String sessionToken, final String interactiveSessionKey) diff --git a/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionParticipantApi.java b/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionParticipantApi.java index ae24f7be50bcb33ba79784d1a59a1f4f464c1f75..3515a024dc6226e4e071257f990d9ab524a1a283 100644 --- a/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionParticipantApi.java +++ b/server-application-server/source/java/ch/ethz/sis/openbis/generic/server/asapi/v3/TransactionParticipantApi.java @@ -50,6 +50,7 @@ public class TransactionParticipantApi implements ITransactionParticipantApi new ApplicationServerDatabaseTransactionProvider(transactionManager, daoFactory, databaseContext), new ApplicationServerTransactionOperationExecutor(applicationServerApi), new TransactionLog(new File(transactionConfiguration.getTransactionLogFolderPath()), "application-server"), + transactionConfiguration.getTransactionTimeoutInSeconds(), transactionConfiguration.getTransactionCountLimit() ); } diff --git a/server-application-server/sourceTest/java/ch/ethz/sis/openbis/systemtest/asapi/v3/TransactionTest.java b/server-application-server/sourceTest/java/ch/ethz/sis/openbis/systemtest/asapi/v3/TransactionTest.java index 76958c1fe71b9a05cd5a8e5550c6392bdf3e2a24..3f8f2f72648847a37be8b8aafbc6c0eb0f7b8d51 100644 --- a/server-application-server/sourceTest/java/ch/ethz/sis/openbis/systemtest/asapi/v3/TransactionTest.java +++ b/server-application-server/sourceTest/java/ch/ethz/sis/openbis/systemtest/asapi/v3/TransactionTest.java @@ -62,7 +62,7 @@ public class TransactionTest extends AbstractTest { return v3api.isSessionActive(sessionToken); } - }, Collections.singletonList(participantApi), new TransactionLog(new File("targets/transaction-logs"), "coordinator")); + }, Collections.singletonList(participantApi), new TransactionLog(new File("targets/transaction-logs"), "coordinator"), 60, 10); } @Test @@ -206,8 +206,7 @@ public class TransactionTest extends AbstractTest fail(); } catch (Exception e) { - assertEquals(e.getMessage(), "java.lang.IllegalStateException: Transaction '" + tr1Id - + "' unexpected status 'NEW'. Expected statuses '[BEGIN_FINISHED]'."); + assertEquals(e.getMessage(), "Transaction '" + tr1Id + "' does not exist."); } // after tr1 commit, tr2 sees space1 and space2, noTr sees space1 and space3 @@ -228,8 +227,7 @@ public class TransactionTest extends AbstractTest fail(); } catch (Exception e) { - assertEquals(e.getMessage(), "java.lang.IllegalStateException: Transaction '" + tr2Id - + "' unexpected status 'NEW'. Expected statuses '[BEGIN_FINISHED]'."); + assertEquals(e.getMessage(), "Transaction '" + tr2Id + "' does not exist."); } // after tr1 commit and tr2 rollback, noTr sees space1 and space3