diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCache.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCache.java index fb7a40b5b373eadb9b6b2ab3faee3184475af2c0..740bac2cf0cd650219493a02d31a35a144680d58 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCache.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCache.java @@ -32,12 +32,15 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.IOUtils; +import org.apache.log4j.Logger; import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.filesystem.FileOperations; import ch.systemsx.cisd.common.filesystem.IFileOperations; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.ITimeProvider; import ch.systemsx.cisd.common.utilities.SystemTimeProvider; import ch.systemsx.cisd.openbis.dss.generic.shared.api.v1.IDssServiceRpcGeneric; @@ -53,6 +56,9 @@ public class ContentCache implements IContentCache { public static final String CACHE_WORKSPACE_FOLDER_KEY = "cache-workspace-folder"; + private final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + ContentCache.class); + static final String CACHE_FOLDER = "cached"; static final String DOWNLOADING_FOLDER = "downloading"; @@ -84,6 +90,7 @@ public class ContentCache implements IContentCache this.timeProvider = timeProvider; fileOperations.removeRecursivelyQueueing(new File(cacheWorkspace, DOWNLOADING_FOLDER)); fileLockManager = new LockManager(); + operationLog.info("Content cache created. Workspace: " + cacheWorkspace.getAbsolutePath()); } @Override @@ -138,10 +145,7 @@ public class ContentCache implements IContentCache { downloadFile(sessionToken, dataSetLocation, path); } - File dataSetFolder = - new File(workspace, createDataSetPath(CACHE_FOLDER, - dataSetLocation.getDataSetCode())); - dataSetFolder.setLastModified(timeProvider.getTimeInMilliseconds()); + touchDataSetFolder(dataSetLocation); return file; } finally { @@ -150,16 +154,79 @@ public class ContentCache implements IContentCache } @Override - public InputStream getInputStream(String sessionToken, IDatasetLocation dataSetLocation, + public InputStream getInputStream(String sessionToken, final IDatasetLocation dataSetLocation, DataSetPathInfo path) { - try - { - return new FileInputStream(getFile(sessionToken, dataSetLocation, path)); - } catch (FileNotFoundException ex) + final String pathInWorkspace = createPathInWorkspace(CACHE_FOLDER, dataSetLocation, path); + fileLockManager.lock(pathInWorkspace); + final File file = new File(workspace, pathInWorkspace); + if (file.exists()) { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); + try + { + FileInputStream fileInputStream = + new FileInputStream(getFile(sessionToken, dataSetLocation, path)); + touchDataSetFolder(dataSetLocation); + return fileInputStream; + } catch (FileNotFoundException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } finally + { + fileLockManager.unlock(pathInWorkspace); + } } + final File tempFile = createTempFile(); + final InputStream inputStream = createInputStream(sessionToken, dataSetLocation, path); + final OutputStream fileOutputStream = createFileOutputStream(tempFile); + return new InputStream() + { + private boolean closed; + + @Override + public int read() throws IOException + { + int b = inputStream.read(); + fileOutputStream.write(b); + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + int count = inputStream.read(b, off, len); + if (count >= 0) + { + fileOutputStream.write(b, off, count); + } + return count; + } + + @Override + public void close() throws IOException + { + if (closed) + { + return; + } + inputStream.close(); + fileOutputStream.close(); + moveDownloadedFileToCache(tempFile, pathInWorkspace); + touchDataSetFolder(dataSetLocation); + closed = true; + fileLockManager.unlock(pathInWorkspace); + } + + @Override + protected void finalize() throws Throwable + { + if (closed == false) + { + fileLockManager.unlock(pathInWorkspace); + } + super.finalize(); + } + }; } private void downloadFile(String sessionToken, IDatasetLocation dataSetLocation, @@ -168,18 +235,10 @@ public class ContentCache implements IContentCache InputStream input = null; try { - String dataStoreUrl = dataSetLocation.getDataStoreUrl(); - IDssServiceRpcGeneric service = serviceFactory.getService(dataStoreUrl); - String dataSetCode = dataSetLocation.getDataSetCode(); - String relativePath = path.getRelativePath(); - String url = - service.getDownloadUrlForFileForDataSet(sessionToken, dataSetCode, relativePath); - input = createURL(url).openStream(); + input = createInputStream(sessionToken, dataSetLocation, path); File downloadedFile = createFileFromInputStream(dataSetLocation, path, input); String pathInWorkspace = createPathInWorkspace(CACHE_FOLDER, dataSetLocation, path); - File file = new File(workspace, pathInWorkspace); - createFolder(file.getParentFile()); - downloadedFile.renameTo(file); + moveDownloadedFileToCache(downloadedFile, pathInWorkspace); } catch (Exception ex) { throw CheckedExceptionTunnel.wrapIfNecessary(ex); @@ -189,6 +248,65 @@ public class ContentCache implements IContentCache } } + private void moveDownloadedFileToCache(File downloadedFile, String pathInWorkspace) + { + File file = new File(workspace, pathInWorkspace); + createFolder(file.getParentFile()); + boolean success = downloadedFile.renameTo(file); + String msg = "'" + pathInWorkspace + "' successfully downloaded "; + if (success) + { + operationLog.debug(msg + "and successfully moved to cache."); + } else + { + operationLog.warn(msg + "but couldn't move to cache."); + } + } + + private void touchDataSetFolder(IDatasetLocation dataSetLocation) + { + File dataSetFolder = + new File(workspace, createDataSetPath(CACHE_FOLDER, + dataSetLocation.getDataSetCode())); + dataSetFolder.setLastModified(timeProvider.getTimeInMilliseconds()); + } + + private InputStream createInputStream(String sessionToken, IDatasetLocation dataSetLocation, + DataSetPathInfo path) + { + String dataStoreUrl = dataSetLocation.getDataStoreUrl(); + IDssServiceRpcGeneric service = serviceFactory.getService(dataStoreUrl); + String dataSetCode = dataSetLocation.getDataSetCode(); + String relativePath = path.getRelativePath(); + URL url = + createURL(service.getDownloadUrlForFileForDataSet(sessionToken, dataSetCode, + relativePath)); + InputStream openStream = null; + try + { + openStream = url.openStream(); + return openStream; + } catch (IOException ex) + { + IOUtils.closeQuietly(openStream); + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private OutputStream createFileOutputStream(File file) + { + OutputStream outputStream = null; + try + { + outputStream = new FileOutputStream(file); + return outputStream; + } catch (FileNotFoundException ex) + { + IOUtils.closeQuietly(outputStream); + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + private String createPathInWorkspace(String folder, IDatasetLocation dataSetLocation, DataSetPathInfo path) { @@ -219,9 +337,7 @@ public class ContentCache implements IContentCache private File createFileFromInputStream(IDatasetLocation dataSetLocation, DataSetPathInfo path, InputStream inputStream) { - String relativePath = DOWNLOADING_FOLDER + "/" + Thread.currentThread().getId(); - File file = new File(workspace, relativePath); - createFolder(file.getParentFile()); + File file = createTempFile(); OutputStream ostream = null; try { @@ -237,6 +353,14 @@ public class ContentCache implements IContentCache IOUtils.closeQuietly(ostream); } } + + private File createTempFile() + { + String relativePath = DOWNLOADING_FOLDER + "/" + Thread.currentThread().getId(); + File file = new File(workspace, relativePath); + createFolder(file.getParentFile()); + return file; + } private void createFolder(File folder) { @@ -249,7 +373,7 @@ public class ContentCache implements IContentCache } } } - + private static final class LockManager { private static final class LockWithCounter @@ -290,6 +414,6 @@ public class ContentCache implements IContentCache } } - } - + } + } diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCacheTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCacheTest.java index 56979b248f968c56c0699388aa29f2cd34b88046..5a24473d589303c436654884eede732d94f19393 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCacheTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/content/ContentCacheTest.java @@ -16,84 +16,294 @@ package ch.systemsx.cisd.openbis.dss.generic.shared.content; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import org.apache.commons.io.IOUtils; import org.jmock.Expectations; -import org.jmock.Mockery; -import org.testng.AssertJUnit; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import ch.systemsx.cisd.common.filesystem.IFileOperations; -import ch.systemsx.cisd.common.utilities.ITimeProvider; +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.concurrent.MessageChannel; +import ch.systemsx.cisd.common.concurrent.MessageChannelBuilder; +import ch.systemsx.cisd.common.filesystem.FileUtilities; +import ch.systemsx.cisd.common.logging.ConsoleLogger; +import ch.systemsx.cisd.common.test.ProxyAction; +import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetPathInfo; /** - * - * * @author Franz-Josef Elmer */ -public class ContentCacheTest extends AssertJUnit +public class ContentCacheTest extends AbstractRemoteHierarchicalContentTestCase { - private static final String SESSION_TOKEN = "session"; - - private Mockery context; - - private IFileOperations fileOperations; - - private ITimeProvider timeProvider; - - @BeforeMethod - public void setUp() - { - context = new Mockery(); - fileOperations = context.mock(IFileOperations.class); - timeProvider = context.mock(ITimeProvider.class); - } - - @AfterMethod - public void tearDown() - { - // To following line of code should also be called at the end of each test method. - // Otherwise one do not known which test failed. - context.assertIsSatisfied(); - } - @Test public void testDataSetLocking() { ContentCache cache = createCache(); - + cache.lockDataSet(SESSION_TOKEN, "DS-1"); - + assertEquals(true, cache.isDataSetLocked(SESSION_TOKEN, "DS-1")); assertEquals(false, cache.isDataSetLocked(SESSION_TOKEN, "DS-2")); - + cache.lockDataSet(SESSION_TOKEN, "DS-1"); - + assertEquals(true, cache.isDataSetLocked(SESSION_TOKEN, "DS-1")); - + cache.unlockDataSet(SESSION_TOKEN, "DS-1"); - + assertEquals(true, cache.isDataSetLocked(SESSION_TOKEN, "DS-1")); - + cache.unlockDataSet(SESSION_TOKEN, "DS-1"); - + assertEquals(false, cache.isDataSetLocked(SESSION_TOKEN, "DS-1")); - + + context.assertIsSatisfied(); + } + + @Test + public void testGetInputStream() + { + final DataSetPathInfo pathInfo1 = prepareForDownloading(remoteFile1); + + ContentCache cache = createCache(); + InputStream inputStream = cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1); + + assertEquals(FILE1_CONTENT, readContent(inputStream, true)); + context.assertIsSatisfied(); + } + + @Test(invocationCount = 1, invocationTimeOut = 10000) + public void testGetInputStreamForSameContentInTwoThreads() + { + final DataSetPathInfo pathInfo1 = prepareForDownloading(remoteFile1); + ConsoleLogger logger = new ConsoleLogger(); + final MessageChannel channel1 = + new MessageChannelBuilder(10000).name("1").logger(logger).getChannel(); + final MessageChannel channel2 = + new MessageChannelBuilder(10000).name("2").logger(logger).getChannel(); + final MessageChannel channel3 = + new MessageChannelBuilder(10000).name("3").logger(logger).getChannel(); + final ContentCache cache = createCache(); + File fileInCache = + new File(workSpace, ContentCache.CACHE_FOLDER + "/" + DATA_SET_CODE + "/" + + remoteFile1.getName()); + assertEquals(false, fileInCache.exists()); + + new Thread(new Runnable() + { + @Override + public void run() + { + InputStream inputStream = + cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1); + channel1.send(STARTED_MESSAGE); + channel2.assertNextMessage(STARTED_MESSAGE); + channel1.send(readContent(inputStream, true)); + } + }, "thread1").start(); + new Thread(new Runnable() + { + @Override + public void run() + { + channel1.assertNextMessage(STARTED_MESSAGE); + channel2.send(STARTED_MESSAGE); + InputStream inputStream = + cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1); + channel1.assertNextMessage(FILE1_CONTENT); + channel2.send(readContent(inputStream, true)); + channel3.send(FINISHED_MESSAGE); + } + }, "thread2").start(); + + channel3.assertNextMessage(FINISHED_MESSAGE); + channel2.assertNextMessage(FILE1_CONTENT); + + assertEquals(FILE1_CONTENT, FileUtilities.loadToString(fileInCache).trim()); + File fileFromCache = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1); + assertEquals(fileInCache.getAbsolutePath(), fileFromCache.getAbsolutePath()); + context.assertIsSatisfied(); + } + + @Test(invocationCount = 1, invocationTimeOut = 10000) + public void testGetFileAndGetInputStreamForSameContentInTwoThreads() throws Exception + { + final DataSetPathInfo pathInfo = new DataSetPathInfo(); + pathInfo.setRelativePath(remoteFile1.getName()); + pathInfo.setDirectory(false); + ConsoleLogger logger = new ConsoleLogger(); + final MessageChannel channel1 = + new MessageChannelBuilder(10000).name("11").logger(logger).getChannel(); + final MessageChannel channel2 = + new MessageChannelBuilder(10000).name("12").logger(logger).getChannel(); + final MessageChannel channel3 = + new MessageChannelBuilder(10000).name("13").logger(logger).getChannel(); + final ContentCache cache = createCache(); + context.checking(new Expectations() + { + { + one(remoteDss).getDownloadUrlForFileForDataSet(SESSION_TOKEN, DATA_SET_CODE, + pathInfo.getRelativePath()); + will(new ProxyAction(returnValue(remoteFile1.toURI().toURL().toString())) + { + @Override + protected void doBeforeReturn() + { + channel1.send(STARTED_MESSAGE); + channel2.assertNextMessage(STARTED_MESSAGE); + } + }); + } + }); + File fileInCache = + new File(workSpace, ContentCache.CACHE_FOLDER + "/" + DATA_SET_CODE + "/" + + remoteFile1.getName()); + assertEquals(false, fileInCache.exists()); + + new Thread(new Runnable() + { + @Override + public void run() + { + File file = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo); + channel1.send(FileUtilities.loadToString(file).trim()); + } + }, "thread1").start(); + new Thread(new Runnable() + { + @Override + public void run() + { + channel1.assertNextMessage(STARTED_MESSAGE); + channel2.send(STARTED_MESSAGE); + InputStream inputStream = + cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo); + channel1.assertNextMessage(FILE1_CONTENT); + channel2.send(readContent(inputStream, true)); + channel3.send(FINISHED_MESSAGE); + } + }, "thread2").start(); + + channel3.assertNextMessage(FINISHED_MESSAGE); + channel2.assertNextMessage(FILE1_CONTENT); + + assertEquals(FILE1_CONTENT, FileUtilities.loadToString(fileInCache).trim()); + File fileFromCache = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo); + assertEquals(fileInCache.getAbsolutePath(), fileFromCache.getAbsolutePath()); context.assertIsSatisfied(); } - - private ContentCache createCache() + + @Test(invocationCount = 1, invocationTimeOut = 10000) + public void testGetInputStreamGetFileForSameContentInTwoThreads() throws Exception { - final File workSpace = new File("."); + final DataSetPathInfo pathInfo = new DataSetPathInfo(); + pathInfo.setRelativePath(remoteFile1.getName()); + pathInfo.setDirectory(false); + ConsoleLogger logger = new ConsoleLogger(); + final MessageChannel channel1 = + new MessageChannelBuilder(10000).name("21").logger(logger).getChannel(); + final MessageChannel channel2 = + new MessageChannelBuilder(10000).name("22").logger(logger).getChannel(); + final MessageChannel channel3 = + new MessageChannelBuilder(10000).name("3").logger(logger).getChannel(); + final ContentCache cache = createCache(); context.checking(new Expectations() { { - one(fileOperations).removeRecursivelyQueueing( - new File(workSpace, ContentCache.DOWNLOADING_FOLDER)); + one(remoteDss).getDownloadUrlForFileForDataSet(SESSION_TOKEN, DATA_SET_CODE, + pathInfo.getRelativePath()); + will(new ProxyAction(returnValue(remoteFile1.toURI().toURL().toString())) + { + @Override + protected void doBeforeReturn() + { + channel1.send(STARTED_MESSAGE); + channel2.assertNextMessage(STARTED_MESSAGE); + } + }); } }); - return new ContentCache(null, workSpace, fileOperations, timeProvider); + File fileInCache = + new File(workSpace, ContentCache.CACHE_FOLDER + "/" + DATA_SET_CODE + "/" + + remoteFile1.getName()); + assertEquals(false, fileInCache.exists()); + + new Thread(new Runnable() + { + @Override + public void run() + { + InputStream inputStream = + cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo); + channel1.send(readContent(inputStream, true)); + } + }, "thread1").start(); + new Thread(new Runnable() + { + @Override + public void run() + { + channel1.assertNextMessage(STARTED_MESSAGE); + channel2.send(STARTED_MESSAGE); + File file = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo); + channel1.assertNextMessage(FILE1_CONTENT); + channel2.send(FileUtilities.loadToString(file).trim()); + channel3.send(FINISHED_MESSAGE); + } + }, "thread2").start(); + + channel3.assertNextMessage(FINISHED_MESSAGE); + channel2.assertNextMessage(FILE1_CONTENT); + + assertEquals(FILE1_CONTENT, FileUtilities.loadToString(fileInCache).trim()); + File fileFromCache = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo); + assertEquals(fileInCache.getAbsolutePath(), fileFromCache.getAbsolutePath()); + context.assertIsSatisfied(); + } + + private String readContent(InputStream inputStream, boolean closeStream) + { + try + { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + IOUtils.copy(inputStream, outputStream); + return outputStream.toString(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } finally + { + if (closeStream) + { + IOUtils.closeQuietly(inputStream); + } + } + } + + private DataSetPathInfo prepareForDownloading(final File remoteFile) + { + final DataSetPathInfo pathInfo = new DataSetPathInfo(); + pathInfo.setRelativePath(remoteFile.getName()); + pathInfo.setDirectory(false); + context.checking(new Expectations() + { + { + one(remoteDss).getDownloadUrlForFileForDataSet(SESSION_TOKEN, DATA_SET_CODE, + pathInfo.getRelativePath()); + try + { + will(returnValue(remoteFile.toURI().toURL().toString())); + } catch (MalformedURLException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + }); + return pathInfo; } + }