diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java index bb04afe12ac8afe73eec6ba98b405d50ed51ebc8..3e3d23f229e045468dac1d395351554b36470085 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ConfigProvider.java @@ -70,4 +70,10 @@ public class ConfigProvider implements IConfigProvider { return DssPropertyParametersUtil.getDataStreamTimeout(properties); } + + @Override + public int getDataStreamMaxTimeout() + { + return DssPropertyParametersUtil.getDataStreamMaxTimeout(properties); + } } diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java index 1e7594c32987de1e521815aeac0a2a258a85cb65..f00856b3401bf1b5c759d834b0afd2a8e91faa2d 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepository.java @@ -80,14 +80,16 @@ public class StreamRepository implements IStreamRepository private final long minimumTime; + private final long maximumTime; + public StreamRepository(IConfigProvider configProvider) { - this(configProvider.getDataStreamTimeout(), new IdGenerator(), - SystemTimeProvider.SYSTEM_TIME_PROVIDER); + this(configProvider.getDataStreamTimeout(), configProvider.getDataStreamMaxTimeout(), + new IdGenerator(), SystemTimeProvider.SYSTEM_TIME_PROVIDER); } @Private - StreamRepository(int minimumTimeInSecondsToKeepStreams, + StreamRepository(int minimumTimeInSecondsToKeepStreams, int maximumTimeInSecondsToKeepStreams, IUniqueIdGenerator inputStreamIDGenerator, ITimeProvider timeProvider) { if (minimumTimeInSecondsToKeepStreams <= 0) @@ -97,12 +99,21 @@ public class StreamRepository implements IStreamRepository + minimumTimeInSecondsToKeepStreams); } minimumTime = minimumTimeInSecondsToKeepStreams * 1000L; + + if (maximumTimeInSecondsToKeepStreams <= 0) + { + throw new IllegalArgumentException( + "Maximum time to keep streams is not a positive number: " + + maximumTimeInSecondsToKeepStreams); + } + maximumTime = maximumTimeInSecondsToKeepStreams * 1000L; this.inputStreamIDGenerator = inputStreamIDGenerator; this.timeProvider = timeProvider; } @Override - public synchronized String addStream(InputStream inputStream, String path, long validityInSeconds) + public synchronized String addStream(InputStream inputStream, String path, + long validityInSeconds) { removeStaleInputStreams(); String id = inputStreamIDGenerator.createUniqueID(); @@ -110,6 +121,7 @@ public class StreamRepository implements IStreamRepository long validityInMs = validityInSeconds * 1000L; long validity = (validityInMs < minimumTime) ? minimumTime : validityInMs; + validity = (validityInMs > maximumTime) ? maximumTime : validity; streams.put(id, new InputStreamWithValidityDuration(new InputStreamWithPath(inputStream, path), timestamp, validity)); return id; diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java index f3ae67af575608d4306315c9757ac9d7335aa1ff..7fa6e4b30cce83fa754492ef6e1debfa800f7ff4 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/IConfigProvider.java @@ -45,4 +45,9 @@ public interface IConfigProvider * Return the minimum time (in seconds) that data streams are kept before expiring. */ int getDataStreamTimeout(); + + /** + * Return the maximum time (in seconds) that data streams are kept before expiring. + */ + int getDataStreamMaxTimeout(); } diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java index dc5e695b48441bbafa0b5f39ddd7907452388403..e35bf73cecab2016432a2cad8e8303a389b976d8 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/DssPropertyParametersUtil.java @@ -58,7 +58,11 @@ public class DssPropertyParametersUtil public static final String DATA_STREAM_TIMEOUT = "data-stream-timeout"; - public static final int MINIMUM_TIME_TO_KEEP_STREAMS_DEFAULT = 20; + public static final String DATA_STREAM_MAX_TIMEOUT = "data-stream-max-timeout"; + + public static final int MINIMUM_TIME_TO_KEEP_STREAMS_DEFAULT = 5; + + public static final int MAXIMUM_TIME_TO_KEEP_STREAMS_DEFAULT = 60 * 60 * 4; // 4 hours /** * Temp directory for dss usage. @@ -100,8 +104,7 @@ public class DssPropertyParametersUtil ExtendedProperties serviceProperties = extendProperties(properties); CorePluginsInjector injector = new CorePluginsInjector(ScannerType.DSS, DssPluginType.values()); - Map<String, File> pluginFolders = - injector.injectCorePlugins(serviceProperties); + Map<String, File> pluginFolders = injector.injectCorePlugins(serviceProperties); if (PluginContainer.tryGetInstance() == null) { @@ -170,6 +173,12 @@ public class DssPropertyParametersUtil MINIMUM_TIME_TO_KEEP_STREAMS_DEFAULT); } + public static int getDataStreamMaxTimeout(Properties serviceProperties) + { + return PropertyUtils.getPosInt(serviceProperties, DATA_STREAM_MAX_TIMEOUT, + MAXIMUM_TIME_TO_KEEP_STREAMS_DEFAULT); + } + public static File getDssInternalTempDir(final Properties properties) { return getDssInternalTempDir(FileOperations.getInstance(), properties); diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java index f8b033c4a95a95f3f8e5b71807230cdcc4b7d7a3..03e99248b054c92767673e37ca7da592e0cadf17 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/StreamRepositoryTest.java @@ -64,7 +64,7 @@ public class StreamRepositoryTest extends AssertJUnit @Test public void testAddingAndRetrievingTwoStreams() { - StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider); + StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider); ByteArrayInputStream stream1 = new ByteArrayInputStream("s1".getBytes()); String id1 = repository.addStream(stream1, "f1.txt", 0); ByteArrayInputStream stream2 = new ByteArrayInputStream("s2".getBytes()); @@ -84,7 +84,7 @@ public class StreamRepositoryTest extends AssertJUnit @Test public void testThatAStreamCanBeRetrievedOnlyOnce() { - StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider); + StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider); ByteArrayInputStream stream1 = new ByteArrayInputStream("s1".getBytes()); String id1 = repository.addStream(stream1, "f1.txt", 0); @@ -104,7 +104,7 @@ public class StreamRepositoryTest extends AssertJUnit @Test public void testAddingAndRetrievingTwoStreamsButSecondStreamNoLongerExists() { - StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider); + StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider); ByteArrayInputStream stream1 = new ByteArrayInputStream("s1".getBytes()); String id1 = repository.addStream(stream1, "f1.txt", 0); ByteArrayInputStream stream2 = new ByteArrayInputStream("s2".getBytes()); @@ -130,7 +130,7 @@ public class StreamRepositoryTest extends AssertJUnit @Test public void testValidityDuration() { - StreamRepository repository = new StreamRepository(2, idGenerator, timeProvider); + StreamRepository repository = new StreamRepository(2, 10, idGenerator, timeProvider); ByteArrayInputStream stream0 = new ByteArrayInputStream("s1".getBytes()); repository.addStream(stream0, "f1.txt", 5); ByteArrayInputStream stream1 = new ByteArrayInputStream("s2".getBytes()); @@ -152,4 +152,28 @@ public class StreamRepositoryTest extends AssertJUnit assertEquals("Stream 1 is no longer available.", ex.getMessage()); } } + + @Test + public void testMaxValidityDuration() + { + StreamRepository repository = new StreamRepository(2, 3, idGenerator, timeProvider); + ByteArrayInputStream stream0 = new ByteArrayInputStream("s1".getBytes()); + repository.addStream(stream0, "f1.txt", 10); + + // Advance to the requested time, but beyond the stream repository's max allowed + timeProvider.getTimeInMilliseconds(); + timeProvider.getTimeInMilliseconds(); + timeProvider.getTimeInMilliseconds(); + timeProvider.getTimeInMilliseconds(); + timeProvider.getTimeInMilliseconds(); + timeProvider.getTimeInMilliseconds(); + try + { + repository.getStream("0"); + fail("IllegalArgumentException expected"); + } catch (IllegalArgumentException ex) + { + assertEquals("Stream 0 is no longer available.", ex.getMessage()); + } + } }