From a2bc0bcb49918bcb71e6e1327d5d92b744446d4d Mon Sep 17 00:00:00 2001
From: alaskowski <alaskowski@ethz.ch>
Date: Mon, 7 Aug 2023 15:10:39 +0200
Subject: [PATCH] SSDM-13838: Amending dataset upload to use fully v3 api
 scheme.

---
 .../src/python/pybis/dataset.py               | 172 ++++++++++++++----
 1 file changed, 141 insertions(+), 31 deletions(-)

diff --git a/api-openbis-python3-pybis/src/python/pybis/dataset.py b/api-openbis-python3-pybis/src/python/pybis/dataset.py
index 9ba2f319b59..aa031d30f7c 100644
--- a/api-openbis-python3-pybis/src/python/pybis/dataset.py
+++ b/api-openbis-python3-pybis/src/python/pybis/dataset.py
@@ -17,7 +17,6 @@ import os
 import random
 import time
 import urllib.parse
-import uuid
 import zipfile
 from functools import partialmethod
 from pathlib import Path
@@ -51,6 +50,7 @@ from .utils import (
 PYBIS_PLUGIN = "dataset-uploader-api"
 dataset_definitions = openbis_definitions("dataSet")
 dss_endpoint = "/datastore_server/rmi-data-store-server-v3.json"
+session_workspace = "/datastore_server/session_workspace_file_upload"
 
 
 def signed_to_unsigned(sig_int):
@@ -949,14 +949,11 @@ class DataSet(
         )
 
     def _upload_v3(self, data_stores):
-        upload_id = str(uuid.uuid4())
         datastore_url = data_stores["downloadUrl"][0]
         # for uploading physical data, we first upload it to the session workspace
-        self.upload_files_v3(
-            upload_id=upload_id,
+        upload_id = self.upload_files_v3(
             datastore_url=datastore_url,
             files=self.files,
-            folder="",
             wait_until_finished=True,
         )
 
@@ -1143,22 +1140,34 @@ class DataSet(
         return self.files_in_wsp
 
     def upload_files_v3(
-        self, upload_id, datastore_url=None, files=None, folder=None, wait_until_finished=False
+        self, files, datastore_url=None, folder=None, wait_until_finished=False
     ):
         if datastore_url is None:
             datastore_url = self.openbis._get_dss_url()
         if files is None:
             raise ValueError("Please provide a filename.")
 
-        if folder is None:
-            # create a unique foldername
-            folder = time.strftime("%Y-%m-%d_%H-%M-%S")
-
         if isinstance(files, str):
             files = [files]
 
+        if len(files) == 0:
+            raise ValueError("Please provide a filename.")
+
+        # stage all files under a unique, timestamp-based upload id
+        upload_id = time.strftime("%Y-%m-%d_%H-%M-%S.%s")
+        if len(files) == 1:
+            if folder is None:
+                folder = upload_id
+            else:
+                folder = os.path.join(upload_id, folder)
+        else:
+            if folder is None:
+                folder = os.path.join(upload_id, 'default')
+            else:
+                folder = os.path.join(upload_id, folder)
+
         # define a queue to handle the upload threads
-        with DataSetUploadQueue(multipart=True) as queue:
+        with DataSetUploadQueueNew() as queue:
 
             real_files = []
             for filename in files:
@@ -1168,35 +1177,136 @@ class DataSet(
                     path = os.path.relpath(root, pardir)
                     for file in files:
                         real_files.append((path, os.path.join(root, file)))
+                    if not files:
+                        # append empty folder
+                        real_files.append((path, ""))
                 else:
                     real_files.append(("", os.path.join(filename)))
 
             # compose the upload-URL and put URL and filename in the upload queue
-            files_in_wsp = []
-            for directory, filename in real_files:
-                file_in_wsp = os.path.join(folder, os.path.basename(filename))
-                files_in_wsp += [file_in_wsp]
-
-                fol = os.path.join(folder, directory)
+            for filename in real_files:
+                file_in_wsp = os.path.join(folder, filename[0], os.path.basename(filename[1]))
+                url_filename = os.path.join(
+                    folder, filename[0], urllib.parse.quote(os.path.basename(filename[1]))
+                )
+                self.files_in_wsp.append(file_in_wsp)
 
-                upload_url = (
-                    datastore_url + "/datastore_server/store_share_file_upload"
-                    "?dataSetType={}"
-                    "&folderPath={}"
-                    "&ignoreFilePath={}"
-                    "&uploadID={}"
-                    "&sessionID={}"
-                ).format(self.type.code, fol, False, upload_id, self.openbis.token)
-                queue.put(
-                    [upload_url, filename,
-                     self.openbis.verify_certificates])
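+                # each entry goes to the DSS session workspace endpoint:
+                # empty folders are announced with a zero-byte request,
+                # files up to 10 MB go up in a single request, and larger
+                # files are split into 10 MB ranges so the worker threads
+                # of DataSetUploadQueueNew can post the chunks in parallel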
+                is_empty_folder = filename[1] == ''
+                if is_empty_folder:
+                    upload_url = (f'{datastore_url}{session_workspace}'
+                                  f'?filename={url_filename}'
+                                  f'&id={1}'
+                                  f'&startByte={0}&endByte={0}'
+                                  f'&emptyFolder={True}'
+                                  f'&sessionID={self.openbis.token}')
+                    queue.put([upload_url, filename, self.openbis.verify_certificates,
+                               True, False, []])
+                else:
+                    file_size = os.path.getsize(filename[1])
+                    count = 1
+                    size = 1024 * 1024 * 10  # 10MB
+                    if file_size > size:
+                        for i in range(0, file_size, size):
+                            start_byte = i
+                            end_byte = min(i + size, file_size)
+                            upload_url = (f'{datastore_url}{session_workspace}'
+                                          f'?filename={url_filename}'
+                                          f'&id={count}'
+                                          f'&startByte={start_byte}&endByte={end_byte}'
+                                          f'&emptyFolder={False}'
+                                          f'&sessionID={self.openbis.token}')
+                            queue.put([upload_url, filename, self.openbis.verify_certificates,
+                                       False, True, [start_byte, end_byte]])
+                            count += 1
+                    else:
+                        upload_url = (
+                            datastore_url
+                            + "/datastore_server/session_workspace_file_upload"
+                            + "?filename="
+                            + url_filename
+                            + "&id="
+                            + str(count)
+                            + "&startByte=0&endByte=" + str(file_size)
+                            + "&emptyFolder=False"
+                            + "&sessionID="
+                            + self.openbis.token
+                        )
+                        queue.put([upload_url, filename, self.openbis.verify_certificates,
+                                   False, False, []])
 
             # wait until all files have uploaded
             if wait_until_finished:
                 queue.join()
 
-            # return files with full path in session workspace
-            return files_in_wsp
+            # return the upload id under which the files were staged
+            return upload_id
+
+
+class DataSetUploadQueueNew:
+    """Structure for uploading files to openBIS in separate threads.
+    It works as a queue where each item is a single file or chunk upload.
+    Unlike the v1 api, which uses a multipart schema for file uploads, the
+    v3 api sends the file content in the request body."""
+
+    def __init__(self, workers=20):
+        # queue of pending uploads, worked on by `workers` threads at once
+        self.upload_queue = Queue()
+        self.workers = workers
+
+        # define number of threads and start them
+        for _ in range(workers):
+            t = Thread(target=self.upload_file)
+            t.start()
+
+    def __enter__(self, *args, **kwargs):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        """This method is called at the end of a with statement."""
+        # stop the workers: one None sentinel per worker thread
+        for i in range(self.workers):
+            self.upload_queue.put(None)
+
+    def put(self, things):
+        """expects a list [upload_url, filename, verify_certificates,
+        is_empty_folder, partial, bytes_range] which is put into the upload queue"""
+        self.upload_queue.put(things)
+
+    def join(self):
+        """needs to be called if you want to wait for all uploads to be finished"""
+        # block until all tasks are done
+        self.upload_queue.join()
+
+    def upload_file(self):
+        while True:
+            # get the next item in the queue
+            queue_item = self.upload_queue.get()
+            if queue_item is None:
+                # a None sentinel, put by __exit__, tells this worker to stop
+                break
+            upload_url, filename, verify_certificates, is_empty_folder, partial, bytes_range = queue_item
+
+            # upload the file to our DSS session workspace
+            if is_empty_folder:
+                resp = requests.post(upload_url, verify=verify_certificates)
+                resp.raise_for_status()
+            else:
+                if partial:
+                    with open(filename[1], "rb") as f:
+                        f.seek(bytes_range[0])
+                        data = f.read(bytes_range[1] - bytes_range[0])
+                    resp = requests.post(upload_url, data=data, verify=verify_certificates)
+                    resp.raise_for_status()
+                else:
+                    file_size = os.path.getsize(filename[1])
+                    with open(filename[1], "rb") as f:
+                        resp = requests.post(upload_url, data=f, verify=verify_certificates)
+                    resp.raise_for_status()
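+                    # the session workspace replies with a JSON description
+                    # of the stored file; comparing its size with the local
+                    # file detects truncated uploads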
+                    data = resp.json()
+                    if file_size != int(data["size"]):
+                        raise ValueError(
+                            f'size of file uploaded: {file_size} != data received: {int(data["size"])}'
+                        )
+
+            # Tell the queue that we are done
+            self.upload_queue.task_done()
 
 
 class DataSetUploadQueue:
@@ -1242,7 +1352,7 @@ class DataSetUploadQueue:
                 break
             upload_url, filename, verify_certificates = queue_item
 
-            file_size = os.path.getsize(filename)
+            file_size = os.path.getsize(filename[1])
 
             if self.multipart is True:
                 file = {filename: open(filename, "rb")}
@@ -1250,7 +1360,7 @@ class DataSetUploadQueue:
                 resp.raise_for_status()
             else:
                 # upload the file to our DSS session workspace
-                with open(filename, "rb") as f:
+                with open(filename[1], "rb") as f:
                     resp = requests.post(upload_url, data=f, verify=verify_certificates)
                     resp.raise_for_status()
                     data = resp.json()
-- 
GitLab
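
Usage sketch (placeholders throughout: server URL, credentials, dataset type
and sample identifier; `save()` is the call that ends up in `_upload_v3()`):

    from pybis import Openbis

    o = Openbis("https://openbis.example.org")   # placeholder server
    o.login("someuser", "somepassword")

    ds = o.new_dataset(
        type="RAW_DATA",                         # placeholder dataset type
        sample="/DEFAULT/DEFAULT/SAMP1",         # placeholder sample
        files=["data/measurement.bin"],          # files > 10 MB are chunked
    )
    # save() resolves the DSS download URL, stages the files in the session
    # workspace via upload_files_v3() (20 worker threads, 10 MB ranges) and
    # registers the dataset under the returned timestamp-based upload_id.
    ds.save()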