diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java index 59eabbe5b9a6858f591bbb00e8be3f8c01d81dfd..c4760dd926f06cbf3dc11cd0b1b7d6fd30dbe85c 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java @@ -51,7 +51,7 @@ public class ParallelizedExecutor { if (operationLog.isInfoEnabled()) { - operationLog.info(String.format("Found %d files to convert.", + operationLog.info(String.format("Found %d files to process.", itemsToProcessOrNull.size())); } if (itemsToProcessOrNull.isEmpty()) @@ -62,10 +62,12 @@ public class ParallelizedExecutor } @Private - static int getInitialNumberOfWorkers(double machineLoad, int maxThreads) + static int getInitialNumberOfWorkers(double machineLoad, int maxThreads, int numberOfTaskItems) { - return (int) Math.max(1, - Math.min(Math.round(NUMBER_OF_CPU_CORES * machineLoad), maxThreads)); + long threads = Math.round(NUMBER_OF_CPU_CORES * machineLoad); + threads = Math.min(threads, maxThreads); + threads = Math.min(threads, numberOfTaskItems); + return (int) Math.max(1, threads); } private static <T> void startUpWorkerThreads(AtomicInteger workersCounter, @@ -94,9 +96,10 @@ public class ParallelizedExecutor */ public static <T> Collection<FailureRecord<T>> process(List<T> itemsToProcessOrNull, ITaskExecutor<T> taskExecutor, double machineLoad, int maxThreads, - int retriesNumberWhenExecutionFails) throws InterruptedExceptionUnchecked, - EnvironmentFailureException + String processDescription, int retriesNumberWhenExecutionFails) + throws InterruptedExceptionUnchecked, EnvironmentFailureException { + long start = System.currentTimeMillis(); final Queue<T> workerQueue = tryFillWorkerQueue(itemsToProcessOrNull); final Collection<FailureRecord<T>> failed = Collections.synchronizedCollection(new ArrayList<FailureRecord<T>>()); @@ -105,7 +108,8 @@ public class ParallelizedExecutor return failed; } final AtomicInteger workersCounter = - new AtomicInteger(getInitialNumberOfWorkers(machineLoad, maxThreads)); + new AtomicInteger(getInitialNumberOfWorkers(machineLoad, maxThreads, + itemsToProcessOrNull.size())); startUpWorkerThreads(workersCounter, workerQueue, failed, taskExecutor, retriesNumberWhenExecutionFails); synchronized (failed) @@ -121,9 +125,20 @@ public class ParallelizedExecutor } } } + logFinished(failed, processDescription, start); return failed; } + private static <T> void logFinished(Collection<FailureRecord<T>> failureReport, + String processDescription, long startTimeMsec) + { + int errorsNumber = failureReport.size(); + String time = "[" + (System.currentTimeMillis() - startTimeMsec) + " msec.] "; + operationLog.info(processDescription + " finished " + + (errorsNumber == 0 ? "successfully" : "with " + errorsNumber + " errors") + " " + + time + "."); + } + /** * Converts the <var>failureRecord</var> to an error string. If * <code>failureRecords.isEmpty()</code>, then return <code>null</code>. diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java index 5aa07269d2d3920c137a1f7ef012b988c78ef4d4..b345348a347eaac67847a9dc7970b767ab77944b 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java @@ -98,7 +98,7 @@ class ParallelizedWorker<T> implements Runnable final T taskOrNull = workerQueue.poll(); if (taskOrNull == null) { - operationLog.info(EXITING_MSG); + operationLog.debug(EXITING_MSG); return; } if (operationLog.isDebugEnabled()) diff --git a/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java b/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java index eee878e8328cd3bbe071cac00c02a1a0e65d5f71..dc35d93d583f524d0e5fb2b41e000aa51aed8319 100644 --- a/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java +++ b/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java @@ -83,7 +83,7 @@ public class FileConverter ITaskExecutor<File> taskExecutor = new FileConversionTaskExecutor(conversionStrategy); Collection<FailureRecord<File>> failureReport = ParallelizedExecutor.process(itemsToProcess, taskExecutor, machineLoad, maxThreads, - MAX_RETRY_OF_FAILED_COMPRESSIONS); + "File conversion", MAX_RETRY_OF_FAILED_COMPRESSIONS); return ParallelizedExecutor.tryFailuresToString(failureReport); } diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java index beab7998f7311beedc8e57af5485f903b0ccfde7..8ba8574de9f5bf38c10dc30fbbc1111548473baa 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java @@ -100,7 +100,7 @@ public class ParallelizedExecutorTest extends AssertJUnit private Collection<FailureRecord<Integer>> process(List<Integer> items, ITaskExecutor<Integer> taskExecutor) { - return ParallelizedExecutor.process(items, taskExecutor, 10, 10, 1); + return ParallelizedExecutor.process(items, taskExecutor, 10, 10, "test", 1); } private static void assertAllExecuted(final boolean[] executed)