Skip to content
Snippets Groups Projects
Commit 4267676e authored by felmer's avatar felmer
Browse files

SP-464, BIS-255: ContentCache.getInputStream() implemented and tested.

SVN: 28150
parent ae2cff5c
No related branches found
No related tags found
No related merge requests found
......@@ -32,12 +32,15 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException;
import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
import ch.systemsx.cisd.common.filesystem.FileOperations;
import ch.systemsx.cisd.common.filesystem.IFileOperations;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.ITimeProvider;
import ch.systemsx.cisd.common.utilities.SystemTimeProvider;
import ch.systemsx.cisd.openbis.dss.generic.shared.api.v1.IDssServiceRpcGeneric;
......@@ -53,6 +56,9 @@ public class ContentCache implements IContentCache
{
public static final String CACHE_WORKSPACE_FOLDER_KEY = "cache-workspace-folder";
private final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION,
ContentCache.class);
static final String CACHE_FOLDER = "cached";
static final String DOWNLOADING_FOLDER = "downloading";
......@@ -84,6 +90,7 @@ public class ContentCache implements IContentCache
this.timeProvider = timeProvider;
fileOperations.removeRecursivelyQueueing(new File(cacheWorkspace, DOWNLOADING_FOLDER));
fileLockManager = new LockManager();
operationLog.info("Content cache created. Workspace: " + cacheWorkspace.getAbsolutePath());
}
@Override
......@@ -138,10 +145,7 @@ public class ContentCache implements IContentCache
{
downloadFile(sessionToken, dataSetLocation, path);
}
File dataSetFolder =
new File(workspace, createDataSetPath(CACHE_FOLDER,
dataSetLocation.getDataSetCode()));
dataSetFolder.setLastModified(timeProvider.getTimeInMilliseconds());
touchDataSetFolder(dataSetLocation);
return file;
} finally
{
......@@ -150,16 +154,79 @@ public class ContentCache implements IContentCache
}
@Override
public InputStream getInputStream(String sessionToken, IDatasetLocation dataSetLocation,
public InputStream getInputStream(String sessionToken, final IDatasetLocation dataSetLocation,
DataSetPathInfo path)
{
try
{
return new FileInputStream(getFile(sessionToken, dataSetLocation, path));
} catch (FileNotFoundException ex)
final String pathInWorkspace = createPathInWorkspace(CACHE_FOLDER, dataSetLocation, path);
fileLockManager.lock(pathInWorkspace);
final File file = new File(workspace, pathInWorkspace);
if (file.exists())
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
try
{
FileInputStream fileInputStream =
new FileInputStream(getFile(sessionToken, dataSetLocation, path));
touchDataSetFolder(dataSetLocation);
return fileInputStream;
} catch (FileNotFoundException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
} finally
{
fileLockManager.unlock(pathInWorkspace);
}
}
final File tempFile = createTempFile();
final InputStream inputStream = createInputStream(sessionToken, dataSetLocation, path);
final OutputStream fileOutputStream = createFileOutputStream(tempFile);
return new InputStream()
{
private boolean closed;
@Override
public int read() throws IOException
{
int b = inputStream.read();
fileOutputStream.write(b);
return b;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int count = inputStream.read(b, off, len);
if (count >= 0)
{
fileOutputStream.write(b, off, count);
}
return count;
}
@Override
public void close() throws IOException
{
if (closed)
{
return;
}
inputStream.close();
fileOutputStream.close();
moveDownloadedFileToCache(tempFile, pathInWorkspace);
touchDataSetFolder(dataSetLocation);
closed = true;
fileLockManager.unlock(pathInWorkspace);
}
@Override
protected void finalize() throws Throwable
{
if (closed == false)
{
fileLockManager.unlock(pathInWorkspace);
}
super.finalize();
}
};
}
private void downloadFile(String sessionToken, IDatasetLocation dataSetLocation,
......@@ -168,18 +235,10 @@ public class ContentCache implements IContentCache
InputStream input = null;
try
{
String dataStoreUrl = dataSetLocation.getDataStoreUrl();
IDssServiceRpcGeneric service = serviceFactory.getService(dataStoreUrl);
String dataSetCode = dataSetLocation.getDataSetCode();
String relativePath = path.getRelativePath();
String url =
service.getDownloadUrlForFileForDataSet(sessionToken, dataSetCode, relativePath);
input = createURL(url).openStream();
input = createInputStream(sessionToken, dataSetLocation, path);
File downloadedFile = createFileFromInputStream(dataSetLocation, path, input);
String pathInWorkspace = createPathInWorkspace(CACHE_FOLDER, dataSetLocation, path);
File file = new File(workspace, pathInWorkspace);
createFolder(file.getParentFile());
downloadedFile.renameTo(file);
moveDownloadedFileToCache(downloadedFile, pathInWorkspace);
} catch (Exception ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
......@@ -189,6 +248,65 @@ public class ContentCache implements IContentCache
}
}
private void moveDownloadedFileToCache(File downloadedFile, String pathInWorkspace)
{
File file = new File(workspace, pathInWorkspace);
createFolder(file.getParentFile());
boolean success = downloadedFile.renameTo(file);
String msg = "'" + pathInWorkspace + "' successfully downloaded ";
if (success)
{
operationLog.debug(msg + "and successfully moved to cache.");
} else
{
operationLog.warn(msg + "but couldn't move to cache.");
}
}
private void touchDataSetFolder(IDatasetLocation dataSetLocation)
{
File dataSetFolder =
new File(workspace, createDataSetPath(CACHE_FOLDER,
dataSetLocation.getDataSetCode()));
dataSetFolder.setLastModified(timeProvider.getTimeInMilliseconds());
}
private InputStream createInputStream(String sessionToken, IDatasetLocation dataSetLocation,
DataSetPathInfo path)
{
String dataStoreUrl = dataSetLocation.getDataStoreUrl();
IDssServiceRpcGeneric service = serviceFactory.getService(dataStoreUrl);
String dataSetCode = dataSetLocation.getDataSetCode();
String relativePath = path.getRelativePath();
URL url =
createURL(service.getDownloadUrlForFileForDataSet(sessionToken, dataSetCode,
relativePath));
InputStream openStream = null;
try
{
openStream = url.openStream();
return openStream;
} catch (IOException ex)
{
IOUtils.closeQuietly(openStream);
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
}
private OutputStream createFileOutputStream(File file)
{
OutputStream outputStream = null;
try
{
outputStream = new FileOutputStream(file);
return outputStream;
} catch (FileNotFoundException ex)
{
IOUtils.closeQuietly(outputStream);
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
}
private String createPathInWorkspace(String folder, IDatasetLocation dataSetLocation,
DataSetPathInfo path)
{
......@@ -219,9 +337,7 @@ public class ContentCache implements IContentCache
private File createFileFromInputStream(IDatasetLocation dataSetLocation, DataSetPathInfo path,
InputStream inputStream)
{
String relativePath = DOWNLOADING_FOLDER + "/" + Thread.currentThread().getId();
File file = new File(workspace, relativePath);
createFolder(file.getParentFile());
File file = createTempFile();
OutputStream ostream = null;
try
{
......@@ -237,6 +353,14 @@ public class ContentCache implements IContentCache
IOUtils.closeQuietly(ostream);
}
}
private File createTempFile()
{
String relativePath = DOWNLOADING_FOLDER + "/" + Thread.currentThread().getId();
File file = new File(workspace, relativePath);
createFolder(file.getParentFile());
return file;
}
private void createFolder(File folder)
{
......@@ -249,7 +373,7 @@ public class ContentCache implements IContentCache
}
}
}
private static final class LockManager
{
private static final class LockWithCounter
......@@ -290,6 +414,6 @@ public class ContentCache implements IContentCache
}
}
}
}
}
......@@ -16,84 +16,294 @@
package ch.systemsx.cisd.openbis.dss.generic.shared.content;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import org.apache.commons.io.IOUtils;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import ch.systemsx.cisd.common.filesystem.IFileOperations;
import ch.systemsx.cisd.common.utilities.ITimeProvider;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.concurrent.MessageChannel;
import ch.systemsx.cisd.common.concurrent.MessageChannelBuilder;
import ch.systemsx.cisd.common.filesystem.FileUtilities;
import ch.systemsx.cisd.common.logging.ConsoleLogger;
import ch.systemsx.cisd.common.test.ProxyAction;
import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetPathInfo;
/**
*
*
* @author Franz-Josef Elmer
*/
public class ContentCacheTest extends AssertJUnit
public class ContentCacheTest extends AbstractRemoteHierarchicalContentTestCase
{
private static final String SESSION_TOKEN = "session";
private Mockery context;
private IFileOperations fileOperations;
private ITimeProvider timeProvider;
@BeforeMethod
public void setUp()
{
context = new Mockery();
fileOperations = context.mock(IFileOperations.class);
timeProvider = context.mock(ITimeProvider.class);
}
@AfterMethod
public void tearDown()
{
// To following line of code should also be called at the end of each test method.
// Otherwise one do not known which test failed.
context.assertIsSatisfied();
}
@Test
public void testDataSetLocking()
{
ContentCache cache = createCache();
cache.lockDataSet(SESSION_TOKEN, "DS-1");
assertEquals(true, cache.isDataSetLocked(SESSION_TOKEN, "DS-1"));
assertEquals(false, cache.isDataSetLocked(SESSION_TOKEN, "DS-2"));
cache.lockDataSet(SESSION_TOKEN, "DS-1");
assertEquals(true, cache.isDataSetLocked(SESSION_TOKEN, "DS-1"));
cache.unlockDataSet(SESSION_TOKEN, "DS-1");
assertEquals(true, cache.isDataSetLocked(SESSION_TOKEN, "DS-1"));
cache.unlockDataSet(SESSION_TOKEN, "DS-1");
assertEquals(false, cache.isDataSetLocked(SESSION_TOKEN, "DS-1"));
context.assertIsSatisfied();
}
@Test
public void testGetInputStream()
{
final DataSetPathInfo pathInfo1 = prepareForDownloading(remoteFile1);
ContentCache cache = createCache();
InputStream inputStream = cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1);
assertEquals(FILE1_CONTENT, readContent(inputStream, true));
context.assertIsSatisfied();
}
@Test(invocationCount = 1, invocationTimeOut = 10000)
public void testGetInputStreamForSameContentInTwoThreads()
{
final DataSetPathInfo pathInfo1 = prepareForDownloading(remoteFile1);
ConsoleLogger logger = new ConsoleLogger();
final MessageChannel channel1 =
new MessageChannelBuilder(10000).name("1").logger(logger).getChannel();
final MessageChannel channel2 =
new MessageChannelBuilder(10000).name("2").logger(logger).getChannel();
final MessageChannel channel3 =
new MessageChannelBuilder(10000).name("3").logger(logger).getChannel();
final ContentCache cache = createCache();
File fileInCache =
new File(workSpace, ContentCache.CACHE_FOLDER + "/" + DATA_SET_CODE + "/"
+ remoteFile1.getName());
assertEquals(false, fileInCache.exists());
new Thread(new Runnable()
{
@Override
public void run()
{
InputStream inputStream =
cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1);
channel1.send(STARTED_MESSAGE);
channel2.assertNextMessage(STARTED_MESSAGE);
channel1.send(readContent(inputStream, true));
}
}, "thread1").start();
new Thread(new Runnable()
{
@Override
public void run()
{
channel1.assertNextMessage(STARTED_MESSAGE);
channel2.send(STARTED_MESSAGE);
InputStream inputStream =
cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1);
channel1.assertNextMessage(FILE1_CONTENT);
channel2.send(readContent(inputStream, true));
channel3.send(FINISHED_MESSAGE);
}
}, "thread2").start();
channel3.assertNextMessage(FINISHED_MESSAGE);
channel2.assertNextMessage(FILE1_CONTENT);
assertEquals(FILE1_CONTENT, FileUtilities.loadToString(fileInCache).trim());
File fileFromCache = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo1);
assertEquals(fileInCache.getAbsolutePath(), fileFromCache.getAbsolutePath());
context.assertIsSatisfied();
}
@Test(invocationCount = 1, invocationTimeOut = 10000)
public void testGetFileAndGetInputStreamForSameContentInTwoThreads() throws Exception
{
final DataSetPathInfo pathInfo = new DataSetPathInfo();
pathInfo.setRelativePath(remoteFile1.getName());
pathInfo.setDirectory(false);
ConsoleLogger logger = new ConsoleLogger();
final MessageChannel channel1 =
new MessageChannelBuilder(10000).name("11").logger(logger).getChannel();
final MessageChannel channel2 =
new MessageChannelBuilder(10000).name("12").logger(logger).getChannel();
final MessageChannel channel3 =
new MessageChannelBuilder(10000).name("13").logger(logger).getChannel();
final ContentCache cache = createCache();
context.checking(new Expectations()
{
{
one(remoteDss).getDownloadUrlForFileForDataSet(SESSION_TOKEN, DATA_SET_CODE,
pathInfo.getRelativePath());
will(new ProxyAction(returnValue(remoteFile1.toURI().toURL().toString()))
{
@Override
protected void doBeforeReturn()
{
channel1.send(STARTED_MESSAGE);
channel2.assertNextMessage(STARTED_MESSAGE);
}
});
}
});
File fileInCache =
new File(workSpace, ContentCache.CACHE_FOLDER + "/" + DATA_SET_CODE + "/"
+ remoteFile1.getName());
assertEquals(false, fileInCache.exists());
new Thread(new Runnable()
{
@Override
public void run()
{
File file = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo);
channel1.send(FileUtilities.loadToString(file).trim());
}
}, "thread1").start();
new Thread(new Runnable()
{
@Override
public void run()
{
channel1.assertNextMessage(STARTED_MESSAGE);
channel2.send(STARTED_MESSAGE);
InputStream inputStream =
cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo);
channel1.assertNextMessage(FILE1_CONTENT);
channel2.send(readContent(inputStream, true));
channel3.send(FINISHED_MESSAGE);
}
}, "thread2").start();
channel3.assertNextMessage(FINISHED_MESSAGE);
channel2.assertNextMessage(FILE1_CONTENT);
assertEquals(FILE1_CONTENT, FileUtilities.loadToString(fileInCache).trim());
File fileFromCache = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo);
assertEquals(fileInCache.getAbsolutePath(), fileFromCache.getAbsolutePath());
context.assertIsSatisfied();
}
private ContentCache createCache()
@Test(invocationCount = 1, invocationTimeOut = 10000)
public void testGetInputStreamGetFileForSameContentInTwoThreads() throws Exception
{
final File workSpace = new File(".");
final DataSetPathInfo pathInfo = new DataSetPathInfo();
pathInfo.setRelativePath(remoteFile1.getName());
pathInfo.setDirectory(false);
ConsoleLogger logger = new ConsoleLogger();
final MessageChannel channel1 =
new MessageChannelBuilder(10000).name("21").logger(logger).getChannel();
final MessageChannel channel2 =
new MessageChannelBuilder(10000).name("22").logger(logger).getChannel();
final MessageChannel channel3 =
new MessageChannelBuilder(10000).name("3").logger(logger).getChannel();
final ContentCache cache = createCache();
context.checking(new Expectations()
{
{
one(fileOperations).removeRecursivelyQueueing(
new File(workSpace, ContentCache.DOWNLOADING_FOLDER));
one(remoteDss).getDownloadUrlForFileForDataSet(SESSION_TOKEN, DATA_SET_CODE,
pathInfo.getRelativePath());
will(new ProxyAction(returnValue(remoteFile1.toURI().toURL().toString()))
{
@Override
protected void doBeforeReturn()
{
channel1.send(STARTED_MESSAGE);
channel2.assertNextMessage(STARTED_MESSAGE);
}
});
}
});
return new ContentCache(null, workSpace, fileOperations, timeProvider);
File fileInCache =
new File(workSpace, ContentCache.CACHE_FOLDER + "/" + DATA_SET_CODE + "/"
+ remoteFile1.getName());
assertEquals(false, fileInCache.exists());
new Thread(new Runnable()
{
@Override
public void run()
{
InputStream inputStream =
cache.getInputStream(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo);
channel1.send(readContent(inputStream, true));
}
}, "thread1").start();
new Thread(new Runnable()
{
@Override
public void run()
{
channel1.assertNextMessage(STARTED_MESSAGE);
channel2.send(STARTED_MESSAGE);
File file = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo);
channel1.assertNextMessage(FILE1_CONTENT);
channel2.send(FileUtilities.loadToString(file).trim());
channel3.send(FINISHED_MESSAGE);
}
}, "thread2").start();
channel3.assertNextMessage(FINISHED_MESSAGE);
channel2.assertNextMessage(FILE1_CONTENT);
assertEquals(FILE1_CONTENT, FileUtilities.loadToString(fileInCache).trim());
File fileFromCache = cache.getFile(SESSION_TOKEN, DATA_SET_LOCATION, pathInfo);
assertEquals(fileInCache.getAbsolutePath(), fileFromCache.getAbsolutePath());
context.assertIsSatisfied();
}
private String readContent(InputStream inputStream, boolean closeStream)
{
try
{
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
IOUtils.copy(inputStream, outputStream);
return outputStream.toString();
} catch (IOException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
} finally
{
if (closeStream)
{
IOUtils.closeQuietly(inputStream);
}
}
}
private DataSetPathInfo prepareForDownloading(final File remoteFile)
{
final DataSetPathInfo pathInfo = new DataSetPathInfo();
pathInfo.setRelativePath(remoteFile.getName());
pathInfo.setDirectory(false);
context.checking(new Expectations()
{
{
one(remoteDss).getDownloadUrlForFileForDataSet(SESSION_TOKEN, DATA_SET_CODE,
pathInfo.getRelativePath());
try
{
will(returnValue(remoteFile.toURI().toURL().toString()));
} catch (MalformedURLException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
}
});
return pathInfo;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment