Skip to content
Snippets Groups Projects
Commit d39e5b86 authored by felmer's avatar felmer
Browse files

SSDM-8600: Introducing queue for clean-up tasks. No long run cleanupSession() asynchroneously

parent ab56d094
No related branches found
No related tags found
No related merge requests found
......@@ -16,10 +16,14 @@
package ch.systemsx.cisd.openbis.generic.server;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import ch.systemsx.cisd.authentication.ISessionFactory;
import ch.systemsx.cisd.authentication.Principal;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.logging.LogLevel;
......@@ -39,6 +43,8 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.Session.ISessionCleaner;
*/
public final class SessionFactory implements ISessionFactory<Session>
{
private final static BlockingQueue<CleanUpTask> sessionsToBeClosedQueue = new LinkedBlockingQueue<>();
private final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION,
SessionFactory.class);
......@@ -48,6 +54,33 @@ public final class SessionFactory implements ISessionFactory<Session>
private final ISessionWorkspaceProvider sessionWorkspaceProvider;
static
{
Thread thread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
while (true)
{
CleanUpTask task = sessionsToBeClosedQueue.take();
task.execute();
}
} catch (InterruptedException ex)
{
// Exit thread.
} catch (InterruptedExceptionUnchecked ex)
{
// Exit thread.
}
}
}, "Session clean up queue");
thread.setDaemon(true);
thread.start();
}
public SessionFactory()
{
this(null, null, null);
......@@ -95,7 +128,7 @@ public final class SessionFactory implements ISessionFactory<Session>
final String remoteUrl = datastore.getRemoteUrl();
if (StringUtils.isBlank(remoteUrl) == false)
{
dssFactory.createMonitored(remoteUrl, LogLevel.WARN).cleanupSession(sessionToken);
sessionsToBeClosedQueue.add(new CleanUpTask(dssFactory, remoteUrl, sessionToken));
} else
{
operationLog.warn("datastore remoteUrl of datastore " + datastore.getCode()
......@@ -103,4 +136,34 @@ public final class SessionFactory implements ISessionFactory<Session>
}
}
}
private static final class CleanUpTask
{
private IDataStoreServiceFactory dssFactory;
private String remoteUrl;
private String sessionToken;
CleanUpTask(IDataStoreServiceFactory dssFactory, String remoteUrl, String sessionToken)
{
this.dssFactory = dssFactory;
this.remoteUrl = remoteUrl;
this.sessionToken = sessionToken;
}
void execute()
{
try
{
operationLog.info("Execute clean-up task for session " + sessionToken + " on DSS " + remoteUrl);
dssFactory.createMonitored(remoteUrl, LogLevel.WARN).cleanupSession(sessionToken);
operationLog.info("Clean-up task for session " + sessionToken + " finished on DSS " + remoteUrl);
} catch (Exception e)
{
operationLog.error("Clean-up task for session " + sessionToken + " failed on DSS " + remoteUrl
+ ". Reason: " + e, e);
}
}
}
}
......@@ -20,14 +20,12 @@ import static ch.systemsx.cisd.openbis.generic.shared.basic.GenericSharedConstan
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.log4j.Logger;
import org.springframework.remoting.RemoteAccessException;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor;
import ch.systemsx.cisd.common.concurrent.MonitoringProxy;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
import ch.systemsx.cisd.common.logging.LogCategory;
......@@ -44,17 +42,12 @@ import ch.systemsx.cisd.openbis.generic.shared.IDataStoreService;
*/
public class DataStoreServiceFactory implements IDataStoreServiceFactory
{
private final static int NUMBER_OF_CORE_THREADS = 10;
private final Map<String, IDataStoreService> services =
new HashMap<String, IDataStoreService>();
private final static Logger machineLog = LogFactory.getLogger(LogCategory.MACHINE,
IDataStoreService.class);
private final static ExecutorService executorService = new NamingThreadPoolExecutor(
"Monitoring Proxy").corePoolSize(NUMBER_OF_CORE_THREADS).daemonize();
@Override
public IDataStoreService create(String serverURL)
{
......@@ -87,14 +80,8 @@ public class DataStoreServiceFactory implements IDataStoreServiceFactory
.logLevelForNotSuccessfulCalls(LogLevel.WARN)
.timing(TimingParameters.create(-1L, 5, DateUtils.MILLIS_PER_MINUTE))
.exceptionClassSuitableForRetrying(RemoteAccessException.class)
.executorService(executorService)
.callAsynchronously(
IDataStoreService.class.getMethod("cleanupSession", new Class<?>[]
{ String.class })).get();
.get();
} catch (SecurityException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
} catch (NoSuchMethodException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment