Skip to content
Snippets Groups Projects
Commit 89f063ed authored by tpylak's avatar tpylak
Browse files

port changes from S99.x: thumbnails generation improvements/logging, handling more than 6 channels

SVN: 19933
parent 123137dc
No related branches found
No related tags found
No related merge requests found
......@@ -30,6 +30,7 @@ import ch.rinn.restrictions.Private;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
......@@ -107,9 +108,41 @@ public class ParallelizedExecutor
{
return failed;
}
final AtomicInteger workersCounter =
new AtomicInteger(getInitialNumberOfWorkers(machineLoad, maxThreads,
itemsToProcessOrNull.size()));
int numberOfWorkers =
getInitialNumberOfWorkers(machineLoad, maxThreads, itemsToProcessOrNull.size());
if (numberOfWorkers == 1)
{
processInTheSameThread(itemsToProcessOrNull, taskExecutor,
retriesNumberWhenExecutionFails);
} else
{
processinParallel(taskExecutor, retriesNumberWhenExecutionFails, workerQueue, failed,
numberOfWorkers);
}
logFinished(failed, processDescription, start);
return failed;
}
private static <T> void processInTheSameThread(List<T> itemsToProcess,
ITaskExecutor<T> taskExecutor, int retriesNumberWhenExecutionFails)
{
for (T item : itemsToProcess)
{
int counter = retriesNumberWhenExecutionFails;
Status status;
do
{
status = taskExecutor.execute(item);
counter--;
} while (counter > 0 && status.isError());
}
}
private static <T> void processinParallel(ITaskExecutor<T> taskExecutor,
int retriesNumberWhenExecutionFails, final Queue<T> workerQueue,
final Collection<FailureRecord<T>> failed, int numberOfWorkers)
{
final AtomicInteger workersCounter = new AtomicInteger(numberOfWorkers);
startUpWorkerThreads(workersCounter, workerQueue, failed, taskExecutor,
retriesNumberWhenExecutionFails);
synchronized (failed)
......@@ -125,8 +158,6 @@ public class ParallelizedExecutor
}
}
}
logFinished(failed, processDescription, start);
return failed;
}
private static <T> void logFinished(Collection<FailureRecord<T>> failureReport,
......
......@@ -89,10 +89,7 @@ class ParallelizedWorker<T> implements Runnable
{
if (Thread.interrupted())
{
if (operationLog.isInfoEnabled())
{
operationLog.info(INTERRPTED_MSG);
}
operationLog.info(INTERRPTED_MSG);
return;
}
final T taskOrNull = workerQueue.poll();
......@@ -109,6 +106,11 @@ class ParallelizedWorker<T> implements Runnable
int count = 0;
do
{
if (Thread.interrupted())
{
operationLog.info(INTERRPTED_MSG);
return;
}
try
{
status = taskExecutor.execute(taskOrNull);
......@@ -121,11 +123,15 @@ class ParallelizedWorker<T> implements Runnable
status = null;
break;
}
logErrors(status);
if (operationLog.isDebugEnabled())
{
logErrors(status);
}
} while (StatusFlag.RETRIABLE_ERROR.equals(status.getFlag())
&& ++count < retriesNumberWhenExecutionFails);
if (status != null && Status.OK.equals(status) == false)
{
logErrors(status);
failures.add(new FailureRecord<T>(taskOrNull, status));
}
} while (true);
......
......@@ -36,6 +36,7 @@ public class ParallelizedExecutorTest extends AssertJUnit
public void testAllExecuted()
{
int itemsNum = 20;
final long mainThreadId = getCurrentThreadId();
final boolean executed[] = new boolean[itemsNum];
List<Integer> items = createTaskItems(itemsNum);
ITaskExecutor<Integer> taskExecutor = new ITaskExecutor<Integer>()
......@@ -46,6 +47,10 @@ public class ParallelizedExecutorTest extends AssertJUnit
{
fail("Invalid attempt to perform job on the same item twice: item " + item);
}
if (mainThreadId == getCurrentThreadId())
{
fail("Task is executed in the same thread");
}
work(item, 10);
executed[item] = true;
return Status.OK;
......@@ -83,6 +88,40 @@ public class ParallelizedExecutorTest extends AssertJUnit
}
}
@Test
public void testExecutedInTheSameThread()
{
final int numberOfTries = 3;
List<Integer> items = createTaskItems(1);
final long mainThreadId = getCurrentThreadId();
ITaskExecutor<Integer> taskExecutor = new ITaskExecutor<Integer>()
{
int tryNumber = 1;
public Status execute(Integer item)
{
assertEquals(mainThreadId, getCurrentThreadId());
Status status = (tryNumber == 1) ? Status.createError() : Status.OK;
if (tryNumber > 2)
{
fail("To many retries");
}
tryNumber++;
return status;
}
};
// there is one item to process and maxThreads is 1, so the operation should be performed in
// the same thread
Collection<FailureRecord<Integer>> errors =
ParallelizedExecutor.process(items, taskExecutor, 1, 1, "test", numberOfTries);
assertEquals(0, errors.size());
}
private long getCurrentThreadId()
{
return Thread.currentThread().getId();
}
private static void work(Integer item, int timeMsec)
{
try
......@@ -123,4 +162,5 @@ public class ParallelizedExecutorTest extends AssertJUnit
}
return items;
}
}
......@@ -41,6 +41,8 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.utils.ImageUtil;
*/
class Hdf5ThumbnailGenerator implements IHdf5WriterClient
{
private static final int MAX_RETRY_OF_FAILED_GENERATION = 3;
private final List<AcquiredSingleImage> plateImages;
private final File imagesInStoreFolder;
......@@ -62,7 +64,13 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
this.operationLog = operationLog;
}
private Status generateThumbnail(IHDF5SimpleWriter writer, AcquiredSingleImage plateImage)
/**
* @param bufferOutputStream auxiliary stream which can be used as a temporary buffer to save
* the thumbnail. Using it allows not to allocate memory each time when a thumbnail
* is generated.
*/
private Status generateThumbnail(IHDF5SimpleWriter writer, AcquiredSingleImage plateImage,
ByteArrayOutputStream bufferOutputStream)
{
long start = System.currentTimeMillis();
RelativeImageReference imageReference = plateImage.getImageReference();
......@@ -73,18 +81,16 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
ImageUtil.rescale(image, thumbnailsStorageFormat.getMaxWidth(),
thumbnailsStorageFormat.getMaxHeight(), false,
thumbnailsStorageFormat.isHighQuality());
ByteArrayOutputStream output = new ByteArrayOutputStream();
String thumbnailPath = replaceExtensionToPng(imagePath);
try
{
ImageIO.write(thumbnail, "png", output);
String thumbnailPath = replaceExtensionToPng(imagePath);
ImageIO.write(thumbnail, "png", bufferOutputStream);
String path =
relativeThumbnailFilePath + ContentRepository.ARCHIVE_DELIMITER + thumbnailPath;
plateImage.setThumbnailFilePathOrNull(new RelativeImageReference(path, imageReference
.tryGetPage(), imageReference.tryGetColorComponent()));
byte[] byteArray = output.toByteArray();
byte[] byteArray = bufferOutputStream.toByteArray();
if (operationLog.isDebugEnabled())
{
long now = System.currentTimeMillis();
......@@ -99,7 +105,9 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
}
} catch (IOException ex)
{
return Status.createError(ex.getMessage());
ex.printStackTrace();
return Status.createRetriableError(String.format(
"Could not generate a thumbnail '%s': %s", thumbnailPath, ex.getMessage()));
}
return Status.OK;
}
......@@ -121,9 +129,23 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
{
return new ITaskExecutor<AcquiredSingleImage>()
{
private ThreadLocal<ByteArrayOutputStream> outputStreamBuffers =
new ThreadLocal<ByteArrayOutputStream>()
{
@Override
protected ByteArrayOutputStream initialValue()
{
return new ByteArrayOutputStream();
}
};
public Status execute(AcquiredSingleImage plateImage)
{
return generateThumbnail(writer, plateImage);
// each thread will get its own buffer to avoid allocating memory for the
// internal array each time
ByteArrayOutputStream outputStreamBuffer = outputStreamBuffers.get();
outputStreamBuffer.reset();
return generateThumbnail(writer, plateImage, outputStreamBuffer);
}
};
}
......@@ -132,6 +154,6 @@ class Hdf5ThumbnailGenerator implements IHdf5WriterClient
{
ParallelizedExecutor.process(plateImages, createThumbnailGenerator(writer),
thumbnailsStorageFormat.getAllowedMachineLoadDuringGeneration(), 100,
"Thumbnails generation", 1);
"Thumbnails generation", MAX_RETRY_OF_FAILED_GENERATION);
}
}
......@@ -448,7 +448,8 @@ public class ImageChannelsUtils
start = operationLog.isDebugEnabled() ? System.currentTimeMillis() : 0;
image =
ImageUtil.rescale(image, size.getWidth(), size.getHeight(),
requestedSize.enlargeIfNecessary(), requestedSize.isHighQualityRescalingRequired());
requestedSize.enlargeIfNecessary(),
requestedSize.isHighQualityRescalingRequired());
if (operationLog.isDebugEnabled())
{
operationLog.debug("Create thumbnail: " + (System.currentTimeMillis() - start));
......@@ -741,7 +742,7 @@ public class ImageChannelsUtils
case 1:
case 2:
return new int[]
{ 2 - channelIndex };
{ 2 - (channelIndex % 6) };
case 3:
return new int[]
{ 0, 1 };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment