diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java index 588fad95c704e1b3fa2d443ee858bbb3ee9d1f21..240dff83f481f404aadefbf0c10349c0ec5a039e 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java @@ -19,6 +19,8 @@ package ch.systemsx.cisd.etlserver.plugins; import java.io.File; import java.util.Properties; +import ch.systemsx.cisd.common.logging.ISimpleLogger; + /** * Strategy of moving a data set to another share. Implementations of this interface should @@ -33,5 +35,5 @@ public interface IDataSetMover * its name is the data set code. The destination folder is <code>share</code>. Its name is * the share id. Share id and size will be updated on openBIS. */ - public void moveDataSetToAnotherShare(File dataSetDirInStore, File share); + public void moveDataSetToAnotherShare(File dataSetDirInStore, File share, ISimpleLogger logger); } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java index 3fb76682a2c46b391e59a9986c143f0ce411e452..b37eaa60d5b7883bab390cc3019d6e428db58d88 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java @@ -139,10 +139,11 @@ public class SegmentedStoreShufflingTask implements IMaintenanceTask private IShareIdManager manager = ServiceProvider.getShareIdManager(); - public void moveDataSetToAnotherShare(File dataSetDirInStore, File share) + public void moveDataSetToAnotherShare(File dataSetDirInStore, File share, + ISimpleLogger logger) { SegmentedStoreUtils.moveDataSetToAnotherShare(dataSetDirInStore, share, - service, manager); + service, manager, logger); } }, new Log4jSimpleLogger(operationLog)); } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java index 08d01348be2d5d430f693920308cfc2047d0d0db..97d054c0a4aa5d807f6ff0168fa41318a2b41d0a 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java @@ -179,11 +179,11 @@ public class SimpleShuffling implements ISegmentedStoreShuffling fromShare.getDataSetsOrderedBySize().get(dataSetIndex); File dataSetDirInStore = new File(fromShare.getShare(), dataSet.getDataSetLocation()); String commonMessage = - "Moving data set " + dataSet.getDataSetCode() + " from share " + "Copying data set " + dataSet.getDataSetCode() + " from share " + fromShare.getShareId() + " to share " + toShare.getShareId(); logger.log(INFO, commonMessage + " ..."); long t0 = timeProvider.getTimeInMilliseconds(); - mover.moveDataSetToAnotherShare(dataSetDirInStore, toShare.getShare()); + mover.moveDataSetToAnotherShare(dataSetDirInStore, toShare.getShare(), logger); from.removeDataSet(dataSetIndex); to.addDataSet(dataSet); logger.log(INFO, commonMessage + " took " diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java index a2524dcccb1f007be90cfee8849aefb441079d04..3f3de2f18d5ef359b047c0118eb5b72b739676eb 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java @@ -66,6 +66,83 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.api.internal.authorization.ID */ public class DssServiceRpcAuthorizationAdvisor extends DefaultPointcutAdvisor { + /** + * Proxy of an {@link InputStream} which releases locks when {@link #close()} is invoked. + */ + private static final class InputStreamProxy extends InputStream + { + private final InputStream inputStream; + + private final IShareIdManager manager; + + private InputStreamProxy(InputStream inputStream, IShareIdManager manager) + { + this.inputStream = inputStream; + this.manager = manager; + } + + @Override + public void close() throws IOException + { + try + { + inputStream.close(); + } finally + { + manager.releaseLocks(); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + return inputStream.read(b, off, len); + } + + @Override + public int read() throws IOException + { + return inputStream.read(); + } + + @Override + public int read(byte[] b) throws IOException + { + return inputStream.read(b); + } + + @Override + public long skip(long n) throws IOException + { + return inputStream.skip(n); + } + + @Override + public int available() throws IOException + { + return inputStream.available(); + } + + @Override + public void mark(int readlimit) + { + inputStream.mark(readlimit); + } + + @Override + public void reset() throws IOException + { + inputStream.reset(); + } + + @Override + public boolean markSupported() + { + return inputStream.markSupported(); + } + + } + private static final long serialVersionUID = 1L; private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, @@ -107,6 +184,7 @@ public class DssServiceRpcAuthorizationAdvisor extends DefaultPointcutAdvisor */ public static class DssServiceRpcAuthorizationMethodInterceptor implements MethodInterceptor { + private IShareIdManager shareIdManager; public DssServiceRpcAuthorizationMethodInterceptor(IShareIdManager shareIdManager) @@ -171,23 +249,7 @@ public class DssServiceRpcAuthorizationAdvisor extends DefaultPointcutAdvisor if (result instanceof InputStream) { shouldLocksAutomaticallyBeReleased = false; - final InputStream inputStream = (InputStream) result; - result = new InputStream() - { - @Override - public int read() throws IOException - { - return inputStream.read(); - } - - @Override - public void close() throws IOException - { - super.close(); - manager.releaseLocks(); - } - - }; + result = new InputStreamProxy((InputStream) result, manager); } return result; } finally diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java index cd763fa412760551112d8ae0b7e31091ccaea117..c5b98e79beea8e5815b2f9f61185505c021457c9 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java @@ -67,7 +67,6 @@ public class ShareIdManager implements IShareIdManager void setShareId(String shareId) { - await(); this.shareId = shareId; } @@ -84,7 +83,7 @@ public class ShareIdManager implements IShareIdManager } } - private void await() + void await() { if (countDownLatch != null) { @@ -171,6 +170,16 @@ public class ShareIdManager implements IShareIdManager } } + public void await(String dataSetCode) + { + Map<String, GuardedShareID> map = getDataSetCodeToShareIdMap(); + GuardedShareID guardedShareId = map.get(dataSetCode); + if (guardedShareId != null) + { + guardedShareId.await(); + } + } + public void releaseLock(String dataSetCode) { synchronized (lockedDataSets) diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java index ae55c6b81b4203eeef84f1a71de0c632dcdbbc18..8a3a12356d72a6d6292280bc28fc631419909d98 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java @@ -48,6 +48,11 @@ public interface IShareIdManager */ public void lock(String dataSetCode); + /** + * Awaits until specified data set is no longer locked. + */ + public void await(String dataSetCode); + /** * Unlocks specified data set. Does nothing if lock already released or data set hasn't been * locked. diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java index e75ff8da831493896360a12f7e02372bad2b958b..7c159d991091708618511cc636b8f4728e7ad5c1 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java @@ -216,13 +216,23 @@ public class SegmentedStoreUtils * Moves the specified data set to the specified share. The data set is folder in the store its * name is the data set code. The destination folder is <code>share</code>. Its name is the * share id. + * <p> + * This method works as follows: + * <ol> + * <li>Copying data set to new share. + * <li>Sanity check of successfully copied data set. + * <li>Changing share id in openBIS AS. + * <li>Spawn an asynchronous task which deletes the data set at the old location if all locks on + * the data set have been released. + * </ol> * * @param service to access openBIS AS. */ - public static void moveDataSetToAnotherShare(File dataSetDirInStore, File share, - IEncapsulatedOpenBISService service, IShareIdManager shareIdManager) + public static void moveDataSetToAnotherShare(final File dataSetDirInStore, File share, + IEncapsulatedOpenBISService service, final IShareIdManager shareIdManager, + final ISimpleLogger logger) { - String dataSetCode = dataSetDirInStore.getName(); + final String dataSetCode = dataSetDirInStore.getName(); ExternalData dataSet = service.tryGetDataSet(dataSetCode); if (dataSet == null) { @@ -239,7 +249,18 @@ public class SegmentedStoreUtils String shareId = share.getName(); shareIdManager.setShareId(dataSetCode, shareId); service.updateShareIdAndSize(dataSetCode, shareId, size); - FileUtilities.deleteRecursively(dataSetDirInStore); + new Thread(new Runnable() + { + public void run() + { + logger.log(LogLevel.INFO, "Await for data set " + dataSetCode + + " to be unlocked."); + shareIdManager.await(dataSetCode); + FileUtilities.deleteRecursively(dataSetDirInStore); + logger.log(LogLevel.INFO, "Data set " + dataSetCode + " at " + + dataSetDirInStore + " has been deleted."); + } + }).start(); } private static void copyToShare(File file, File share) diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java index 96a664b49fb9675ec604ec92fab389d5edb92ea8..f57347126bc240b078a1a2a375c32e0e84b67b94 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java @@ -161,13 +161,13 @@ public class SimpleShufflingTest extends AbstractFileSystemTestCase one(logger).log(LogLevel.INFO, "END Computing number of data sets to move for share 1"); one(logger).log(LogLevel.INFO, "\t2 data sets to move, available space : 102500"); - one(logger).log(LogLevel.INFO, "Moving data set ds3 from share 1 to share 3 ..."); - one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds3"), share3.getShare()); - one(logger).log(LogLevel.INFO, "Moving data set ds3 from share 1 to share 3 took 0 seconds."); + one(logger).log(LogLevel.INFO, "Copying data set ds3 from share 1 to share 3 ..."); + one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds3"), share3.getShare(), logger); + one(logger).log(LogLevel.INFO, "Copying data set ds3 from share 1 to share 3 took 0 seconds."); - one(logger).log(LogLevel.INFO, "Moving data set ds2 from share 1 to share 3 ..."); - one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds2"), share3.getShare()); - one(logger).log(LogLevel.INFO, "Moving data set ds2 from share 1 to share 3 took 0 seconds."); + one(logger).log(LogLevel.INFO, "Copying data set ds2 from share 1 to share 3 ..."); + one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds2"), share3.getShare(), logger); + one(logger).log(LogLevel.INFO, "Copying data set ds2 from share 1 to share 3 took 0 seconds."); one(logger).log(LogLevel.INFO, "BEGIN Computing number of data sets to move for share 2"); one(logger).log(LogLevel.INFO, "\tSpace needed to free: 1585152 bytes (1548.00 kB, 1.51 MB)"); diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java index ae77045852896e9f98c279f7973b856284f56f76..f6f83761a79699d1bfff40252d66238fe38514fc 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java @@ -151,7 +151,7 @@ public class ShareIdManagerTest extends AssertJUnit manager.lock(DS1); try { - manager.setShareId(DS1, "1"); + manager.await(DS1); } catch (EnvironmentFailureException ex) { System.out.println(ex); @@ -193,17 +193,15 @@ public class ShareIdManagerTest extends AssertJUnit }, "T1").start(); ch1.assertNextMessage("locked"); // wait until data set is really locked. - manager.setShareId(DS1, "1"); + manager.await(DS1); - assertEquals("1", manager.getShareId(DS1)); ch1.assertNextMessage("unlocked"); // wait until thread is finished assertEquals("INFO OPERATION.ShareIdManager" + " - Share id manager initialized with 2 data sets.\n" + "DEBUG OPERATION.ShareIdManager - Data set ds1 has been locked.\n" + "DEBUG OPERATION.ShareIdManager" + " - Data set ds1 is locked by the following threads: T1\n" - + "DEBUG OPERATION.ShareIdManager - Unlock data set ds1\n" - + "INFO OPERATION.ShareIdManager - New share of data set ds1 is 1", + + "DEBUG OPERATION.ShareIdManager - Unlock data set ds1", logRecorder.getLogContent()); } @@ -240,7 +238,7 @@ public class ShareIdManagerTest extends AssertJUnit try { - manager.setShareId(DS1, "1"); + manager.await(DS1); fail("EnvironmentFailureException expected."); } catch (EnvironmentFailureException ex) { diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java index 9a555bd929d4b3e9d08b501ca66c15097704eafd..31aea1d2f160e48e438912cc219bf5b8e19373bd 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java @@ -29,6 +29,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import ch.systemsx.cisd.base.tests.AbstractFileSystemTestCase; +import ch.systemsx.cisd.common.concurrent.MessageChannel; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.filesystem.HostAwareFile; import ch.systemsx.cisd.common.filesystem.IFreeSpaceProvider; @@ -51,10 +52,22 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase private static final class MockLogger implements ISimpleLogger { private final StringBuilder builder = new StringBuilder(); + private final MessageChannel messageChannel = new MessageChannel(); public void log(LogLevel level, String message) { builder.append(level).append(": ").append(message).append('\n'); + messageChannel.send(message); + } + + public void assertNextLogMessage(String expectedMessage) + { + messageChannel.assertNextMessage(expectedMessage); + } + + public void assertNoMoreLogMessages() + { + messageChannel.assertEmpty(); } @Override @@ -69,7 +82,7 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase private Mockery context; private IEncapsulatedOpenBISService service; private IShareIdManager shareIdManager; - private ISimpleLogger log; + private MockLogger log; private IFreeSpaceProvider freeSpaceProvider; private ITimeProvider timeProvider; @@ -203,17 +216,22 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase one(service).updateShareIdAndSize("ds-1", "2", 11L); one(shareIdManager).setShareId("ds-1", "2"); + one(shareIdManager).await("ds-1"); } }); assertEquals(true, dataSetDirInStore.exists()); assertFileNames(share2uuid01, "22"); - SegmentedStoreUtils.moveDataSetToAnotherShare(dataSetDirInStore, share2, service, shareIdManager); + SegmentedStoreUtils.moveDataSetToAnotherShare(dataSetDirInStore, share2, service, shareIdManager, log); + log.assertNextLogMessage("Await for data set ds-1 to be unlocked."); + log.assertNextLogMessage("Data set ds-1 at " + share1 + + "/uuid/01/02/03/ds-1 has been deleted."); assertEquals(false, dataSetDirInStore.exists()); assertFileNames(share2uuid01, "02", "22"); assertEquals("hello world\n", FileUtilities.loadToString(new File(share2uuid01, "02/03/ds-1/original/hello.txt"))); + log.assertNoMoreLogMessages(); context.assertIsSatisfied(); }