From 2073b0efa2a61496ea929343cb0a092cc11c8a73 Mon Sep 17 00:00:00 2001
From: tpylak <tpylak>
Date: Thu, 10 Feb 2011 14:08:04 +0000
Subject: [PATCH] LMS-2027 minor parallelized executor: logging, comments

SVN: 19880
---
 .../concurrent/ParallelizedExecutor.java      | 29 ++++++++++++++-----
 .../common/concurrent/ParallelizedWorker.java |  2 +-
 .../common/fileconverter/FileConverter.java   |  2 +-
 .../concurrent/ParallelizedExecutorTest.java  |  2 +-
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java
index 59eabbe5b9a..c4760dd926f 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java
@@ -51,7 +51,7 @@ public class ParallelizedExecutor
     {
         if (operationLog.isInfoEnabled())
         {
-            operationLog.info(String.format("Found %d files to convert.",
+            operationLog.info(String.format("Found %d files to process.",
                     itemsToProcessOrNull.size()));
         }
         if (itemsToProcessOrNull.isEmpty())
@@ -62,10 +62,12 @@ public class ParallelizedExecutor
     }
 
     @Private
-    static int getInitialNumberOfWorkers(double machineLoad, int maxThreads)
+    static int getInitialNumberOfWorkers(double machineLoad, int maxThreads, int numberOfTaskItems)
     {
-        return (int) Math.max(1,
-                Math.min(Math.round(NUMBER_OF_CPU_CORES * machineLoad), maxThreads));
+        long threads = Math.round(NUMBER_OF_CPU_CORES * machineLoad);
+        threads = Math.min(threads, maxThreads);
+        threads = Math.min(threads, numberOfTaskItems);
+        return (int) Math.max(1, threads);
     }
 
     private static <T> void startUpWorkerThreads(AtomicInteger workersCounter,
@@ -94,9 +96,10 @@ public class ParallelizedExecutor
      */
     public static <T> Collection<FailureRecord<T>> process(List<T> itemsToProcessOrNull,
             ITaskExecutor<T> taskExecutor, double machineLoad, int maxThreads,
-            int retriesNumberWhenExecutionFails) throws InterruptedExceptionUnchecked,
-            EnvironmentFailureException
+            String processDescription, int retriesNumberWhenExecutionFails)
+            throws InterruptedExceptionUnchecked, EnvironmentFailureException
     {
+        long start = System.currentTimeMillis();
         final Queue<T> workerQueue = tryFillWorkerQueue(itemsToProcessOrNull);
         final Collection<FailureRecord<T>> failed =
                 Collections.synchronizedCollection(new ArrayList<FailureRecord<T>>());
@@ -105,7 +108,8 @@ public class ParallelizedExecutor
             return failed;
         }
         final AtomicInteger workersCounter =
-                new AtomicInteger(getInitialNumberOfWorkers(machineLoad, maxThreads));
+                new AtomicInteger(getInitialNumberOfWorkers(machineLoad, maxThreads,
+                        itemsToProcessOrNull.size()));
         startUpWorkerThreads(workersCounter, workerQueue, failed, taskExecutor,
                 retriesNumberWhenExecutionFails);
         synchronized (failed)
@@ -121,9 +125,20 @@ public class ParallelizedExecutor
                 }
             }
         }
+        logFinished(failed, processDescription, start);
         return failed;
     }
 
+    private static <T> void logFinished(Collection<FailureRecord<T>> failureReport,
+            String processDescription, long startTimeMsec)
+    {
+        int errorsNumber = failureReport.size();
+        String time = "[" + (System.currentTimeMillis() - startTimeMsec) + " msec.] ";
+        operationLog.info(processDescription + " finished "
+                + (errorsNumber == 0 ? "successfully" : "with " + errorsNumber + " errors") + " "
+                + time + ".");
+    }
+
     /**
      * Converts the <var>failureRecord</var> to an error string. If
      * <code>failureRecords.isEmpty()</code>, then return <code>null</code>.
diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java
index 5aa07269d2d..b345348a347 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java
@@ -98,7 +98,7 @@ class ParallelizedWorker<T> implements Runnable
                 final T taskOrNull = workerQueue.poll();
                 if (taskOrNull == null)
                 {
-                    operationLog.info(EXITING_MSG);
+                    operationLog.debug(EXITING_MSG);
                     return;
                 }
                 if (operationLog.isDebugEnabled())
diff --git a/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java b/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java
index eee878e8328..dc35d93d583 100644
--- a/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java
+++ b/common/source/java/ch/systemsx/cisd/common/fileconverter/FileConverter.java
@@ -83,7 +83,7 @@ public class FileConverter
         ITaskExecutor<File> taskExecutor = new FileConversionTaskExecutor(conversionStrategy);
         Collection<FailureRecord<File>> failureReport =
                 ParallelizedExecutor.process(itemsToProcess, taskExecutor, machineLoad, maxThreads,
-                        MAX_RETRY_OF_FAILED_COMPRESSIONS);
+                        "File conversion", MAX_RETRY_OF_FAILED_COMPRESSIONS);
         return ParallelizedExecutor.tryFailuresToString(failureReport);
     }
 
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java
index beab7998f73..8ba8574de9f 100644
--- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java
@@ -100,7 +100,7 @@ public class ParallelizedExecutorTest extends AssertJUnit
     private Collection<FailureRecord<Integer>> process(List<Integer> items,
             ITaskExecutor<Integer> taskExecutor)
     {
-        return ParallelizedExecutor.process(items, taskExecutor, 10, 10, 1);
+        return ParallelizedExecutor.process(items, taskExecutor, 10, 10, "test", 1);
     }
 
     private static void assertAllExecuted(final boolean[] executed)
-- 
GitLab