Skip to content
Snippets Groups Projects
Commit 1ef40e36 authored by vkovtun's avatar vkovtun
Browse files

SSDM-13253 Moved the logic to already existing facade class.

parent 8608b975
No related branches found
No related tags found
1 merge request!40SSDM-13578 : 2PT : Database and V3 Implementation - include the new AFS "free"...
......@@ -89,9 +89,13 @@ public class SessionWorkspaceFileUploadServlet extends HttpServlet
uploadRequest.validate();
final String uploadId = uploadRequest.getUploadId();
final String filePath = (uploadId != null)
? uploadId + "/" + uploadRequest.getFileName()
: uploadRequest.getFileName();
long bytes =
service.putFileSliceToSessionWorkspace(uploadRequest.getSessionId(),
uploadRequest.getFileName(), uploadRequest.getStartByte(),
filePath, uploadRequest.getStartByte(),
uploadRequest.getFile());
Map<String, Object> resultMap = new HashMap<String, Object>();
......@@ -99,7 +103,6 @@ public class SessionWorkspaceFileUploadServlet extends HttpServlet
resultMap.put(START_BYTE_PARAM, uploadRequest.getStartByte());
resultMap.put(END_BYTE_PARAM, uploadRequest.getEndByte());
resultMap.put(FILE_NAME_PARAM, uploadRequest.getFileName());
resultMap.put(UPLOAD_ID_PARAM, uploadRequest.getUploadId());
resultMap.put(SIZE_PARAM, bytes);
resultMap.put(STATUS_PARAM, "ok");
......
......@@ -4,29 +4,45 @@ import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi;
import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.id.DataSetPermId;
import ch.ethz.sis.openbis.generic.dssapi.v3.IDataStoreServerApi;
import ch.ethz.sis.openbis.generic.dssapi.v3.dto.dataset.create.UploadedDataSetCreation;
import ch.systemsx.cisd.common.exceptions.UserFailureException;
import ch.systemsx.cisd.common.http.JettyHttpClientFactory;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.spring.HttpInvokerUtils;
import java.io.File;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
public class OpenBISAPI {
private static final Logger OPERATION_LOG = LogFactory.getLogger(LogCategory.OPERATION, OpenBISAPI.class);
private static final int DEFAULT_TIMEOUT_IN_MILLIS = 30000; //30 seconds
private static final int CHUNK_SIZE = 1048576;
private static final int CHUNK_SIZE = 1048576; // 1 MiB
private static final Collection<Integer> ACCEPTABLE_STATUSES = List.of(200, 502);
private final IApplicationServerApi asFacade;
......@@ -40,11 +56,13 @@ public class OpenBISAPI {
private final String dssURL;
public OpenBISAPI(final String asURL, final String dssURL) {
public OpenBISAPI(final String asURL, final String dssURL)
{
this(asURL, dssURL, DEFAULT_TIMEOUT_IN_MILLIS);
}
public OpenBISAPI(final String asURL, final String dssURL, final int timeout) {
public OpenBISAPI(final String asURL, final String dssURL, final int timeout)
{
this.timeout = timeout;
this.asURL = asURL;
asFacade = HttpInvokerUtils.createServiceStub(IApplicationServerApi.class, this.asURL, timeout);
......@@ -52,101 +70,168 @@ public class OpenBISAPI {
dssFacade = HttpInvokerUtils.createServiceStub(IDataStoreServerApi.class, this.dssURL, timeout);
}
public String getSessionToken() {
public String getSessionToken()
{
return sessionToken;
}
public void setSessionToken(final String sessionToken) {
public void setSessionToken(final String sessionToken)
{
this.sessionToken = sessionToken;
}
private List<File> contentOf(final File item)
{
if (item.getName().startsWith("."))
{
return Collections.emptyList();
} else if (item.isFile())
{
return Collections.singletonList(item);
} else
{
return Arrays.stream(Objects.requireNonNull(item.listFiles()))
.flatMap(file -> contentOf(file).stream())
.collect(Collectors.toList());
}
}
@SuppressWarnings("resource")
private Iterable<byte[]> streamFile(final File file, final int chunkSize) throws FileNotFoundException
{
final InputStream inputStream = new FileInputStream(file);
return () -> new Iterator<>()
{
public boolean hasMore = true;
public boolean hasNext()
{
return hasMore;
}
public byte[] next()
{
try
{
byte[] bytes = inputStream.readNBytes(chunkSize);
if (bytes.length < chunkSize)
{
hasMore = false;
inputStream.close();
}
return bytes;
} catch (final IOException e)
{
try
{
inputStream.close();
} catch (final IOException ex)
{
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
};
}
/**
* Upload file or folder to the DSS SessionWorkspaceFileUploadServlet and return the ID to be used by createUploadedDataSet
* This method hides the complexities of uploading a folder with many files and does the uploads in chunks.
*/
public String uploadFileWorkspaceDSS(final Path fileOrFolder) {
public String uploadFileWorkspaceDSS(final Path fileOrFolder)
{
// final ServiceFinder serviceFinder = new ServiceFinder("openbis", "session_workspace_file_upload");
// serviceFinder.computeServerUrl()
Objects.requireNonNull(sessionToken);
try
final List<File> content = contentOf(fileOrFolder.toFile());
if (content.isEmpty())
{
final File file = fileOrFolder.toFile();
throw new RuntimeException("The directory " + fileOrFolder + " is empty");
}
if (!file.isFile())
final long totalSize = content.stream().reduce(0L, (size, f) ->
{
try
{
throw new UserFailureException("File must be a file for now.");
return size + Files.size(f.toPath());
} catch (IOException e)
{
throw new RuntimeException(e);
}
}, Long::sum);
final String fileName = file.getName();
final String response = request("POST", new URI(dssURL + "/session_workspace_file_upload"),
Map.of(
"sessionID", sessionToken,
"filename", fileName,
"id", "0",
"startByte", "0",
"endByte", String.valueOf(file.length()),
"size", String.valueOf(file.length())
), Files.readAllBytes(file.toPath()));
System.out.println(response);
@SuppressWarnings("unchecked")
final Map<String, String> values = new ObjectMapper().readValue(response, Map.class);
return values.get("uploadID");
} catch (Exception e)
final String uploadId = UUID.randomUUID().toString();
final org.eclipse.jetty.client.HttpClient client = JettyHttpClientFactory.getHttpClient();
int id = 1;
try
{
for (final File file : content)
{
final String fileName = file.getName();
final long fileSize = file.length();
final String prefix = "(" + id + "/" + content.size() + ") " + fileName + ": ";
final long size = Files.size(file.toPath());
final long totalChunks = (size / CHUNK_SIZE) + (size % CHUNK_SIZE == 0 ? 0 : 1);
OPERATION_LOG.info(prefix + "Starting upload of " + size + " bytes");
long start = 0;
for (var chunk : streamFile(file, CHUNK_SIZE))
{
if (chunk.length == 0)
{
continue;
}
final long end = start + chunk.length;
int status = 0;
while (status != 200)
{
final ContentProvider contentProvider = new BytesContentProvider(chunk);
final Request httpRequest = client.newRequest(dssURL + "/session_workspace_file_upload")
.method(HttpMethod.POST);
httpRequest.param("sessionID", sessionToken);
httpRequest.param("id", Integer.toString(id++));
httpRequest.param("filename", fileName);
httpRequest.param("startByte", Long.toString(start));
httpRequest.param("endByte", Long.toString(end));
httpRequest.param("size", Long.toString(fileSize));
httpRequest.param("uploadID", uploadId);
httpRequest.content(contentProvider);
final ContentResponse response = httpRequest.send();
status = response.getStatus();
OPERATION_LOG.info(prefix + "Chunk " + (start / CHUNK_SIZE + 1) + "/" + totalChunks
+ " uploaded with status " + status);
if (!ACCEPTABLE_STATUSES.contains(status))
{
throw new IOException(response.getContentAsString());
}
}
start += CHUNK_SIZE;
}
OPERATION_LOG.info(prefix + "Upload complete");
}
} catch (final IOException | TimeoutException | InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
}
public DataSetPermId createUploadedDataSet(final UploadedDataSetCreation newDataSet) {
return dssFacade.createUploadedDataSet(sessionToken, newDataSet);
return uploadId;
}
@SuppressWarnings({ "OptionalGetWithoutIsPresent", "unchecked" })
private String request(final String httpMethod, final URI serverUri,
final Map<String, String> parameters, final byte [] body) throws Exception {
HttpClient client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofMillis(timeout))
.build();
final String query = parameters.entrySet().stream()
.map(entry -> urlEncode(entry.getKey()) + "=" + urlEncode(entry.getValue()))
.reduce((s1, s2) -> s1 + "&" + s2).get();
final URI uri = new URI(serverUri.getScheme(), null, serverUri.getHost(), serverUri.getPort(),
serverUri.getPath(), query, null);
final HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uri)
.version(HttpClient.Version.HTTP_1_1)
.timeout(Duration.ofMillis(timeout))
.method(httpMethod, HttpRequest.BodyPublishers.ofByteArray(body));
final HttpRequest request = builder.build();
final HttpResponse<String> httpResponse = client.send(request, HttpResponse.BodyHandlers.ofString());
final int statusCode = httpResponse.statusCode();
if (statusCode >= 200 && statusCode < 300) {
return httpResponse.body();
} else if (statusCode >= 400 && statusCode < 500) {
throw new UserFailureException("User failure. Received status code: " + statusCode + ". Body: " +
new String(httpResponse.body()));
} else if (statusCode >= 500 && statusCode < 600) {
throw new RuntimeException("Server failure. Received status code: " + statusCode);
} else {
throw new RuntimeException("Unknown failure. Received status code: " + statusCode);
}
}
private static String urlEncode(final String s) {
return URLEncoder.encode(s, StandardCharsets.UTF_8);
public DataSetPermId createUploadedDataSet(final UploadedDataSetCreation newDataSet)
{
// TODO: add to the facade the possibility to create a dataset.
return dssFacade.createUploadedDataSet(sessionToken, newDataSet);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment