From fe79be1d0f7d6dfbb598d500b81ab398b9e29e59 Mon Sep 17 00:00:00 2001
From: cramakri <cramakri>
Date: Thu, 21 Feb 2013 09:56:43 +0000
Subject: [PATCH] BIS-329 SP-500 : Added maximum validity duration for streams

SVN: 28402
---
 .../cisd/etlserver/ConfigProvider.java        |  6 ++++
 .../dss/generic/server/StreamRepository.java  | 20 +++++++++---
 .../dss/generic/shared/IConfigProvider.java   |  5 +++
 .../utils/DssPropertyParametersUtil.java      | 15 +++++++--
 .../generic/server/StreamRepositoryTest.java  | 32 ++++++++++++++++---
 5 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java
index bb04afe12ac..3e3d23f229e 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java
@@ -70,4 +70,10 @@ public class ConfigProvider implements IConfigProvider
     {
         return DssPropertyParametersUtil.getDataStreamTimeout(properties);
     }
+
+    @Override
+    public int getDataStreamMaxTimeout()
+    {
+        return DssPropertyParametersUtil.getDataStreamMaxTimeout(properties);
+    }
 }
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java
index 1e7594c3298..f00856b3401 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java
@@ -80,14 +80,16 @@ public class StreamRepository implements IStreamRepository
 
     private final long minimumTime;
 
+    private final long maximumTime;
+
     public StreamRepository(IConfigProvider configProvider)
     {
-        this(configProvider.getDataStreamTimeout(), new IdGenerator(),
-                SystemTimeProvider.SYSTEM_TIME_PROVIDER);
+        this(configProvider.getDataStreamTimeout(), configProvider.getDataStreamMaxTimeout(),
+                new IdGenerator(), SystemTimeProvider.SYSTEM_TIME_PROVIDER);
     }
 
     @Private
-    StreamRepository(int minimumTimeInSecondsToKeepStreams,
+    StreamRepository(int minimumTimeInSecondsToKeepStreams, int maximumTimeInSecondsToKeepStreams,
             IUniqueIdGenerator inputStreamIDGenerator, ITimeProvider timeProvider)
     {
         if (minimumTimeInSecondsToKeepStreams <= 0)
@@ -97,12 +99,21 @@ public class StreamRepository implements IStreamRepository
                             + minimumTimeInSecondsToKeepStreams);
         }
         minimumTime = minimumTimeInSecondsToKeepStreams * 1000L;
+
+        if (maximumTimeInSecondsToKeepStreams <= 0)
+        {
+            throw new IllegalArgumentException(
+                    "Maximum time to keep streams is not a positive number: "
+                            + maximumTimeInSecondsToKeepStreams);
+        }
+        maximumTime = maximumTimeInSecondsToKeepStreams * 1000L;
         this.inputStreamIDGenerator = inputStreamIDGenerator;
         this.timeProvider = timeProvider;
     }
 
     @Override
-    public synchronized String addStream(InputStream inputStream, String path, long validityInSeconds)
+    public synchronized String addStream(InputStream inputStream, String path,
+            long validityInSeconds)
     {
         removeStaleInputStreams();
         String id = inputStreamIDGenerator.createUniqueID();
@@ -110,6 +121,7 @@ public class StreamRepository implements IStreamRepository
 
         long validityInMs = validityInSeconds * 1000L;
         long validity = (validityInMs < minimumTime) ? minimumTime : validityInMs;
+        validity = (validityInMs > maximumTime) ? maximumTime : validity;
         streams.put(id, new InputStreamWithValidityDuration(new InputStreamWithPath(inputStream,
                 path), timestamp, validity));
         return id;
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java
index f3ae67af575..7fa6e4b30cc 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java
@@ -45,4 +45,9 @@ public interface IConfigProvider
      * Return the minimum time (in seconds) that data streams are kept before expiring.
      */
     int getDataStreamTimeout();
+
+    /**
+     * Return the maximum time (in seconds) that data streams are kept before expiring.
+     */
+    int getDataStreamMaxTimeout();
 }
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java
index dc5e695b484..e35bf73ceca 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java
@@ -58,7 +58,11 @@ public class DssPropertyParametersUtil
 
     public static final String DATA_STREAM_TIMEOUT = "data-stream-timeout";
 
-    public static final int MINIMUM_TIME_TO_KEEP_STREAMS_DEFAULT = 20;
+    public static final String DATA_STREAM_MAX_TIMEOUT = "data-stream-max-timeout";
+
+    public static final int MINIMUM_TIME_TO_KEEP_STREAMS_DEFAULT = 5;
+
+    public static final int MAXIMUM_TIME_TO_KEEP_STREAMS_DEFAULT = 60 * 60 * 4; // 4 hours
 
     /**
      * Temp directory for dss usage.
@@ -100,8 +104,7 @@ public class DssPropertyParametersUtil
         ExtendedProperties serviceProperties = extendProperties(properties);
         CorePluginsInjector injector =
                 new CorePluginsInjector(ScannerType.DSS, DssPluginType.values());
-        Map<String, File> pluginFolders =
-                injector.injectCorePlugins(serviceProperties);
+        Map<String, File> pluginFolders = injector.injectCorePlugins(serviceProperties);
 
         if (PluginContainer.tryGetInstance() == null)
         {
@@ -170,6 +173,12 @@ public class DssPropertyParametersUtil
                 MINIMUM_TIME_TO_KEEP_STREAMS_DEFAULT);
     }
 
+    public static int getDataStreamMaxTimeout(Properties serviceProperties)
+    {
+        return PropertyUtils.getPosInt(serviceProperties, DATA_STREAM_MAX_TIMEOUT,
+                MAXIMUM_TIME_TO_KEEP_STREAMS_DEFAULT);
+    }
+
     public static File getDssInternalTempDir(final Properties properties)
     {
         return getDssInternalTempDir(FileOperations.getInstance(), properties);
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java
index f8b033c4a95..03e99248b05 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java
@@ -64,7 +64,7 @@ public class StreamRepositoryTest extends AssertJUnit
     @Test
     public void testAddingAndRetrievingTwoStreams()
     {
-        StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider);
+        StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider);
         ByteArrayInputStream stream1 = new ByteArrayInputStream("s1".getBytes());
         String id1 = repository.addStream(stream1, "f1.txt", 0);
         ByteArrayInputStream stream2 = new ByteArrayInputStream("s2".getBytes());
@@ -84,7 +84,7 @@ public class StreamRepositoryTest extends AssertJUnit
     @Test
     public void testThatAStreamCanBeRetrievedOnlyOnce()
     {
-        StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider);
+        StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider);
         ByteArrayInputStream stream1 = new ByteArrayInputStream("s1".getBytes());
         String id1 = repository.addStream(stream1, "f1.txt", 0);
 
@@ -104,7 +104,7 @@ public class StreamRepositoryTest extends AssertJUnit
     @Test
     public void testAddingAndRetrievingTwoStreamsButSecondStreamNoLongerExists()
     {
-        StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider);
+        StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider);
         ByteArrayInputStream stream1 = new ByteArrayInputStream("s1".getBytes());
         String id1 = repository.addStream(stream1, "f1.txt", 0);
         ByteArrayInputStream stream2 = new ByteArrayInputStream("s2".getBytes());
@@ -130,7 +130,7 @@ public class StreamRepositoryTest extends AssertJUnit
     @Test
     public void testValidityDuration()
     {
-        StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider);
+        StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider);
         ByteArrayInputStream stream0 = new ByteArrayInputStream("s1".getBytes());
         repository.addStream(stream0, "f1.txt", 5);
         ByteArrayInputStream stream1 = new ByteArrayInputStream("s2".getBytes());
@@ -152,4 +152,28 @@ public class StreamRepositoryTest extends AssertJUnit
             assertEquals("Stream 1 is no longer available.", ex.getMessage());
         }
     }
+
+    @Test
+    public void testMaxValidityDuration()
+    {
+        StreamRepository repository = new StreamRepository(2, 3, idGenerator, timeProvider);
+        ByteArrayInputStream stream0 = new ByteArrayInputStream("s1".getBytes());
+        repository.addStream(stream0, "f1.txt", 10);
+
+        // Advance to the requested time, but beyond the stream repository's max allowed
+        timeProvider.getTimeInMilliseconds();
+        timeProvider.getTimeInMilliseconds();
+        timeProvider.getTimeInMilliseconds();
+        timeProvider.getTimeInMilliseconds();
+        timeProvider.getTimeInMilliseconds();
+        timeProvider.getTimeInMilliseconds();
+        try
+        {
+            repository.getStream("0");
+            fail("IllegalArgumentException expected");
+        } catch (IllegalArgumentException ex)
+        {
+            assertEquals("Stream 0 is no longer available.", ex.getMessage());
+        }
+    }
 }
-- 
GitLab