Skip to content
Snippets Groups Projects
Commit 696b221f authored by felmer's avatar felmer
Browse files

SSDM-2069: MonitoredIoStreamCopier: removing unnecessary synchronized blocks....

SSDM-2069: MonitoredIoStreamCopier: removing unnecessary synchronized blocks. Using the copier for multi data set archiving.

SVN: 34331
parent 216bab5f
No related branches found
No related tags found
No related merge requests found
......@@ -314,21 +314,29 @@ public class Tar implements Closeable
public static void main(String[] args) throws IOException
{
if (args.length != 2)
if (args.length < 2)
{
System.err.println("Tar <tarfile> <directory>");
System.err.println("Tar [-p] <tarfile> <directory>");
System.exit(1);
}
final File tarFile = new File(args[0]);
final File directory = new File(args[1]);
int tarFileIndex = 0;
int dirIndex = 1;
boolean parallel = args[0].equals("-p");
if (parallel)
{
tarFileIndex++;
dirIndex++;
}
final File tarFile = new File(args[tarFileIndex]);
final File directory = new File(args[dirIndex]);
Tar tar = null;
try
{
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB);
Long maxQueueSize = parallel ? 5 * FileUtils.ONE_MB : null;
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, maxQueueSize);
copier.setLogger(new ConsoleLogger());
tar = new Tar(tarFile, copier);
tar.add(directory, directory.getPath().length());
tar.close();
} finally
{
if (tar != null)
......
......@@ -167,26 +167,23 @@ public class MonitoredIOStreamCopier
@Override
public void run()
{
synchronized (queue)
while (true)
{
while (true)
try
{
try
WritingItem writingItem = queue.take();
if (writingItem.data == null)
{
WritingItem writingItem = queue.take();
if (writingItem.data == null)
{
break;
}
writingItem.write();
} catch (InterruptedException ex)
{
// silently ignored
} catch (Throwable ex)
{
writingException = ex;
break;
}
writingItem.write();
} catch (InterruptedException ex)
{
// silently ignored
} catch (Throwable ex)
{
writingException = ex;
break;
}
}
}
......@@ -201,19 +198,16 @@ public class MonitoredIOStreamCopier
return;
}
addToQueue(new WritingItem(null, null, 0));
synchronized (queue)
try
{
try
{
writingThread.join();
} catch (InterruptedException ex)
{
// silently ignored
}
if (writingException != null)
{
throw writingException;
}
writingThread.join();
} catch (InterruptedException ex)
{
// silently ignored
}
if (writingException != null)
{
throw writingException;
}
}
......
......@@ -43,12 +43,12 @@ public class TarDataSetPackager extends AbstractDataSetPackager
private final Tar tar;
public TarDataSetPackager(File tarFile, IHierarchicalContentProvider contentProvider,
DataSetExistenceChecker dataSetExistenceChecker, int bufferSize)
DataSetExistenceChecker dataSetExistenceChecker, int bufferSize, Long maxQueueSizeInBytesOrNull)
{
super(contentProvider, dataSetExistenceChecker);
try
{
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier(bufferSize);
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier(bufferSize, maxQueueSizeInBytesOrNull);
copier.setLogger(new Log4jSimpleLogger(operationLog));
tar = new Tar(tarFile, copier);
} catch (FileNotFoundException e)
......
......@@ -45,6 +45,8 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.utils.DataSetExistenceChecker
*/
public class TarPackageManager extends AbstractPackageManager
{
private static final String MAXIMUM_QUEUE_SIZE_IN_BYTES_KEY = "maximum-queue-size-in-bytes";
private static final String BUFFER_SIZE_KEY = "buffer-size";
private static final int DEFAULT_BUFFER_SIZE = (int) (10 * FileUtils.ONE_MB);
......@@ -55,10 +57,14 @@ public class TarPackageManager extends AbstractPackageManager
private final ISimpleLogger ioSpeedLogger;
private Long maxQueueSize;
public TarPackageManager(Properties properties, ISimpleLogger ioSpeedLogger)
{
this.tempFolder = PropertyUtils.getDirectory(properties, RsyncArchiver.TEMP_FOLDER, null);
bufferSize = PropertyUtils.getInt(properties, BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
long maxSize = PropertyUtils.getLong(properties, MAXIMUM_QUEUE_SIZE_IN_BYTES_KEY, 5 * bufferSize);
maxQueueSize = maxSize == 0 ? null : maxSize;
this.ioSpeedLogger = ioSpeedLogger;
}
......@@ -71,7 +77,7 @@ public class TarPackageManager extends AbstractPackageManager
@Override
protected AbstractDataSetPackager createPackager(File packageFile, DataSetExistenceChecker existenceChecker)
{
return new TarDataSetPackager(packageFile, getContentProvider(), existenceChecker, bufferSize);
return new TarDataSetPackager(packageFile, getContentProvider(), existenceChecker, bufferSize, maxQueueSize);
}
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment