Skip to content
Snippets Groups Projects
Commit cd62d468 authored by buczekp's avatar buczekp
Browse files

[LMS-1453] improved updater queue (it doesn't crash on exception but retries...

[LMS-1453] improved updater queue (it doesn't crash on exception but retries update after some time)

SVN: 15263
parent 6df52c02
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,7 @@ import java.io.File; ...@@ -20,6 +20,7 @@ import java.io.File;
import java.util.List; import java.util.List;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.remoting.RemoteAccessException;
import ch.rinn.restrictions.Private; import ch.rinn.restrictions.Private;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
...@@ -28,6 +29,7 @@ import ch.systemsx.cisd.common.collections.ExtendedBlockingQueueFactory; ...@@ -28,6 +29,7 @@ import ch.systemsx.cisd.common.collections.ExtendedBlockingQueueFactory;
import ch.systemsx.cisd.common.collections.IExtendedBlockingQueue; import ch.systemsx.cisd.common.collections.IExtendedBlockingQueue;
import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecorator; import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecorator;
import ch.systemsx.cisd.common.collections.RecordBasedQueuePersister; import ch.systemsx.cisd.common.collections.RecordBasedQueuePersister;
import ch.systemsx.cisd.common.exceptions.UserFailureException;
import ch.systemsx.cisd.common.filesystem.ICloseable; import ch.systemsx.cisd.common.filesystem.ICloseable;
import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogFactory;
...@@ -52,6 +54,9 @@ public class QueueingDataSetStatusUpdaterService ...@@ -52,6 +54,9 @@ public class QueueingDataSetStatusUpdaterService
private final static Logger operationLog = private final static Logger operationLog =
LogFactory.getLogger(LogCategory.OPERATION, QueueingDataSetStatusUpdaterService.class); LogFactory.getLogger(LogCategory.OPERATION, QueueingDataSetStatusUpdaterService.class);
private static final Logger notificationLog =
LogFactory.getLogger(LogCategory.NOTIFY, QueueingDataSetStatusUpdaterService.class);
private static final int INITIAL_RECORD_SIZE = 128; private static final int INITIAL_RECORD_SIZE = 128;
@Private @Private
...@@ -86,12 +91,12 @@ public class QueueingDataSetStatusUpdaterService ...@@ -86,12 +91,12 @@ public class QueueingDataSetStatusUpdaterService
*/ */
public static synchronized final void start(final File queueFile, TimingParameters parameters) public static synchronized final void start(final File queueFile, TimingParameters parameters)
{ {
updater = createDataSetStatusUpdater();
final PersistentExtendedBlockingQueueDecorator<DataSetCodeWithStatus> persistentQueue = final PersistentExtendedBlockingQueueDecorator<DataSetCodeWithStatus> persistentQueue =
ExtendedBlockingQueueFactory.createPersistRecordBased(queueFile, ExtendedBlockingQueueFactory.createPersistRecordBased(queueFile,
INITIAL_RECORD_SIZE); INITIAL_RECORD_SIZE);
queue = persistentQueue; queue = persistentQueue;
queueCloseableOrNull = persistentQueue; queueCloseableOrNull = persistentQueue;
updater = createDataSetStatusUpdater();
thread = new Thread(new Runnable() thread = new Thread(new Runnable()
{ {
public void run() public void run()
...@@ -101,10 +106,33 @@ public class QueueingDataSetStatusUpdaterService ...@@ -101,10 +106,33 @@ public class QueueingDataSetStatusUpdaterService
while (true) while (true)
{ {
final DataSetCodeWithStatus dataSet = queue.peekWait(); final DataSetCodeWithStatus dataSet = queue.peekWait();
updater.updateDataSetStatus(dataSet.getDataSetCode(), dataSet try
.getStatus()); {
// Note: this is the only consumer of this queue. updater.updateDataSetStatus(dataSet.getDataSetCode(), dataSet
queue.take(); .getStatus());
// Note: this is the only consumer of this queue.
queue.take();
// If update succeeded than it is possible that other updates
// that failed before will work too so we can reduce sleep time
// for next failures.
Sleeper.resetSleepTime();
} catch (RemoteAccessException ex)
{
// If connection with openBIS fails it is not possible
// the same problem will occur for other updates in the queue,
// so we just retry after increasing time.
notifyUpdateFailure(dataSet, ex);
Sleeper.sleepAndIncreaseSleepTime();
} catch (UserFailureException ex)
{
// OpenBIS failure occurred - the problem may be connected with
// certain data set so move this item to the end of the queue and
// try to update other data sets before retrying.
notifyUpdateFailure(dataSet, ex);
Sleeper.sleepAndIncreaseSleepTime();
queue.add(dataSet);
queue.remove();
}
} }
} catch (InterruptedException ex) } catch (InterruptedException ex)
{ {
...@@ -114,6 +142,14 @@ public class QueueingDataSetStatusUpdaterService ...@@ -114,6 +142,14 @@ public class QueueingDataSetStatusUpdaterService
// Exit thread. // Exit thread.
} }
} }
private void notifyUpdateFailure(final DataSetCodeWithStatus dataSet, Exception ex)
{
notificationLog.error("Update of data set " + dataSet.getDataSetCode()
+ " status to '" + dataSet.getStatus()
+ "' has failed.\nRetry will occur not sooner than "
+ Sleeper.getCurrentSleepTime() + ".", ex);
}
}, "Updater Queue"); }, "Updater Queue");
thread.setDaemon(true); thread.setDaemon(true);
thread.start(); thread.start();
...@@ -216,6 +252,50 @@ public class QueueingDataSetStatusUpdaterService ...@@ -216,6 +252,50 @@ public class QueueingDataSetStatusUpdaterService
// Cannot be instantiated. // Cannot be instantiated.
} }
/**
* Helper class with a {@link #sleepAndIncreaseSleepTime()} method that invokes
* {@link Thread#sleep(long)} with an increasing amount time.
*/
private static class Sleeper
{
private static final long SECOND_IN_MILIS = 1000;
/** start from 1 minute */
private static final long INITIAL_SLEEP_TIME = 60 * SECOND_IN_MILIS;
/** 1 day */
private static final long MAX_SLEEP_TIME = 86400 * SECOND_IN_MILIS;
/** after each sleep increase the sleep time using this factor */
private static final int FACTOR = 10;
/** current sleep time in miliseconds */
private static long sleepTime = INITIAL_SLEEP_TIME;
public static String getCurrentSleepTime()
{
long seconds = sleepTime / SECOND_IN_MILIS;
return seconds + "s";
}
public static void sleepAndIncreaseSleepTime() throws InterruptedException
{
operationLog.info("Going to sleep for " + getCurrentSleepTime());
Thread.sleep(sleepTime);
sleepTime *= FACTOR;
if (sleepTime > MAX_SLEEP_TIME)
{
sleepTime = MAX_SLEEP_TIME;
}
}
public static void resetSleepTime()
{
sleepTime = INITIAL_SLEEP_TIME;
}
}
/** /**
* A role that can update data set status. * A role that can update data set status.
* *
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment