From 89f063ed3f57549ec0c50a7ca0787831c616076a Mon Sep 17 00:00:00 2001
From: tpylak <tpylak>
Date: Mon, 14 Feb 2011 11:21:11 +0000
Subject: [PATCH] port changes from S99.x: thumbnails generation
 improvements/logging, handling more than 6 channels

SVN: 19933
---
 .../concurrent/ParallelizedExecutor.java      | 41 ++++++++++++++++---
 .../common/concurrent/ParallelizedWorker.java | 16 +++++---
 .../concurrent/ParallelizedExecutorTest.java  | 40 ++++++++++++++++++
 .../dss/etl/Hdf5ThumbnailGenerator.java       | 40 ++++++++++++++----
 .../server/images/ImageChannelsUtils.java     |  5 ++-
 5 files changed, 121 insertions(+), 21 deletions(-)

diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java
index c4760dd926f..bfc95f62ced 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutor.java
@@ -30,6 +30,7 @@ import ch.rinn.restrictions.Private;
 import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
 import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
 import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
+import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogFactory;
 
@@ -107,9 +108,41 @@ public class ParallelizedExecutor
         {
             return failed;
         }
-        final AtomicInteger workersCounter =
-                new AtomicInteger(getInitialNumberOfWorkers(machineLoad, maxThreads,
-                        itemsToProcessOrNull.size()));
+        int numberOfWorkers =
+                getInitialNumberOfWorkers(machineLoad, maxThreads, itemsToProcessOrNull.size());
+        if (numberOfWorkers == 1)
+        {
+            processInTheSameThread(itemsToProcessOrNull, taskExecutor,
+                    retriesNumberWhenExecutionFails);
+        } else
+        {
+            processinParallel(taskExecutor, retriesNumberWhenExecutionFails, workerQueue, failed,
+                    numberOfWorkers);
+        }
+        logFinished(failed, processDescription, start);
+        return failed;
+    }
+
+    private static <T> void processInTheSameThread(List<T> itemsToProcess,
+            ITaskExecutor<T> taskExecutor, int retriesNumberWhenExecutionFails)
+    {
+        for (T item : itemsToProcess)
+        {
+            int counter = retriesNumberWhenExecutionFails;
+            Status status;
+            do
+            {
+                status = taskExecutor.execute(item);
+                counter--;
+            } while (counter > 0 && status.isError());
+        }
+    }
+
+    private static <T> void processinParallel(ITaskExecutor<T> taskExecutor,
+            int retriesNumberWhenExecutionFails, final Queue<T> workerQueue,
+            final Collection<FailureRecord<T>> failed, int numberOfWorkers)
+    {
+        final AtomicInteger workersCounter = new AtomicInteger(numberOfWorkers);
         startUpWorkerThreads(workersCounter, workerQueue, failed, taskExecutor,
                 retriesNumberWhenExecutionFails);
         synchronized (failed)
@@ -125,8 +158,6 @@ public class ParallelizedExecutor
                 }
             }
         }
-        logFinished(failed, processDescription, start);
-        return failed;
     }
 
     private static <T> void logFinished(Collection<FailureRecord<T>> failureReport,
diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java
index b345348a347..d2ec4a50b3d 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ParallelizedWorker.java
@@ -89,10 +89,7 @@ class ParallelizedWorker<T> implements Runnable
             {
                 if (Thread.interrupted())
                 {
-                    if (operationLog.isInfoEnabled())
-                    {
-                        operationLog.info(INTERRPTED_MSG);
-                    }
+                    operationLog.info(INTERRPTED_MSG);
                     return;
                 }
                 final T taskOrNull = workerQueue.poll();
@@ -109,6 +106,11 @@ class ParallelizedWorker<T> implements Runnable
                 int count = 0;
                 do
                 {
+                    if (Thread.interrupted())
+                    {
+                        operationLog.info(INTERRPTED_MSG);
+                        return;
+                    }
                     try
                     {
                         status = taskExecutor.execute(taskOrNull);
@@ -121,11 +123,15 @@ class ParallelizedWorker<T> implements Runnable
                         status = null;
                         break;
                     }
-                    logErrors(status);
+                    if (operationLog.isDebugEnabled())
+                    {
+                        logErrors(status);
+                    }
                 } while (StatusFlag.RETRIABLE_ERROR.equals(status.getFlag())
                         && ++count < retriesNumberWhenExecutionFails);
                 if (status != null && Status.OK.equals(status) == false)
                 {
+                    logErrors(status);
                     failures.add(new FailureRecord<T>(taskOrNull, status));
                 }
             } while (true);
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java
index 8ba8574de9f..15c75882853 100644
--- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ParallelizedExecutorTest.java
@@ -36,6 +36,7 @@ public class ParallelizedExecutorTest extends AssertJUnit
     public void testAllExecuted()
     {
         int itemsNum = 20;
+        final long mainThreadId = getCurrentThreadId();
         final boolean executed[] = new boolean[itemsNum];
         List<Integer> items = createTaskItems(itemsNum);
         ITaskExecutor<Integer> taskExecutor = new ITaskExecutor<Integer>()
@@ -46,6 +47,10 @@ public class ParallelizedExecutorTest extends AssertJUnit
                     {
                         fail("Invalid attempt to perform job on the same item twice: item " + item);
                     }
+                    if (mainThreadId == getCurrentThreadId())
+                    {
+                        fail("Task is executed in the same thread");
+                    }
                     work(item, 10);
                     executed[item] = true;
                     return Status.OK;
@@ -83,6 +88,40 @@ public class ParallelizedExecutorTest extends AssertJUnit
         }
     }
 
+    @Test
+    public void testExecutedInTheSameThread()
+    {
+        final int numberOfTries = 3;
+        List<Integer> items = createTaskItems(1);
+        final long mainThreadId = getCurrentThreadId();
+        ITaskExecutor<Integer> taskExecutor = new ITaskExecutor<Integer>()
+            {
+                int tryNumber = 1;
+
+                public Status execute(Integer item)
+                {
+                    assertEquals(mainThreadId, getCurrentThreadId());
+                    Status status = (tryNumber == 1) ? Status.createError() : Status.OK;
+                    if (tryNumber > 2)
+                    {
+                        fail("To many retries");
+                    }
+                    tryNumber++;
+                    return status;
+                }
+            };
+        // there is one item to process and maxThreads is 1, so the operation should be performed in
+        // the same thread
+        Collection<FailureRecord<Integer>> errors =
+                ParallelizedExecutor.process(items, taskExecutor, 1, 1, "test", numberOfTries);
+        assertEquals(0, errors.size());
+    }
+
+    private long getCurrentThreadId()
+    {
+        return Thread.currentThread().getId();
+    }
+
     private static void work(Integer item, int timeMsec)
     {
         try
@@ -123,4 +162,5 @@ public class ParallelizedExecutorTest extends AssertJUnit
         }
         return items;
     }
+
 }
diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/Hdf5ThumbnailGenerator.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/Hdf5ThumbnailGenerator.java
index 098222c0cee..287130aabc5 100644
--- a/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/Hdf5ThumbnailGenerator.java
+++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/etl/Hdf5ThumbnailGenerator.java
@@ -41,6 +41,8 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.utils.ImageUtil;
  */
 class Hdf5ThumbnailGenerator implements IHdf5WriterClient
 {
+    private static final int MAX_RETRY_OF_FAILED_GENERATION = 3;
+
     private final List<AcquiredSingleImage> plateImages;
 
     private final File imagesInStoreFolder;
@@ -62,7 +64,13 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
         this.operationLog = operationLog;
     }
 
-    private Status generateThumbnail(IHDF5SimpleWriter writer, AcquiredSingleImage plateImage)
+    /**
+     * @param bufferOutputStream auxiliary stream which can be used as a temporary buffer to save
+     *            the thumbnail. Using it allows not to allocate memory each time when a thumbnail
+     *            is generated.
+     */
+    private Status generateThumbnail(IHDF5SimpleWriter writer, AcquiredSingleImage plateImage,
+            ByteArrayOutputStream bufferOutputStream)
     {
         long start = System.currentTimeMillis();
         RelativeImageReference imageReference = plateImage.getImageReference();
@@ -73,18 +81,16 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
                 ImageUtil.rescale(image, thumbnailsStorageFormat.getMaxWidth(),
                         thumbnailsStorageFormat.getMaxHeight(), false,
                         thumbnailsStorageFormat.isHighQuality());
-        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        String thumbnailPath = replaceExtensionToPng(imagePath);
         try
         {
-            ImageIO.write(thumbnail, "png", output);
-
-            String thumbnailPath = replaceExtensionToPng(imagePath);
+            ImageIO.write(thumbnail, "png", bufferOutputStream);
 
             String path =
                     relativeThumbnailFilePath + ContentRepository.ARCHIVE_DELIMITER + thumbnailPath;
             plateImage.setThumbnailFilePathOrNull(new RelativeImageReference(path, imageReference
                     .tryGetPage(), imageReference.tryGetColorComponent()));
-            byte[] byteArray = output.toByteArray();
+            byte[] byteArray = bufferOutputStream.toByteArray();
             if (operationLog.isDebugEnabled())
             {
                 long now = System.currentTimeMillis();
@@ -99,7 +105,9 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
             }
         } catch (IOException ex)
         {
-            return Status.createError(ex.getMessage());
+            ex.printStackTrace();
+            return Status.createRetriableError(String.format(
+                    "Could not generate a thumbnail '%s': %s", thumbnailPath, ex.getMessage()));
         }
         return Status.OK;
     }
@@ -121,9 +129,23 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
     {
         return new ITaskExecutor<AcquiredSingleImage>()
             {
+                private ThreadLocal<ByteArrayOutputStream> outputStreamBuffers =
+                        new ThreadLocal<ByteArrayOutputStream>()
+                            {
+                                @Override
+                                protected ByteArrayOutputStream initialValue()
+                                {
+                                    return new ByteArrayOutputStream();
+                                }
+                            };
+
                 public Status execute(AcquiredSingleImage plateImage)
                 {
-                    return generateThumbnail(writer, plateImage);
+                    // each thread will get its own buffer to avoid allocating memory for the
+                    // internal array each time
+                    ByteArrayOutputStream outputStreamBuffer = outputStreamBuffers.get();
+                    outputStreamBuffer.reset();
+                    return generateThumbnail(writer, plateImage, outputStreamBuffer);
                 }
             };
     }
@@ -132,6 +154,6 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
     {
         ParallelizedExecutor.process(plateImages, createThumbnailGenerator(writer),
                 thumbnailsStorageFormat.getAllowedMachineLoadDuringGeneration(), 100,
-                "Thumbnails generation", 1);
+                "Thumbnails generation", MAX_RETRY_OF_FAILED_GENERATION);
     }
 }
diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/generic/server/images/ImageChannelsUtils.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/generic/server/images/ImageChannelsUtils.java
index 8c2f35c3390..e45a433b7d7 100644
--- a/screening/source/java/ch/systemsx/cisd/openbis/dss/generic/server/images/ImageChannelsUtils.java
+++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/generic/server/images/ImageChannelsUtils.java
@@ -448,7 +448,8 @@ public class ImageChannelsUtils
             start = operationLog.isDebugEnabled() ? System.currentTimeMillis() : 0;
             image =
                     ImageUtil.rescale(image, size.getWidth(), size.getHeight(),
-                            requestedSize.enlargeIfNecessary(), requestedSize.isHighQualityRescalingRequired());
+                            requestedSize.enlargeIfNecessary(),
+                            requestedSize.isHighQualityRescalingRequired());
             if (operationLog.isDebugEnabled())
             {
                 operationLog.debug("Create thumbnail: " + (System.currentTimeMillis() - start));
@@ -741,7 +742,7 @@ public class ImageChannelsUtils
             case 1:
             case 2:
                 return new int[]
-                    { 2 - channelIndex };
+                    { 2 - (channelIndex % 6) };
             case 3:
                 return new int[]
                     { 0, 1 };
-- 
GitLab