From fc3245d69a7408a7eb7cd502504f7f444a4771df Mon Sep 17 00:00:00 2001 From: juanf <juanf> Date: Mon, 22 Sep 2014 12:26:20 +0000 Subject: [PATCH] SSDM-351: Implement a queue for all asynchronous registartion from openbis UI (big refactoring) SVN: 32470 --- .../generic/server/AbstractServer.java | 46 ++-- .../web/server/GenericClientService.java | 223 +++++++++++------- .../web/server/queue/ConsumerQueue.java | 68 ++++-- .../client/web/server/queue/ConsumerTask.java | 35 ++- .../plugin/generic/server/GenericServer.java | 216 ----------------- .../generic/server/GenericServerLogger.java | 72 +----- .../plugin/generic/shared/IGenericServer.java | 68 ------ .../source/java/genericApplicationContext.xml | 4 + 8 files changed, 256 insertions(+), 476 deletions(-) diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/AbstractServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/AbstractServer.java index 925a65ede51..c43144f565e 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/AbstractServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/AbstractServer.java @@ -99,6 +99,9 @@ import ch.systemsx.cisd.openbis.generic.shared.util.ServerUtils; */ public abstract class AbstractServer<T> extends AbstractServiceWithLogger<T> implements IServer { + protected static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10, 360, + TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + private final static String ETL_SERVER_USERNAME_PREFIX = "etlserver"; protected static final class AuthenticatedPersonBasedPrincipalProvider implements @@ -978,25 +981,30 @@ public abstract class AbstractServer<T> extends AbstractServiceWithLogger<T> imp protected void executeASync(final String userEmail, final IASyncAction action) { final IMailClient mailClient = new MailClient(mailClientParameters); - StringWriter writer = new StringWriter(); - boolean success = true; - Date startDate = new Date(); - try - { - success = action.doAction(writer); - } catch (RuntimeException e) - { - operationLog.error("Asynchronous action '" + action.getName() + "' failed. ", e); - success = false; - } finally - { - try { - sendEmail(mailClient, writer.toString(), getSubject(action.getName(), startDate, success), userEmail); - } catch(Exception ex) { - operationLog.error("Asynchronous action '" + action.getName() + "' failed. ", ex); - } - - } + Runnable task = new Runnable() + { + @Override + public void run() + { + StringWriter writer = new StringWriter(); + boolean success = true; + Date startDate = new Date(); + try + { + success = action.doAction(writer); + } catch (RuntimeException e) + { + operationLog.error("Asynchronous action '" + action.getName() + + "' failed. ", e); + success = false; + } finally + { + sendEmail(mailClient, writer.toString(), + getSubject(action.getName(), startDate, success), userEmail); + } + } + }; + executor.submit(task); } protected void sendEmail(IMailClient mailClient, String content, String subject, diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/GenericClientService.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/GenericClientService.java index 4140ef0e3ca..283c15701fe 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/GenericClientService.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/GenericClientService.java @@ -16,6 +16,8 @@ package ch.systemsx.cisd.openbis.plugin.generic.client.web.server; +import java.io.IOException; +import java.io.Writer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -31,6 +33,7 @@ import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import ch.rinn.restrictions.Private; +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.servlet.IRequestContextProvider; import ch.systemsx.cisd.openbis.common.spring.IUncheckedMultipartFile; import ch.systemsx.cisd.openbis.generic.client.web.client.dto.DataSetUpdates; @@ -106,6 +109,9 @@ public class GenericClientService extends AbstractClientService implements IGene @Resource(name = ResourceNames.GENERIC_PLUGIN_SERVER) private IGenericServer genericServer; + @Resource(name = "registration-queue") + private ConsumerQueue asyncRegistrationQueue; + public GenericClientService() { } @@ -191,23 +197,29 @@ public class GenericClientService extends AbstractClientService implements IGene { final BatchOperationKind operationKind = updateExisting ? BatchOperationKind.UPDATE : BatchOperationKind.REGISTRATION; uploadedFiles = getUploadedFiles(sessionKey, httpSession); - BatchSamplesOperation info = parseSamples(sampleType, httpSession, uploadedFiles, defaultGroupIdentifier, isAutoGenerateCodes, true, null, operationKind, sessionToken); + BatchSamplesOperation info = parseSamples(sampleType, uploadedFiles, defaultGroupIdentifier, isAutoGenerateCodes, true, null, operationKind, sessionToken); if (async) { final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncSamplesTask = new ConsumerTask() { @Override - public String getTaskName() { return "Samples Registration Task"; } + public String getName() { return "Samples Registration Task"; } + + @Override + public String getUserEmail() { return userEmail; } @Override - public void executeTask() + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - BatchSamplesOperation asyncInfo = parseSamples(sampleType, httpSession, asyncUploadedFiles, defaultGroupIdentifier, isAutoGenerateCodes, true, null, operationKind, sessionToken); - //Execute task and clean files - genericServer.registerOrUpdateSamplesAsync(sessionToken, asyncInfo.getSamples(), userEmail); - cleanUploadedFiles(sessionKey, httpSession, asyncUploadedFiles); + try { + //Some stuff is repeated on the async executor, this is expected + BatchSamplesOperation asyncInfo = parseSamples(sampleType, asyncUploadedFiles, defaultGroupIdentifier, isAutoGenerateCodes, true, null, operationKind, sessionToken); + //Execute task and clean files + genericServer.registerOrUpdateSamples(sessionToken, asyncInfo.getSamples()); + } finally { + cleanUploadedFiles(sessionKey, httpSession, asyncUploadedFiles); + } } }; @@ -243,7 +255,7 @@ public class GenericClientService extends AbstractClientService implements IGene } } finally { if (async && (asyncSamplesTask != null)) { - ConsumerQueue.addTaskAsLast(asyncSamplesTask); + asyncRegistrationQueue.addTaskAsLast(asyncSamplesTask); } else { cleanUploadedFiles(sessionKey, httpSession, uploadedFiles); } @@ -265,23 +277,29 @@ public class GenericClientService extends AbstractClientService implements IGene try { uploadedFiles = getUploadedFiles(sessionKey, httpSession); - BatchSamplesOperation info = parseSamples(sampleType, httpSession, uploadedFiles, defaultGroupIdentifier, false, true, null, BatchOperationKind.UPDATE, sessionToken); + BatchSamplesOperation info = parseSamples(sampleType, uploadedFiles, defaultGroupIdentifier, false, true, null, BatchOperationKind.UPDATE, sessionToken); if (async) { final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncSamplesTask = new ConsumerTask() { @Override - public String getTaskName() { return "Samples Update Task"; } + public String getName() { return "Samples Update Task"; } @Override - public void executeTask() + public String getUserEmail() { return userEmail; } + + @Override + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - BatchSamplesOperation asyncInfo = parseSamples(sampleType, httpSession, asyncUploadedFiles, defaultGroupIdentifier, false, true, null, BatchOperationKind.UPDATE, sessionToken); - //Execute task and clean files - genericServer.updateSamplesAsync(sessionToken, asyncInfo.getSamples(), userEmail); - cleanUploadedFiles(sessionKey, httpSession, asyncUploadedFiles); + try { + //Some stuff is repeated on the async executor, this is expected + BatchSamplesOperation asyncInfo = parseSamples(sampleType, asyncUploadedFiles, defaultGroupIdentifier, false, true, null, BatchOperationKind.UPDATE, sessionToken); + //Execute task and clean files + genericServer.updateSamples(sessionToken, asyncInfo.getSamples()); + } finally { + cleanUploadedFiles(sessionKey, httpSession, asyncUploadedFiles); + } } }; @@ -297,7 +315,7 @@ public class GenericClientService extends AbstractClientService implements IGene throw UserFailureExceptionTranslator.translate(e); } finally { if (async && (asyncSamplesTask != null)) { - ConsumerQueue.addTaskAsLast(asyncSamplesTask); + asyncRegistrationQueue.addTaskAsLast(asyncSamplesTask); } else { cleanUploadedFiles(sessionKey, httpSession, uploadedFiles); } @@ -324,7 +342,7 @@ public class GenericClientService extends AbstractClientService implements IGene { uploadedFiles = getUploadedFiles(sessionKey, session); - BatchSamplesOperation samplesInfo = parseSamples(sampleType, session, uploadedFiles, defaultGroupIdentifier, defaultGroupIdentifier != null, true, "SAMPLES", operationKind, sessionToken); + BatchSamplesOperation samplesInfo = parseSamples(sampleType, uploadedFiles, defaultGroupIdentifier, defaultGroupIdentifier != null, true, "SAMPLES", operationKind, sessionToken); final MaterialType materialType = new MaterialType(); materialType.setCode(EntityType.DEFINED_IN_FILE); @@ -335,18 +353,23 @@ public class GenericClientService extends AbstractClientService implements IGene final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncGeneralTask = new ConsumerTask() { @Override - public String getTaskName() { return "General Batch Import Task"; } + public String getName() { return "General Batch Import Task"; } + + @Override + public String getUserEmail() { return userEmail; } @Override - public void executeTask() + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - BatchSamplesOperation asyncSamplesInfo = parseSamples(sampleType, session, asyncUploadedFiles, defaultGroupIdentifier, defaultGroupIdentifier != null, true, "SAMPLES", operationKind, sessionToken); - BatchMaterialsOperation asyncMaterialsInfo = parseMaterials(session, asyncUploadedFiles, materialType, "MATERIALS", updateExisting); - - //Execute task and clean files - genericServer.registerOrUpdateSamplesAndMaterialsAsync(sessionToken, asyncSamplesInfo.getSamples(), asyncMaterialsInfo.getMaterials(), userEmail); - cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); + try { + //Some stuff is repeated on the async executor, this is expected + BatchSamplesOperation asyncSamplesInfo = parseSamples(sampleType, asyncUploadedFiles, defaultGroupIdentifier, defaultGroupIdentifier != null, true, "SAMPLES", operationKind, sessionToken); + BatchMaterialsOperation asyncMaterialsInfo = parseMaterials(session, asyncUploadedFiles, materialType, "MATERIALS", updateExisting); + //Execute task and clean files + genericServer.registerOrUpdateSamplesAndMaterials(sessionToken, asyncSamplesInfo.getSamples(), asyncMaterialsInfo.getMaterials()); + } finally { + cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); + } } }; @@ -367,7 +390,7 @@ public class GenericClientService extends AbstractClientService implements IGene } finally { if (async && (asyncGeneralTask != null)) { - ConsumerQueue.addTaskAsLast(asyncGeneralTask); + asyncRegistrationQueue.addTaskAsLast(asyncGeneralTask); } else { cleanUploadedFiles(sessionKey, session, uploadedFiles); } @@ -423,8 +446,7 @@ public class GenericClientService extends AbstractClientService implements IGene return helper.exp; } - private BatchSamplesOperation parseSamples(final SampleType sampleType, - HttpSession httpSession, UploadedFilesBean uploadedFiles, + private BatchSamplesOperation parseSamples(final SampleType sampleType, UploadedFilesBean uploadedFiles, String defaultGroupIdentifier, final boolean isAutoGenerateCodes, final boolean allowExperiments, String excelSheetName, BatchOperationKind operationKind, String sessionToken) { @@ -460,7 +482,7 @@ public class GenericClientService extends AbstractClientService implements IGene try { uploadedFiles = getUploadedFiles(sessionKey, httpSession); - return parseSamples(sampleType, httpSession, uploadedFiles, defaultGroupIdentifier, isAutoGenerateCodes, allowExperiments, excelSheetName, operationKind, sessionToken); + return parseSamples(sampleType, uploadedFiles, defaultGroupIdentifier, isAutoGenerateCodes, allowExperiments, excelSheetName, operationKind, sessionToken); } finally { cleanUploadedFiles(sessionKey, httpSession, uploadedFiles); @@ -520,17 +542,23 @@ public class GenericClientService extends AbstractClientService implements IGene final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncMaterialTask = new ConsumerTask() { @Override - public String getTaskName() { return "Materials Registration Task"; } + public String getName() { return "Materials Registration Task"; } + + @Override + public String getUserEmail() { return userEmail; } @Override - public void executeTask() + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - BatchMaterialsOperation asyncResults = parseMaterials(session, asyncUploadedFiles, materialType, null, updateExisting); - List<NewMaterialsWithTypes> asyncMaterials = asyncResults.getMaterials(); - //Execute task and clean files - genericServer.registerOrUpdateMaterialsAsync(sessionToken, asyncMaterials, userEmail); - cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); + try { + //Some stuff is repeated on the async executor, this is expected + BatchMaterialsOperation asyncResults = parseMaterials(session, asyncUploadedFiles, materialType, null, updateExisting); + List<NewMaterialsWithTypes> asyncMaterials = asyncResults.getMaterials(); + //Execute task and clean files + genericServer.registerOrUpdateMaterials(sessionToken, asyncMaterials); + } finally { + cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); + } } }; @@ -542,7 +570,7 @@ public class GenericClientService extends AbstractClientService implements IGene } } finally { if (async && (asyncMaterialTask != null)) { - ConsumerQueue.addTaskAsLast(asyncMaterialTask); + asyncRegistrationQueue.addTaskAsLast(asyncMaterialTask); } else { cleanUploadedFiles(sessionKey, session, uploadedFiles); } @@ -572,16 +600,30 @@ public class GenericClientService extends AbstractClientService implements IGene final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncMaterialTask = new ConsumerTask() { @Override - public String getTaskName() { return "Materials Update Task"; } + public String getName() { return "Materials Update Task"; } + + @Override + public String getUserEmail() { return userEmail; } @Override - public void executeTask() + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - BatchMaterialsOperation asyncResults = parseMaterials(session, asyncUploadedFiles, materialType, null, true); - //Execute task and clean files - genericServer.updateMaterialsAsync(sessionToken, asyncResults.getMaterials(),ignoreUnregisteredMaterials, userEmail); - cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); + try + { + //Some stuff is repeated on the async executor, this is expected + BatchMaterialsOperation asyncResults = parseMaterials(session, asyncUploadedFiles, materialType, null, true); + //Execute task and clean files + int updateCount = genericServer.updateMaterials(sessionToken, asyncResults.getMaterials(), ignoreUnregisteredMaterials); + MaterialBatchUpdateResultMessage message = new MaterialBatchUpdateResultMessage(asyncResults.getMaterials(), updateCount, ignoreUnregisteredMaterials); + writer.write(message.toString()); + } catch (IOException e) + { + CheckedExceptionTunnel.wrapIfNecessary(e); + } finally { + cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); + } + + } }; @@ -595,7 +637,7 @@ public class GenericClientService extends AbstractClientService implements IGene } } finally { if (async && (asyncMaterialTask != null)) { - ConsumerQueue.addTaskAsLast(asyncMaterialTask); + asyncRegistrationQueue.addTaskAsLast(asyncMaterialTask); } else { cleanUploadedFiles(sessionKey, session, uploadedFiles); } @@ -631,24 +673,30 @@ public class GenericClientService extends AbstractClientService implements IGene final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncExperimentTask = new ConsumerTask() { @Override - public String getTaskName() { return "Experiments Update Task"; } + public String getName() { return "Experiments Update Task"; } @Override - public void executeTask() + public String getUserEmail() { return userEmail; } + + @Override + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - final Collection<NamedInputStream> asyncFiles = new ArrayList<NamedInputStream>(asyncUploadedFiles.size()); - for (IUncheckedMultipartFile f : asyncUploadedFiles.iterable()) - { - asyncFiles.add(new NamedInputStream(f.getInputStream(), f.getOriginalFilename())); + try { + //Some stuff is repeated on the async executor, this is expected + final Collection<NamedInputStream> asyncFiles = new ArrayList<NamedInputStream>(asyncUploadedFiles.size()); + for (IUncheckedMultipartFile f : asyncUploadedFiles.iterable()) + { + asyncFiles.add(new NamedInputStream(f.getInputStream(), f.getOriginalFilename())); + } + UpdatedExperimentLoader loaderAsync = new UpdatedExperimentLoader(); + loaderAsync.load(asyncFiles); + applyDefaultSpaceProjectToExperiments(loaderAsync.getNewBasicExperiments(), sessionToken); + final UpdatedExperimentsWithType updatedExperimentsAsync = new UpdatedExperimentsWithType(experimentType, loaderAsync.getNewBasicExperiments()); + //Execute task and clean files + genericServer.updateExperiments(sessionToken, updatedExperimentsAsync); + } finally { + cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); } - UpdatedExperimentLoader loaderAsync = new UpdatedExperimentLoader(); - loaderAsync.load(asyncFiles); - applyDefaultSpaceProjectToExperiments(loaderAsync.getNewBasicExperiments(), sessionToken); - final UpdatedExperimentsWithType updatedExperimentsAsync = new UpdatedExperimentsWithType(experimentType, loaderAsync.getNewBasicExperiments()); - //Execute task and clean files - genericServer.updateExperimentsAsync(sessionToken, updatedExperimentsAsync, userEmail); - cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); } }; @@ -663,7 +711,7 @@ public class GenericClientService extends AbstractClientService implements IGene } finally { if (async && (asyncExperimentTask != null)) { - ConsumerQueue.addTaskAsLast(asyncExperimentTask); + asyncRegistrationQueue.addTaskAsLast(asyncExperimentTask); } else { cleanUploadedFiles(sessionKey, session, uploadedFiles); } @@ -702,16 +750,19 @@ public class GenericClientService extends AbstractClientService implements IGene final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncExperimentTask = new ConsumerTask() { @Override - public String getTaskName() { return "Experiments Registration Task"; } + public String getName() { return "Experiments Registration Task"; } @Override - public void executeTask() + public String getUserEmail() { return userEmail; } + + @Override + public void doActionOrThrowException(Writer writer) { ExperimentLoader asyncLoader = getExperimentsFromFiles(asyncUploadedFiles); applyDefaultSpaceProjectToExperiments(asyncLoader.getNewBasicExperiments(), sessionToken); NewExperimentsWithType newExperiments = new NewExperimentsWithType(experimentType.getCode(), asyncLoader.getNewBasicExperiments()); //Execute task and clean files - genericServer.registerExperimentsAsync(sessionToken, newExperiments, userEmail); + genericServer.registerExperiments(sessionToken, newExperiments); cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); } }; @@ -726,7 +777,7 @@ public class GenericClientService extends AbstractClientService implements IGene } finally { if (async && (asyncExperimentTask != null)) { - ConsumerQueue.addTaskAsLast(asyncExperimentTask); + asyncRegistrationQueue.addTaskAsLast(asyncExperimentTask); } else { cleanUploadedFiles(sessionKey, session, uploadedFiles); } @@ -1019,24 +1070,30 @@ public class GenericClientService extends AbstractClientService implements IGene final UploadedFilesBean asyncUploadedFiles = uploadedFiles; asyncDatasetsTask = new ConsumerTask() { @Override - public String getTaskName() { return "Datasets Registration Task"; } + public String getName() { return "Datasets Registration Task"; } + + @Override + public String getUserEmail() { return userEmail; } @Override - public void executeTask() + public void doActionOrThrowException(Writer writer) { - //Some stuff is repeated on the async executor, this is expected - Collection<NamedInputStream> asyncFiles = new ArrayList<NamedInputStream>(asyncUploadedFiles.size()); - for (IUncheckedMultipartFile f : asyncUploadedFiles.iterable()) - { - asyncFiles.add(new NamedInputStream(f.getInputStream(), f.getOriginalFilename())); + try { + //Some stuff is repeated on the async executor, this is expected + Collection<NamedInputStream> asyncFiles = new ArrayList<NamedInputStream>(asyncUploadedFiles.size()); + for (IUncheckedMultipartFile f : asyncUploadedFiles.iterable()) + { + asyncFiles.add(new NamedInputStream(f.getInputStream(), f.getOriginalFilename())); + } + DataSetLoader asyncLoader = new DataSetLoader(); + asyncLoader.load(asyncFiles); + NewDataSetsWithTypes asyncNewDataSetsWithTypes = new NewDataSetsWithTypes(dataSetType, asyncLoader.getNewDataSets()); + + //Execute task and clean files + genericServer.updateDataSets(sessionToken, asyncNewDataSetsWithTypes); + } finally { + cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); } - DataSetLoader asyncLoader = new DataSetLoader(); - asyncLoader.load(asyncFiles); - NewDataSetsWithTypes asyncNewDataSetsWithTypes = new NewDataSetsWithTypes(dataSetType, asyncLoader.getNewDataSets()); - - //Execute task and clean files - genericServer.updateDataSetsAsync(sessionToken, asyncNewDataSetsWithTypes, userEmail); - cleanUploadedFiles(sessionKey, session, asyncUploadedFiles); } }; String fileName = loader.getResults().get(0).getFileName(); @@ -1049,7 +1106,7 @@ public class GenericClientService extends AbstractClientService implements IGene } finally { if (async && (asyncDatasetsTask != null)) { - ConsumerQueue.addTaskAsLast(asyncDatasetsTask); + asyncRegistrationQueue.addTaskAsLast(asyncDatasetsTask); } else { cleanUploadedFiles(sessionKey, session, uploadedFiles); } diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerQueue.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerQueue.java index 9c5f65f8e2f..a9e16f6e4ec 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerQueue.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerQueue.java @@ -1,10 +1,15 @@ package ch.systemsx.cisd.openbis.plugin.generic.client.web.server.queue; +import java.io.StringWriter; +import java.util.Date; import java.util.Deque; import java.util.LinkedList; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.mail.IMailClient; +import ch.systemsx.cisd.common.mail.MailClient; +import ch.systemsx.cisd.common.mail.MailClientParameters; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -14,46 +19,77 @@ import org.apache.log4j.Logger; */ public final class ConsumerQueue { - private static final Deque<ConsumerTask> consumerQueue = new LinkedList<ConsumerTask>(); - private static final Logger trackingLog = LogFactory.getLogger(LogCategory.TRACKING, ConsumerQueue.class); + public ConsumerQueue(MailClientParameters mailClientParameters) { + this.mailClientParameters = mailClientParameters; + } - public static final synchronized void addTaskAsLast(ConsumerTask task) { + public final synchronized void addTaskAsLast(ConsumerTask task) { consumerQueue.addLast(task); } - private static final synchronized ConsumerTask getNextTask() { + private final synchronized ConsumerTask getNextTask() { return consumerQueue.pollFirst(); } + private final Deque<ConsumerTask> consumerQueue = new LinkedList<ConsumerTask>(); + private final Logger trackingLog = LogFactory.getLogger(LogCategory.TRACKING, ConsumerQueue.class); + private final MailClientParameters mailClientParameters; + // Consumer Thread - static { + { Thread consumerThread = new Thread() { @Override public void run() { while(true) { - //We start with a null task for save coding - ConsumerTask consumerTask = new ConsumerTask() { - @Override - public String getTaskName() { return "Null Task"; } - - @Override - public void executeTask() {} - }; - + StringWriter writer = new StringWriter(); + boolean success = true; + Date startDate = new Date(); + ConsumerTask consumerTask = null; try { consumerTask = getNextTask(); if(consumerTask != null) { - consumerTask.executeTask(); + success = consumerTask.doAction(writer); } else { Thread.sleep(1000 * 5); } } catch(Throwable anyError) { - trackingLog.log(Level.ERROR, consumerTask.getTaskName(), anyError); + trackingLog.log(Level.ERROR, consumerTask.getUserEmail(), anyError); + success = false; + } finally { + if(consumerTask != null) { + try { + final IMailClient mailClient = new MailClient(mailClientParameters); + sendEmail(mailClient, writer.toString(), getSubject(consumerTask.getName(), startDate, success), consumerTask.getUserEmail()); + } catch(Throwable anyErrorOnMail) { + trackingLog.log(Level.ERROR, consumerTask.getUserEmail(), anyErrorOnMail); + } + } } } } }; consumerThread.start(); } + + // + // Mail management + // + private void sendEmail(IMailClient mailClient, String content, String subject, + String... recipient) + { + mailClient.sendMessage(subject, content, null, null, recipient); + } + + private String getSubject(String actionName, Date startDate, boolean success) + { + return addDate(actionName + " " + (success ? "successfully performed" : "failed"), + startDate); + } + + private String addDate(String subject, Date startDate) + { + return subject + " (initiated at " + startDate + ")"; + } } + diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerTask.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerTask.java index f5e4d19177f..e4be5b0e0ec 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerTask.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/client/web/server/queue/ConsumerTask.java @@ -1,7 +1,36 @@ package ch.systemsx.cisd.openbis.plugin.generic.client.web.server.queue; -public interface ConsumerTask +import java.io.IOException; +import java.io.Writer; + +import ch.systemsx.cisd.common.exceptions.UserFailureException; + +public abstract class ConsumerTask { - String getTaskName(); - void executeTask(); + public abstract String getName(); + public abstract String getUserEmail(); + public abstract void doActionOrThrowException(Writer writer); + + public boolean doAction(Writer messageWriter) + { + try + { + doActionOrThrowException(messageWriter); + } catch (RuntimeException ex) + { + try + { + messageWriter.write(getName() + " has failed with a following exception: "); + messageWriter.write(ex.getMessage()); + messageWriter.write("\n\nPlease correct the error or contact your administrator."); + } catch (IOException writingEx) + { + throw new UserFailureException(writingEx.getMessage() + + " when trying to throw exception: " + ex.getMessage(), ex); + } + throw ex; + } + return true; + + } } diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServer.java index e1c2054377b..be6fd5edd78 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServer.java @@ -468,31 +468,6 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen } } - @Override - @RolesAllowed(RoleWithHierarchy.SPACE_USER) - @Capability("WRITE_SAMPLE") - public void updateSamplesAsync(final String sessionToken, @AuthorizationGuard(guardClass = NewSamplesWithTypePredicate.class) - final List<NewSamplesWithTypes> newSamplesWithType, final String userEmail) throws UserFailureException - { - assert sessionToken != null : "Unspecified session token."; - checkSession(sessionToken); - - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Sample Batch Update"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.updateSamples(sessionToken, newSamplesWithType); - } - }); - } - @Override @RolesAllowed(RoleWithHierarchy.SPACE_POWER_USER) @Capability("WRITE_DATASET") @@ -523,31 +498,6 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen convertDataSets(newDataSets)); } - @Override - @RolesAllowed(RoleWithHierarchy.SPACE_POWER_USER) - @Capability("WRITE_DATASET") - public void updateDataSetsAsync(final String sessionToken, @AuthorizationGuard(guardClass = NewDataSetsWithTypePredicate.class) - final NewDataSetsWithTypes dataSets, final String userEmail) throws UserFailureException - { - assert sessionToken != null : "Unspecified session token."; - checkSession(sessionToken); - - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Data Set Batch Update"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.updateDataSets(sessionToken, dataSets); - } - }); - } - private void updateSamples(final Session session, final NewSamplesWithTypes updatedSamplesWithType) { @@ -765,45 +715,6 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen return count; } - @Override - @RolesAllowed(RoleWithHierarchy.INSTANCE_ADMIN) - @Capability("WRITE_MATERIAL") - public void updateMaterialsAsync(final String sessionToken, - final List<NewMaterialsWithTypes> newMaterials, - final boolean ignoreUnregisteredMaterials, final String userEmail) - throws UserFailureException - { - assert sessionToken != null : "Unspecified session token."; - checkSession(sessionToken); - - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Material Batch Update"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - try - { - int updateCount = - genericServer.updateMaterials(sessionToken, newMaterials, - ignoreUnregisteredMaterials); - MaterialBatchUpdateResultMessage message = - new MaterialBatchUpdateResultMessage(newMaterials, updateCount, - ignoreUnregisteredMaterials); - messageWriter.write(message.toString()); - } catch (IOException e) - { - CheckedExceptionTunnel.wrapIfNecessary(e); - } - } - }); - } - @Override @RolesAllowed(RoleWithHierarchy.SPACE_OBSERVER) public AttachmentWithContent getProjectFileAttachment(String sessionToken, @@ -913,32 +824,6 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen } } - @Override - @RolesAllowed(RoleWithHierarchy.INSTANCE_ADMIN) - @Capability("WRITE_MATERIAL") - public void registerOrUpdateMaterialsAsync(final String sessionToken, - final List<NewMaterialsWithTypes> materials, final String userEmail) - throws UserFailureException - { - assert sessionToken != null : "Unspecified session token."; - checkSession(sessionToken); - - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Material Batch Registration"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.registerOrUpdateMaterials(sessionToken, materials); - } - }); - } - @Override @RolesAllowed(RoleWithHierarchy.SPACE_USER) @Capability("WRITE_EXPERIMENT_SAMPLE") @@ -973,32 +858,6 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen experimentTypePE)); } - @Override - @RolesAllowed(RoleWithHierarchy.SPACE_USER) - @Capability("WRITE_EXPERIMENT_SAMPLE") - public void registerExperimentsAsync(final String sessionToken, @AuthorizationGuard(guardClass = NewExperimentsWithTypePredicate.class) - final NewExperimentsWithType experiments, String userEmail) - throws UserFailureException - { - assert sessionToken != null : "Unspecified session token."; - checkSession(sessionToken); - - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Experiment Batch Registration"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.registerExperiments(sessionToken, experiments); - } - }); - } - /** * @param sessionToken The session token for the request * @param experiments Should be a NewExperimentsWithType where the newExperiments contains a collection of {@link UpdatedBasicExperiment} objects. @@ -1037,32 +896,6 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen experimentTypePE)); } - @Override - @RolesAllowed(RoleWithHierarchy.SPACE_USER) - @Capability("WRITE_EXPERIMENT_SAMPLE") - public void updateExperimentsAsync(final String sessionToken, @AuthorizationGuard(guardClass = UpdatedExperimentsWithTypePredicate.class) - final UpdatedExperimentsWithType experiments, final String userEmail) - throws UserFailureException - { - assert sessionToken != null : "Unspecified session token."; - checkSession(sessionToken); - - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Experiment Batch Update"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.updateExperiments(sessionToken, experiments); - } - }); - } - /** * @param updatedExperiments The experiments should actually be instances of UpdatedBasicExperiment. */ @@ -1146,53 +979,4 @@ public final class GenericServer extends AbstractServer<IGenericServer> implemen privateRegisterOrUpdateSamples(sessionToken, newSamplesWithType); } - @Override - @RolesAllowed(RoleWithHierarchy.INSTANCE_ADMIN) - @Capability("WRITE_EXPERIMENT_SAMPLE_MATERIAL") - public void registerOrUpdateSamplesAndMaterialsAsync(final String sessionToken, - final List<NewSamplesWithTypes> newSamplesWithType, - final List<NewMaterialsWithTypes> newMaterialsWithType, String userEmail) - throws UserFailureException - { - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "General Batch Import"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.registerOrUpdateSamplesAndMaterials(sessionToken, - newSamplesWithType, newMaterialsWithType); - } - }); - } - - @Override - @RolesAllowed(RoleWithHierarchy.SPACE_USER) - @Capability("WRITE_EXPERIMENT_SAMPLE_MATERIAL") - public void registerOrUpdateSamplesAsync(final String sessionToken, - @AuthorizationGuard(guardClass = NewSamplesWithTypePredicate.class) - final List<NewSamplesWithTypes> newSamplesWithType, String userEmail) - throws UserFailureException - { - executeASync(userEmail, new AbstractASyncAction() - { - @Override - public String getName() - { - return "Sample Batch Registration"; - } - - @Override - protected void doActionOrThrowException(Writer messageWriter) - { - genericServer.registerOrUpdateSamples(sessionToken, newSamplesWithType); - } - }); - } - } diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServerLogger.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServerLogger.java index 3bc2bcbb19d..94a841749d1 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServerLogger.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/server/GenericServerLogger.java @@ -130,15 +130,6 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric return 0; } - @Override - public void updateMaterialsAsync(String sessionToken, List<NewMaterialsWithTypes> newMaterials, - boolean ignoreUnregisteredMaterials, String userEmail) throws UserFailureException - { - logTracking(sessionToken, "update_materials_async", - "MATERIALS(%S) IGNORE_UNREGISTERED_MATERIALS(%S) USER_EMAIL(%S)", - getMaterials(newMaterials), ignoreUnregisteredMaterials, userEmail); - } - @Override public AttachmentWithContent getExperimentFileAttachment(final String sessionToken, final TechId experimentId, final String filename, final Integer versionOrNull) @@ -227,13 +218,6 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric logTracking(sessionToken, "update_samples", "SAMPLES(%s)", getSamples(updatedSamplesWithType)); } - @Override - public void updateSamplesAsync(String sessionToken, List<NewSamplesWithTypes> updatedSamplesWithType, String userEmail) - throws UserFailureException - { - logTracking(sessionToken, "update_samples_async", "SAMPLES(%s) EMAIL(%s)", getSamples(updatedSamplesWithType), userEmail); - } - @Override public void registerOrUpdateMaterials(String sessionToken, List<NewMaterialsWithTypes> materials) { @@ -245,19 +229,6 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric } } - @Override - public void registerOrUpdateMaterialsAsync(String sessionToken, - List<NewMaterialsWithTypes> materials, String userEmail) - { - for (NewMaterialsWithTypes materialsWithType : materials) - { - logTracking(sessionToken, "registerOrUpdateMaterialsAsync", - "type(%s) numberOfMaterials(%s) userEmail(%s)", materialsWithType - .getEntityType().getCode(), materialsWithType.getNewEntities().size(), - userEmail); - } - } - @Override public void registerOrUpdateSamples(String sessionToken, List<NewSamplesWithTypes> newSamplesWithType) throws UserFailureException @@ -273,13 +244,6 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric dataSets.getDataSetType().getCode(), dataSets.getNewDataSets().size()); } - @Override - public void updateDataSetsAsync(String sessionToken, NewDataSetsWithTypes dataSets, String userEmail) throws UserFailureException - { - logTracking(sessionToken, "update_data_sets", "TYPE(%s) DATA_SETS(%s) EMAIL(%s)", - dataSets.getDataSetType().getCode(), dataSets.getNewDataSets().size(), userEmail); - } - @Override public void registerExperiments(String sessionToken, NewExperimentsWithType experiments) throws UserFailureException @@ -288,13 +252,6 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric experiments.getExperimentTypeCode(), experiments.getNewExperiments().size()); } - @Override - public void registerExperimentsAsync(String sessionToken, NewExperimentsWithType experiments, String userEmail) throws UserFailureException - { - logTracking(sessionToken, "register_experiments_async", "TYPE(%s) EXPERIMENTS(%s) EMAIL(%s)", - experiments.getExperimentTypeCode(), experiments.getNewExperiments().size(), userEmail); - } - @Override public void updateExperiments(String sessionToken, UpdatedExperimentsWithType experiments) throws UserFailureException @@ -302,14 +259,7 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric logTracking(sessionToken, "update_experiments", "TYPE(%s) EXPERIMENTS(%s)", experiments .getExperimentType().getCode(), experiments.getUpdatedExperiments().size()); } - - @Override - public void updateExperimentsAsync(String sessionToken, UpdatedExperimentsWithType experiments, String userEmail) throws UserFailureException - { - logTracking(sessionToken, "update_experiments_async", "TYPE(%s) EXPERIMENTS(%s) EMAIL(%s)", experiments - .getExperimentType().getCode(), experiments.getUpdatedExperiments().size(), userEmail); - } - + @Override public void registerOrUpdateSamplesAndMaterials(final String sessionToken, final List<NewSamplesWithTypes> newSamplesWithType, @@ -321,26 +271,6 @@ final class GenericServerLogger extends AbstractServerLogger implements IGeneric getMaterials(newMaterialsWithType)); } - @Override - public void registerOrUpdateSamplesAndMaterialsAsync(final String sessionToken, - final List<NewSamplesWithTypes> newSamplesWithType, - final List<NewMaterialsWithTypes> newMaterialsWithType, String userEmail) - throws UserFailureException - { - logTracking(sessionToken, "register_or_update_samples_and_materials_async", - "SAMPLES(%s) MATERIALS(%s) EMAIL(%s)", getSamples(newSamplesWithType), - getMaterials(newMaterialsWithType), userEmail); - } - - @Override - public void registerOrUpdateSamplesAsync(String sessionToken, - List<NewSamplesWithTypes> newSamplesWithType, String userEmail) - throws UserFailureException - { - logTracking(sessionToken, "register_or_update_samples_async", "SAMPLES(%s) EMAIL(%s)", - getSamples(newSamplesWithType), userEmail); - } - private static String getSamples(final List<NewSamplesWithTypes> newSamplesWithType) { StringBuilder samples = new StringBuilder(); diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/shared/IGenericServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/shared/IGenericServer.java index 64fc0332aec..8ec947826e1 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/shared/IGenericServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/generic/shared/IGenericServer.java @@ -110,15 +110,6 @@ public interface IGenericServer extends IServer public void registerOrUpdateSamples(final String sessionToken, final List<NewSamplesWithTypes> newSamplesWithType) throws UserFailureException; - /** - * Asynchronously registers or updates samples of different types in batches. - */ - @Transactional - @DatabaseCreateOrDeleteModification(value = ObjectKind.SAMPLE) - public void registerOrUpdateSamplesAsync(final String sessionToken, - final List<NewSamplesWithTypes> newSamplesWithType, String userEmail) - throws UserFailureException; - /** * Registers or updates samples and materials of different types in batches. */ @@ -129,17 +120,6 @@ public interface IGenericServer extends IServer final List<NewSamplesWithTypes> newSamplesWithType, List<NewMaterialsWithTypes> newMaterialsWithType) throws UserFailureException; - /** - * Asynchronously registers or updates samples and materials of different types in batches. - */ - @Transactional - @DatabaseCreateOrDeleteModification(value = - { ObjectKind.SAMPLE, ObjectKind.MATERIAL }) - public void registerOrUpdateSamplesAndMaterialsAsync(final String sessionToken, - final List<NewSamplesWithTypes> newSamplesWithType, - final List<NewMaterialsWithTypes> newMaterialsWithType, String userEmail) - throws UserFailureException; - /** * Updates samples of different types in batches. */ @@ -148,14 +128,6 @@ public interface IGenericServer extends IServer public void updateSamples(final String sessionToken, final List<NewSamplesWithTypes> newSamplesWithType) throws UserFailureException; - /** - * Asynchronously updates samples of different types in batches. - */ - @Transactional - @DatabaseUpdateModification(value = ObjectKind.SAMPLE) - public void updateSamplesAsync(final String sessionToken, - final List<NewSamplesWithTypes> newSamplesWithType, String userEmail) throws UserFailureException; - /** * Registers experiment. At the same time samples may be registered or updated. */ @@ -174,14 +146,6 @@ public interface IGenericServer extends IServer public void registerExperiments(String sessionToken, final NewExperimentsWithType experiments) throws UserFailureException; - /** - * Asynchronously registers experiments in batch. - */ - @Transactional - @DatabaseCreateOrDeleteModification(value = ObjectKind.EXPERIMENT) - public void registerExperimentsAsync(String sessionToken, final NewExperimentsWithType experiments, String userEmail) - throws UserFailureException; - /** * Update experiments in batch. */ @@ -190,14 +154,6 @@ public interface IGenericServer extends IServer public void updateExperiments(String sessionToken, final UpdatedExperimentsWithType experiments) throws UserFailureException; - /** - * Asynchronously update experiments in batch. - */ - @Transactional - @DatabaseUpdateModification(value = ObjectKind.EXPERIMENT) - public void updateExperimentsAsync(String sessionToken, final UpdatedExperimentsWithType experiments, String userEmail) - throws UserFailureException; - /** * Registers materials in batch. */ @@ -216,14 +172,6 @@ public interface IGenericServer extends IServer public int updateMaterials(String sessionToken, List<NewMaterialsWithTypes> newMaterials, boolean ignoreUnregisteredMaterials) throws UserFailureException; - /** - * Asynchronously updates materials in batch. - */ - @Transactional - @DatabaseCreateOrDeleteModification(value = ObjectKind.MATERIAL) - public void updateMaterialsAsync(String sessionToken, List<NewMaterialsWithTypes> newMaterials, - boolean ignoreUnregisteredMaterials, String userEmail) throws UserFailureException; - /** * Registers new materials or if they exist updates in batch their properties (properties which are not mentioned stay unchanged). */ @@ -232,14 +180,6 @@ public interface IGenericServer extends IServer public void registerOrUpdateMaterials(String sessionToken, List<NewMaterialsWithTypes> materials) throws UserFailureException; - /** - * Asynchronously registers new materials or if they exist updates in batch their properties (properties which are not mentioned stay unchanged). - */ - @Transactional - @DatabaseCreateOrDeleteModification(value = ObjectKind.MATERIAL) - public void registerOrUpdateMaterialsAsync(String sessionToken, - List<NewMaterialsWithTypes> materials, String userEmail) throws UserFailureException; - /** * Returns attachment described by given sample identifier, filename and version. */ @@ -299,12 +239,4 @@ public interface IGenericServer extends IServer public void updateDataSets(final String sessionToken, final NewDataSetsWithTypes dataSets) throws UserFailureException; - /** - * Asynchronously updates data sets of different types in batches. - */ - @Transactional - @DatabaseUpdateModification(value = ObjectKind.DATA_SET) - public void updateDataSetsAsync(final String sessionToken, final NewDataSetsWithTypes dataSets, String userEmail) - throws UserFailureException; - } diff --git a/openbis/source/java/genericApplicationContext.xml b/openbis/source/java/genericApplicationContext.xml index 367fce924c9..a9a18ac6481 100644 --- a/openbis/source/java/genericApplicationContext.xml +++ b/openbis/source/java/genericApplicationContext.xml @@ -198,6 +198,10 @@ <property name="maxResults" value="${hibernate.search.maxResults}" /> </bean> + <bean id="registration-queue" class="ch.systemsx.cisd.openbis.plugin.generic.client.web.server.queue.ConsumerQueue"> + <constructor-arg ref="mail-client-parameters" /> + </bean> + <!-- // HTML Escaping --> -- GitLab