From 1ef40e36355fa07b600070be340eed1a616dbb79 Mon Sep 17 00:00:00 2001
From: vkovtun <viktor.kovtun@id.ethz.ch>
Date: Tue, 10 Jan 2023 14:23:41 +0100
Subject: [PATCH] SSDM-13253 Moved the logic to already existing facade class.

---
 .../SessionWorkspaceFileUploadServlet.java    |   7 +-
 .../ethz/sis/openbis/generic/OpenBISAPI.java  | 251 ++++++++++++------
 2 files changed, 173 insertions(+), 85 deletions(-)

diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/SessionWorkspaceFileUploadServlet.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/SessionWorkspaceFileUploadServlet.java
index 87ded2540f8..487fc3f94be 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/SessionWorkspaceFileUploadServlet.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/SessionWorkspaceFileUploadServlet.java
@@ -89,9 +89,13 @@ public class SessionWorkspaceFileUploadServlet extends HttpServlet
 
         uploadRequest.validate();
 
+        final String uploadId = uploadRequest.getUploadId();
+        final String filePath = (uploadId != null)
+                ? uploadId + "/" + uploadRequest.getFileName()
+                : uploadRequest.getFileName();
         long bytes =
                 service.putFileSliceToSessionWorkspace(uploadRequest.getSessionId(),
-                        uploadRequest.getFileName(), uploadRequest.getStartByte(),
+                        filePath, uploadRequest.getStartByte(),
                         uploadRequest.getFile());
 
         Map<String, Object> resultMap = new HashMap<String, Object>();
@@ -99,7 +103,6 @@ public class SessionWorkspaceFileUploadServlet extends HttpServlet
         resultMap.put(START_BYTE_PARAM, uploadRequest.getStartByte());
         resultMap.put(END_BYTE_PARAM, uploadRequest.getEndByte());
         resultMap.put(FILE_NAME_PARAM, uploadRequest.getFileName());
-        resultMap.put(UPLOAD_ID_PARAM, uploadRequest.getUploadId());
         resultMap.put(SIZE_PARAM, bytes);
         resultMap.put(STATUS_PARAM, "ok");
 
diff --git a/openbis_api/source/java/ch/ethz/sis/openbis/generic/OpenBISAPI.java b/openbis_api/source/java/ch/ethz/sis/openbis/generic/OpenBISAPI.java
index 5fadcf3663b..42aa29922ad 100644
--- a/openbis_api/source/java/ch/ethz/sis/openbis/generic/OpenBISAPI.java
+++ b/openbis_api/source/java/ch/ethz/sis/openbis/generic/OpenBISAPI.java
@@ -4,29 +4,45 @@ import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi;
 import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.id.DataSetPermId;
 import ch.ethz.sis.openbis.generic.dssapi.v3.IDataStoreServerApi;
 import ch.ethz.sis.openbis.generic.dssapi.v3.dto.dataset.create.UploadedDataSetCreation;
-import ch.systemsx.cisd.common.exceptions.UserFailureException;
+import ch.systemsx.cisd.common.http.JettyHttpClientFactory;
+import ch.systemsx.cisd.common.logging.LogCategory;
+import ch.systemsx.cisd.common.logging.LogFactory;
 import ch.systemsx.cisd.common.spring.HttpInvokerUtils;
 
 import java.io.File;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.nio.charset.StandardCharsets;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import org.apache.log4j.Logger;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
 
 public class OpenBISAPI {
 
+    private static final Logger OPERATION_LOG = LogFactory.getLogger(LogCategory.OPERATION, OpenBISAPI.class);
+
     private static final int DEFAULT_TIMEOUT_IN_MILLIS = 30000; //30 seconds
 
-    private static final int CHUNK_SIZE = 1048576;
+    private static final int CHUNK_SIZE = 1048576; // 1 MiB
+
+    private static final Collection<Integer> ACCEPTABLE_STATUSES = List.of(200, 502);
 
     private final IApplicationServerApi asFacade;
 
@@ -40,11 +56,13 @@ public class OpenBISAPI {
 
     private final String dssURL;
 
-    public OpenBISAPI(final String asURL, final String dssURL) {
+    public OpenBISAPI(final String asURL, final String dssURL)
+    {
         this(asURL, dssURL, DEFAULT_TIMEOUT_IN_MILLIS);
     }
 
-    public OpenBISAPI(final String asURL, final String dssURL, final int timeout) {
+    public OpenBISAPI(final String asURL, final String dssURL, final int timeout)
+    {
         this.timeout = timeout;
         this.asURL = asURL;
         asFacade = HttpInvokerUtils.createServiceStub(IApplicationServerApi.class, this.asURL, timeout);
@@ -52,101 +70,168 @@ public class OpenBISAPI {
         dssFacade = HttpInvokerUtils.createServiceStub(IDataStoreServerApi.class, this.dssURL, timeout);
     }
 
-    public String getSessionToken() {
+    public String getSessionToken()
+    {
         return sessionToken;
     }
 
-    public void setSessionToken(final String sessionToken) {
+    public void setSessionToken(final String sessionToken)
+    {
         this.sessionToken = sessionToken;
     }
 
+    private List<File> contentOf(final File item)
+    {
+        if (item.getName().startsWith("."))
+        {
+            return Collections.emptyList();
+        } else if (item.isFile())
+        {
+            return Collections.singletonList(item);
+        } else
+        {
+            return Arrays.stream(Objects.requireNonNull(item.listFiles()))
+                    .flatMap(file -> contentOf(file).stream())
+                    .collect(Collectors.toList());
+        }
+    }
+    @SuppressWarnings("resource")
+    private Iterable<byte[]> streamFile(final File file, final int chunkSize) throws FileNotFoundException
+    {
+        final InputStream inputStream = new FileInputStream(file);
+
+        return () -> new Iterator<>()
+        {
+            public boolean hasMore = true;
+
+            public boolean hasNext()
+            {
+                return hasMore;
+            }
+
+            public byte[] next()
+            {
+                try
+                {
+                    byte[] bytes = inputStream.readNBytes(chunkSize);
+                    if (bytes.length < chunkSize)
+                    {
+                        hasMore = false;
+                        inputStream.close();
+                    }
+                    return bytes;
+                } catch (final IOException e)
+                {
+                    try
+                    {
+                        inputStream.close();
+                    } catch (final IOException ex)
+                    {
+                        throw new RuntimeException(ex);
+                    }
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+    }
+
+
+
     /**
      * Upload file or folder to the DSS SessionWorkspaceFileUploadServlet and return the ID to be used by createUploadedDataSet
      * This method hides the complexities of uploading a folder with many files and does the uploads in chunks.
      */
-    public String uploadFileWorkspaceDSS(final Path fileOrFolder) {
+    public String uploadFileWorkspaceDSS(final Path fileOrFolder)
+    {
         //        final ServiceFinder serviceFinder = new ServiceFinder("openbis", "session_workspace_file_upload");
         //        serviceFinder.computeServerUrl()
 
         Objects.requireNonNull(sessionToken);
 
-        try
+        final List<File> content = contentOf(fileOrFolder.toFile());
+        if (content.isEmpty())
         {
-            final File file = fileOrFolder.toFile();
+            throw new RuntimeException("The directory " + fileOrFolder + " is empty");
+        }
 
-            if (!file.isFile())
+        final long totalSize = content.stream().reduce(0L, (size, f) ->
+        {
+            try
             {
-                throw new UserFailureException("File must be a file for now.");
+                return size + Files.size(f.toPath());
+            } catch (IOException e)
+            {
+                throw new RuntimeException(e);
             }
+        }, Long::sum);
 
-            final String fileName = file.getName();
-
-            final String response = request("POST", new URI(dssURL + "/session_workspace_file_upload"),
-                    Map.of(
-                            "sessionID", sessionToken,
-                            "filename", fileName,
-                            "id", "0",
-                            "startByte", "0",
-                            "endByte", String.valueOf(file.length()),
-                            "size", String.valueOf(file.length())
-                    ), Files.readAllBytes(file.toPath()));
-            System.out.println(response);
-
-            @SuppressWarnings("unchecked")
-            final Map<String, String> values = new ObjectMapper().readValue(response, Map.class);
-
-            return values.get("uploadID");
-        } catch (Exception e)
+        final String uploadId = UUID.randomUUID().toString();
+        final org.eclipse.jetty.client.HttpClient client = JettyHttpClientFactory.getHttpClient();
+        int id = 1;
+        try
+        {
+            for (final File file : content)
+            {
+                final String fileName = file.getName();
+                final long fileSize = file.length();
+                final String prefix = "(" + id + "/" + content.size() + ") " + fileName + ": ";
+                final long size = Files.size(file.toPath());
+                final long totalChunks = (size / CHUNK_SIZE) + (size % CHUNK_SIZE == 0 ? 0 : 1);
+
+                OPERATION_LOG.info(prefix + "Starting upload of " + size + " bytes");
+
+                long start = 0;
+                for (var chunk : streamFile(file, CHUNK_SIZE))
+                {
+                    if (chunk.length == 0)
+                    {
+                        continue;
+                    }
+                    final long end = start + chunk.length;
+
+                    int status = 0;
+                    while (status != 200)
+                    {
+                        final ContentProvider contentProvider = new BytesContentProvider(chunk);
+
+                        final Request httpRequest = client.newRequest(dssURL + "/session_workspace_file_upload")
+                                .method(HttpMethod.POST);
+                        httpRequest.param("sessionID", sessionToken);
+                        httpRequest.param("id", Integer.toString(id++));
+                        httpRequest.param("filename", fileName);
+                        httpRequest.param("startByte", Long.toString(start));
+                        httpRequest.param("endByte", Long.toString(end));
+                        httpRequest.param("size", Long.toString(fileSize));
+                        httpRequest.param("uploadID", uploadId);
+                        httpRequest.content(contentProvider);
+                        final ContentResponse response = httpRequest.send();
+
+                        status = response.getStatus();
+                        OPERATION_LOG.info(prefix + "Chunk " + (start / CHUNK_SIZE + 1) + "/" + totalChunks
+                                + " uploaded with status " + status);
+
+                        if (!ACCEPTABLE_STATUSES.contains(status))
+                        {
+                            throw new IOException(response.getContentAsString());
+                        }
+                    }
+                    start += CHUNK_SIZE;
+                }
+                OPERATION_LOG.info(prefix + "Upload complete");
+            }
+        } catch (final IOException | TimeoutException | InterruptedException | ExecutionException e)
         {
             throw new RuntimeException(e);
         }
-    }
 
-    public DataSetPermId createUploadedDataSet(final UploadedDataSetCreation newDataSet) {
-        return dssFacade.createUploadedDataSet(sessionToken, newDataSet);
+        return uploadId;
     }
 
-    @SuppressWarnings({ "OptionalGetWithoutIsPresent", "unchecked" })
-    private String request(final String httpMethod, final URI serverUri,
-            final Map<String, String> parameters, final byte [] body) throws Exception {
-        HttpClient client = HttpClient.newBuilder()
-                .version(HttpClient.Version.HTTP_1_1)
-                .followRedirects(HttpClient.Redirect.NORMAL)
-                .connectTimeout(Duration.ofMillis(timeout))
-                .build();
-
-        final String query = parameters.entrySet().stream()
-                .map(entry -> urlEncode(entry.getKey()) + "=" + urlEncode(entry.getValue()))
-                .reduce((s1, s2) -> s1 + "&" + s2).get();
-
-        final URI uri = new URI(serverUri.getScheme(), null, serverUri.getHost(), serverUri.getPort(),
-                serverUri.getPath(), query, null);
-
-        final HttpRequest.Builder builder = HttpRequest.newBuilder()
-                .uri(uri)
-                .version(HttpClient.Version.HTTP_1_1)
-                .timeout(Duration.ofMillis(timeout))
-                .method(httpMethod, HttpRequest.BodyPublishers.ofByteArray(body));
-
-        final HttpRequest request = builder.build();
-
-        final HttpResponse<String> httpResponse = client.send(request, HttpResponse.BodyHandlers.ofString());
-
-        final int statusCode = httpResponse.statusCode();
-        if (statusCode >= 200 && statusCode < 300) {
-            return httpResponse.body();
-        } else if (statusCode >= 400 && statusCode < 500) {
-            throw new UserFailureException("User failure. Received status code: " + statusCode + ". Body: " +
-                    new String(httpResponse.body()));
-        } else if (statusCode >= 500 && statusCode < 600) {
-            throw new RuntimeException("Server failure. Received status code: " + statusCode);
-        } else {
-            throw new RuntimeException("Unknown failure. Received status code: " + statusCode);
-        }
-    }
-
-    private static String urlEncode(final String s) {
-        return URLEncoder.encode(s, StandardCharsets.UTF_8);
+    public DataSetPermId createUploadedDataSet(final UploadedDataSetCreation newDataSet)
+    {
+        // TODO: add to the facade the possibility to create a dataset.
+        return dssFacade.createUploadedDataSet(sessionToken, newDataSet);
     }
 
 }
-- 
GitLab