diff --git a/openbis-common/source/java/ch/systemsx/cisd/common/conversation/RateLimitedProgressListener.java b/openbis-common/source/java/ch/systemsx/cisd/common/conversation/RateLimitedProgressListener.java index f8f8abb41ebcac682ca35b33c35ddaf42abba7fd..419b5acb53458c2c243bd22e3efd984bb53826b6 100644 --- a/openbis-common/source/java/ch/systemsx/cisd/common/conversation/RateLimitedProgressListener.java +++ b/openbis-common/source/java/ch/systemsx/cisd/common/conversation/RateLimitedProgressListener.java @@ -16,6 +16,7 @@ package ch.systemsx.cisd.common.conversation; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -23,64 +24,93 @@ import ch.systemsx.cisd.common.serviceconversation.server.ProgressInfo; import ch.systemsx.cisd.common.serviceconversation.server.ServiceConversationServer; /** - * Sends incoming progress updates to the remote client through the ServiceConversationServer. - * The maximum reporting rate is limited by the value given as an argument. + * Sends incoming progress updates to the remote client through the ServiceConversationServer. The + * maximum reporting rate is limited by the value given as an argument. If a progress update is + * received while the previous update is still waiting to be sent (because of the maximum reporting + * rate), the previous update will be discarded. * - * If a progress update is received while the previous update is still waiting to be sent - * (because of the maximum reporting rate), the previous update will be discarded. - * * @author anttil */ public class RateLimitedProgressListener implements IProgressListener { - + private ServiceConversationServer server; + private String conversationId; + private ScheduledThreadPoolExecutor executor; + private Update lastUpdate; + private int interval; - - public RateLimitedProgressListener(ServiceConversationServer server, String conversationId, int reportingInterval) { + + private ScheduledFuture<?> future; + + public RateLimitedProgressListener(ServiceConversationServer server, String conversationId, + int reportingInterval) + { this.server = server; this.conversationId = conversationId; this.interval = reportingInterval; this.executor = new ScheduledThreadPoolExecutor(1); } - + @Override - public synchronized void update(final String label, final int totalItemsToProcess, final int numItemsProcessed) + public synchronized void update(final String label, final int totalItemsToProcess, + final int numItemsProcessed) { - this.executor.remove(this.lastUpdate); + if (future != null) + { + future.cancel(false); + } long lastExecution = 0; - if (this.lastUpdate != null) { + if (this.lastUpdate != null) + { lastExecution = this.lastUpdate.getLastExecution(); } - Update update = new Update(this.server, this.conversationId, new ProgressInfo(label, totalItemsToProcess, numItemsProcessed), lastExecution); + Update update = + new Update(this.server, this.conversationId, new ProgressInfo(label, + totalItemsToProcess, numItemsProcessed), lastExecution); - if (System.currentTimeMillis() - lastExecution > this.interval) { + if (System.currentTimeMillis() - lastExecution > this.interval) + { this.executor.execute(update); - } else { - this.executor.schedule(update, lastExecution + this.interval, TimeUnit.MILLISECONDS); + this.future = null; + } else + { + future = + this.executor.schedule(update, lastExecution + this.interval, + TimeUnit.MILLISECONDS); } - + this.lastUpdate = update; } - - public synchronized void close() { - this.executor.remove(this.lastUpdate); + + public synchronized void close() + { + if (future != null) + { + future.cancel(false); + } this.executor.shutdown(); } - - private static class Update implements Runnable { + + private static class Update implements Runnable + { private ServiceConversationServer server; + private String conversationId; + private ProgressInfo progress; + private long lastExecution; - public Update(ServiceConversationServer server, String conversationId, ProgressInfo progress, long lastExecution) { + public Update(ServiceConversationServer server, String conversationId, + ProgressInfo progress, long lastExecution) + { this.server = server; this.conversationId = conversationId; this.progress = progress; @@ -91,10 +121,11 @@ public class RateLimitedProgressListener implements IProgressListener public void run() { this.lastExecution = System.currentTimeMillis(); - server.reportProgress(conversationId, progress); + server.reportProgress(conversationId, progress); } - - public long getLastExecution() { + + public long getLastExecution() + { return this.lastExecution; } }