From 8378c63b1c3995e2e1cca6add1eda8172b43975c Mon Sep 17 00:00:00 2001
From: felmer <felmer>
Date: Tue, 1 Mar 2011 08:01:28 +0000
Subject: [PATCH] LMS-2077 separate setting share ID from data set locking.
 Locking is only needed for delete data sets at old share (see
 SegmentedStoreUtils.moveDataSetToAnotherShare).

SVN: 20156
---
 .../cisd/etlserver/plugins/IDataSetMover.java |  4 +-
 .../plugins/SegmentedStoreShufflingTask.java  |  5 +-
 .../etlserver/plugins/SimpleShuffling.java    |  4 +-
 .../DssServiceRpcAuthorizationAdvisor.java    | 96 +++++++++++++++----
 .../dss/generic/server/ShareIdManager.java    | 13 ++-
 .../dss/generic/shared/IShareIdManager.java   |  5 +
 .../shared/utils/SegmentedStoreUtils.java     | 29 +++++-
 .../plugins/SimpleShufflingTest.java          | 12 +--
 .../generic/server/ShareIdManagerTest.java    | 10 +-
 .../shared/utils/SegmentedStoreUtilsTest.java | 22 ++++-
 10 files changed, 158 insertions(+), 42 deletions(-)

diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java
index 588fad95c70..240dff83f48 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/IDataSetMover.java
@@ -19,6 +19,8 @@ package ch.systemsx.cisd.etlserver.plugins;
 import java.io.File;
 import java.util.Properties;
 
+import ch.systemsx.cisd.common.logging.ISimpleLogger;
+
 
 /**
  * Strategy of moving a data set to another share. Implementations of this interface should
@@ -33,5 +35,5 @@ public interface IDataSetMover
      * its name is the data set code. The destination folder is <code>share</code>. Its name is
      * the share id. Share id and size will be updated on openBIS.
      */
-    public void moveDataSetToAnotherShare(File dataSetDirInStore, File share);
+    public void moveDataSetToAnotherShare(File dataSetDirInStore, File share, ISimpleLogger logger);
 }
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java
index 3fb76682a2c..b37eaa60d5b 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SegmentedStoreShufflingTask.java
@@ -139,10 +139,11 @@ public class SegmentedStoreShufflingTask implements IMaintenanceTask
 
                         private IShareIdManager manager = ServiceProvider.getShareIdManager();
 
-                        public void moveDataSetToAnotherShare(File dataSetDirInStore, File share)
+                        public void moveDataSetToAnotherShare(File dataSetDirInStore, File share,
+                                ISimpleLogger logger)
                         {
                             SegmentedStoreUtils.moveDataSetToAnotherShare(dataSetDirInStore, share,
-                                    service, manager);
+                                    service, manager, logger);
                         }
                     }, new Log4jSimpleLogger(operationLog));
     }
diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java
index 08d01348be2..97d054c0a4a 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/plugins/SimpleShuffling.java
@@ -179,11 +179,11 @@ public class SimpleShuffling implements ISegmentedStoreShuffling
                 fromShare.getDataSetsOrderedBySize().get(dataSetIndex);
         File dataSetDirInStore = new File(fromShare.getShare(), dataSet.getDataSetLocation());
         String commonMessage =
-                "Moving data set " + dataSet.getDataSetCode() + " from share "
+                "Copying data set " + dataSet.getDataSetCode() + " from share "
                         + fromShare.getShareId() + " to share " + toShare.getShareId();
         logger.log(INFO, commonMessage + " ...");
         long t0 = timeProvider.getTimeInMilliseconds();
-        mover.moveDataSetToAnotherShare(dataSetDirInStore, toShare.getShare());
+        mover.moveDataSetToAnotherShare(dataSetDirInStore, toShare.getShare(), logger);
         from.removeDataSet(dataSetIndex);
         to.addDataSet(dataSet);
         logger.log(INFO, commonMessage + " took "
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java
index a2524dcccb1..3f3de2f18d5 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/DssServiceRpcAuthorizationAdvisor.java
@@ -66,6 +66,83 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.api.internal.authorization.ID
  */
 public class DssServiceRpcAuthorizationAdvisor extends DefaultPointcutAdvisor
 {
+    /**
+     * Proxy of an {@link InputStream} which releases locks when {@link #close()} is invoked.
+     */
+    private static final class InputStreamProxy extends InputStream
+    {
+        private final InputStream inputStream;
+        
+        private final IShareIdManager manager;
+        
+        private InputStreamProxy(InputStream inputStream, IShareIdManager manager)
+        {
+            this.inputStream = inputStream;
+            this.manager = manager;
+        }
+        
+        @Override
+        public void close() throws IOException
+        {
+            try
+            {
+                inputStream.close();
+            } finally
+            {
+                manager.releaseLocks();
+            }
+        }
+        
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException
+        {
+            return inputStream.read(b, off, len);
+        }
+
+        @Override
+        public int read() throws IOException
+        {
+            return inputStream.read();
+        }
+        
+        @Override
+        public int read(byte[] b) throws IOException
+        {
+            return inputStream.read(b);
+        }
+
+        @Override
+        public long skip(long n) throws IOException
+        {
+            return inputStream.skip(n);
+        }
+
+        @Override
+        public int available() throws IOException
+        {
+            return inputStream.available();
+        }
+
+        @Override
+        public void mark(int readlimit)
+        {
+            inputStream.mark(readlimit);
+        }
+
+        @Override
+        public void reset() throws IOException
+        {
+            inputStream.reset();
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return inputStream.markSupported();
+        }
+
+    }
+    
     private static final long serialVersionUID = 1L;
 
     private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION,
@@ -107,6 +184,7 @@ public class DssServiceRpcAuthorizationAdvisor extends DefaultPointcutAdvisor
      */
     public static class DssServiceRpcAuthorizationMethodInterceptor implements MethodInterceptor
     {
+
         private IShareIdManager shareIdManager;
         
         public DssServiceRpcAuthorizationMethodInterceptor(IShareIdManager shareIdManager)
@@ -171,23 +249,7 @@ public class DssServiceRpcAuthorizationAdvisor extends DefaultPointcutAdvisor
                 if (result instanceof InputStream)
                 {
                     shouldLocksAutomaticallyBeReleased = false;
-                    final InputStream inputStream = (InputStream) result;
-                    result = new InputStream()
-                        {
-                            @Override
-                            public int read() throws IOException
-                            {
-                                return inputStream.read();
-                            }
-
-                            @Override
-                            public void close() throws IOException
-                            {
-                                super.close();
-                                manager.releaseLocks();
-                            }
-                            
-                        };
+                    result = new InputStreamProxy((InputStream) result, manager);
                 }
                 return result;
             } finally
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java
index cd763fa4127..c5b98e79bee 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManager.java
@@ -67,7 +67,6 @@ public class ShareIdManager implements IShareIdManager
         
         void setShareId(String shareId)
         {
-            await();
             this.shareId = shareId;
         }
         
@@ -84,7 +83,7 @@ public class ShareIdManager implements IShareIdManager
             }
         }
         
-        private void await()
+        void await()
         {
             if (countDownLatch != null)
             {
@@ -171,6 +170,16 @@ public class ShareIdManager implements IShareIdManager
         }
     }
 
+    public void await(String dataSetCode)
+    {
+        Map<String, GuardedShareID> map = getDataSetCodeToShareIdMap();
+        GuardedShareID guardedShareId = map.get(dataSetCode);
+        if (guardedShareId != null)
+        {
+            guardedShareId.await();
+        }
+    }
+
     public void releaseLock(String dataSetCode)
     {
         synchronized (lockedDataSets)
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java
index ae55c6b81b4..8a3a12356d7 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IShareIdManager.java
@@ -48,6 +48,11 @@ public interface IShareIdManager
      */
     public void lock(String dataSetCode);
     
+    /**
+     * Awaits until specified data set is no longer locked.
+     */
+    public void await(String dataSetCode);
+    
     /**
      * Unlocks specified data set. Does nothing if lock already released or data set hasn't been
      * locked.
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java
index e75ff8da831..7c159d99109 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java
@@ -216,13 +216,23 @@ public class SegmentedStoreUtils
      * Moves the specified data set to the specified share. The data set is folder in the store its
      * name is the data set code. The destination folder is <code>share</code>. Its name is the
      * share id.
+     * <p>
+     * This method works as follows:
+     * <ol>
+     * <li>Copying data set to new share.
+     * <li>Sanity check of successfully copied data set.
+     * <li>Changing share id in openBIS AS.
+     * <li>Spawn an asynchronous task which deletes the data set at the old location if all locks on
+     * the data set have been released.
+     * </ol>
      * 
      * @param service to access openBIS AS.
      */
-    public static void moveDataSetToAnotherShare(File dataSetDirInStore, File share,
-            IEncapsulatedOpenBISService service, IShareIdManager shareIdManager)
+    public static void moveDataSetToAnotherShare(final File dataSetDirInStore, File share,
+            IEncapsulatedOpenBISService service, final IShareIdManager shareIdManager,
+            final ISimpleLogger logger)
     {
-        String dataSetCode = dataSetDirInStore.getName();
+        final String dataSetCode = dataSetDirInStore.getName();
         ExternalData dataSet = service.tryGetDataSet(dataSetCode);
         if (dataSet == null)
         {
@@ -239,7 +249,18 @@ public class SegmentedStoreUtils
         String shareId = share.getName();
         shareIdManager.setShareId(dataSetCode, shareId);
         service.updateShareIdAndSize(dataSetCode, shareId, size);
-        FileUtilities.deleteRecursively(dataSetDirInStore);
+        new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    logger.log(LogLevel.INFO, "Await for data set " + dataSetCode
+                            + " to be unlocked.");
+                    shareIdManager.await(dataSetCode);
+                    FileUtilities.deleteRecursively(dataSetDirInStore);
+                    logger.log(LogLevel.INFO, "Data set " + dataSetCode + " at "
+                            + dataSetDirInStore + " has been deleted.");
+                }
+            }).start();
     }
 
     private static void copyToShare(File file, File share)
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java
index 96a664b49fb..f57347126bc 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/plugins/SimpleShufflingTest.java
@@ -161,13 +161,13 @@ public class SimpleShufflingTest extends AbstractFileSystemTestCase
                     one(logger).log(LogLevel.INFO, "END Computing number of data sets to move for share 1");
                     one(logger).log(LogLevel.INFO, "\t2 data sets to move, available space : 102500");
                     
-                    one(logger).log(LogLevel.INFO, "Moving data set ds3 from share 1 to share 3 ...");
-                    one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds3"), share3.getShare());
-                    one(logger).log(LogLevel.INFO, "Moving data set ds3 from share 1 to share 3 took 0 seconds.");
+                    one(logger).log(LogLevel.INFO, "Copying data set ds3 from share 1 to share 3 ...");
+                    one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds3"), share3.getShare(), logger);
+                    one(logger).log(LogLevel.INFO, "Copying data set ds3 from share 1 to share 3 took 0 seconds.");
                     
-                    one(logger).log(LogLevel.INFO, "Moving data set ds2 from share 1 to share 3 ...");
-                    one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds2"), share3.getShare());
-                    one(logger).log(LogLevel.INFO, "Moving data set ds2 from share 1 to share 3 took 0 seconds.");
+                    one(logger).log(LogLevel.INFO, "Copying data set ds2 from share 1 to share 3 ...");
+                    one(dataSetMover).moveDataSetToAnotherShare(new File(share1.getShare(), STORE_PATH + "ds2"), share3.getShare(), logger);
+                    one(logger).log(LogLevel.INFO, "Copying data set ds2 from share 1 to share 3 took 0 seconds.");
                     
                     one(logger).log(LogLevel.INFO, "BEGIN Computing number of data sets to move for share 2");
                     one(logger).log(LogLevel.INFO, "\tSpace needed to free: 1585152 bytes (1548.00 kB, 1.51 MB)");
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java
index ae770458528..f6f83761a79 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/ShareIdManagerTest.java
@@ -151,7 +151,7 @@ public class ShareIdManagerTest extends AssertJUnit
                     manager.lock(DS1);
                     try
                     {
-                        manager.setShareId(DS1, "1");
+                        manager.await(DS1);
                     } catch (EnvironmentFailureException ex)
                     {
                         System.out.println(ex);
@@ -193,17 +193,15 @@ public class ShareIdManagerTest extends AssertJUnit
             }, "T1").start();
         ch1.assertNextMessage("locked"); // wait until data set is really locked.
 
-        manager.setShareId(DS1, "1");
+        manager.await(DS1);
 
-        assertEquals("1", manager.getShareId(DS1));
         ch1.assertNextMessage("unlocked"); // wait until thread is finished
         assertEquals("INFO  OPERATION.ShareIdManager"
                 + " - Share id manager initialized with 2 data sets.\n"
                 + "DEBUG OPERATION.ShareIdManager - Data set ds1 has been locked.\n"
                 + "DEBUG OPERATION.ShareIdManager"
                 + " - Data set ds1 is locked by the following threads: T1\n"
-                + "DEBUG OPERATION.ShareIdManager - Unlock data set ds1\n"
-                + "INFO  OPERATION.ShareIdManager - New share of data set ds1 is 1",
+                + "DEBUG OPERATION.ShareIdManager - Unlock data set ds1",
                 logRecorder.getLogContent());
     }
 
@@ -240,7 +238,7 @@ public class ShareIdManagerTest extends AssertJUnit
 
         try
         {
-            manager.setShareId(DS1, "1");
+            manager.await(DS1);
             fail("EnvironmentFailureException expected.");
         } catch (EnvironmentFailureException ex)
         {
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java
index 9a555bd929d..31aea1d2f16 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java
@@ -29,6 +29,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import ch.systemsx.cisd.base.tests.AbstractFileSystemTestCase;
+import ch.systemsx.cisd.common.concurrent.MessageChannel;
 import ch.systemsx.cisd.common.filesystem.FileUtilities;
 import ch.systemsx.cisd.common.filesystem.HostAwareFile;
 import ch.systemsx.cisd.common.filesystem.IFreeSpaceProvider;
@@ -51,10 +52,22 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase
     private static final class MockLogger implements ISimpleLogger
     {
         private final StringBuilder builder = new StringBuilder();
+        private final MessageChannel messageChannel = new MessageChannel();
         
         public void log(LogLevel level, String message)
         {
             builder.append(level).append(": ").append(message).append('\n');
+            messageChannel.send(message);
+        }
+        
+        public void assertNextLogMessage(String expectedMessage)
+        {
+            messageChannel.assertNextMessage(expectedMessage);
+        }
+        
+        public void assertNoMoreLogMessages()
+        {
+            messageChannel.assertEmpty();
         }
 
         @Override
@@ -69,7 +82,7 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase
     private Mockery context;
     private IEncapsulatedOpenBISService service;
     private IShareIdManager shareIdManager;
-    private ISimpleLogger log;
+    private MockLogger log;
     private IFreeSpaceProvider freeSpaceProvider;
     private ITimeProvider timeProvider;
 
@@ -203,17 +216,22 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase
                     
                     one(service).updateShareIdAndSize("ds-1", "2", 11L);
                     one(shareIdManager).setShareId("ds-1", "2");
+                    one(shareIdManager).await("ds-1");
                 }
             });
         assertEquals(true, dataSetDirInStore.exists());
         assertFileNames(share2uuid01, "22");
         
-        SegmentedStoreUtils.moveDataSetToAnotherShare(dataSetDirInStore, share2, service, shareIdManager);
+        SegmentedStoreUtils.moveDataSetToAnotherShare(dataSetDirInStore, share2, service, shareIdManager, log);
 
+        log.assertNextLogMessage("Await for data set ds-1 to be unlocked.");
+        log.assertNextLogMessage("Data set ds-1 at " + share1
+                + "/uuid/01/02/03/ds-1 has been deleted.");
         assertEquals(false, dataSetDirInStore.exists());
         assertFileNames(share2uuid01, "02", "22");
         assertEquals("hello world\n",
                 FileUtilities.loadToString(new File(share2uuid01, "02/03/ds-1/original/hello.txt")));
+        log.assertNoMoreLogMessages();
         context.assertIsSatisfied();
     }
     
-- 
GitLab