diff --git a/api-openbis-python3-pybis/src/python/pybis/dataset.py b/api-openbis-python3-pybis/src/python/pybis/dataset.py
index d2d08cb9945b55909cecb459158befe178d1870c..d6fdbd81d4376f476b5f26bb3ab917fbf458aab0 100644
--- a/api-openbis-python3-pybis/src/python/pybis/dataset.py
+++ b/api-openbis-python3-pybis/src/python/pybis/dataset.py
@@ -17,6 +17,7 @@ import os
 import random
 import time
 import urllib.parse
+import uuid
 import zipfile
 from functools import partialmethod
 from pathlib import Path
@@ -850,7 +851,7 @@ class DataSet(
         )
 
         if self.is_new:
-            datastores = self.openbis.get_datastores()
+            data_stores = self.openbis.get_datastores()
 
             if self.sample is None and self.experiment is None:
                 raise ValueError(
@@ -862,46 +863,10 @@
                 raise ValueError(
                     "Cannot register a dataset without a file. Please provide at least one file"
                 )
 
+            if self.openbis.get_server_information().is_version_greater_than(3, 5):
+                return self._upload_v3(data_stores)
-            # for uploading phyiscal data, we first upload it to the session workspace
-            self.upload_files(
-                datastore_url=datastores["downloadUrl"][0],
-                files=self.files,
-                folder="",
-                wait_until_finished=True,
-            )
-
-            # activate the ingestion plugin, as soon as the data is uploaded
-            # this will actually register the dataset in the datastore and the AS
-            request = self._generate_plugin_request(
-                dss=datastores["code"][0],
-                permId=permId,
-            )
-            resp = self.openbis._post_request(self.openbis.reg_v1, request)
-            if resp["rows"][0][0]["value"] == "OK":
-                permId = resp["rows"][0][2]["value"]
-                if permId is None or permId == "":
-                    self.__dict__["is_new"] = False
-                    if VERBOSE:
-                        print(
-                            "DataSet successfully created. Because you connected to an openBIS version older than 16.05.04, you cannot update the object."
-                        )
-                else:
-                    new_dataset_data = self.openbis.get_dataset(
-                        permId, only_data=True
-                    )
-                    self._set_data(new_dataset_data)
-                    if VERBOSE:
-                        print("DataSet successfully created.")
-                    return self
-            else:
-                import json
-
-                print(json.dumps(request))
-                raise ValueError(
-                    "Error while creating the DataSet: "
-                    + resp["rows"][0][1]["value"]
-                )
+            return self._upload_v1(permId, data_stores)
         # CONTAINER
         else:
             if self.files is not None and len(self.files) > 0:
@@ -919,7 +884,7 @@
             request["params"][1][0]["autoGeneratedCode"] = False
 
             props = self.p._all_props()
-            DSpermId = datastores["code"][0]
+            DSpermId = data_stores["code"][0]
             request["params"][1][0]["properties"] = props
             request["params"][1][0]["dataStoreId"] = {
                 "permId": DSpermId,
@@ -935,7 +900,7 @@
                 self._set_data(new_dataset_data)
                 return self
 
-        # updating the DataSEt
+        # updating the DataSet
         else:
             request = self._up_attrs()
             props = self.p._all_props()
@@ -945,6 +910,124 @@
             if VERBOSE:
                 print("DataSet successfully updated.")
 
+    def _upload_v1(self, permId, datastores):
+        # for uploading physical data, we first upload it to the session workspace
+        self.upload_files_v1(
+            datastore_url=datastores["downloadUrl"][0],
+            files=self.files,
+            folder="",
+            wait_until_finished=True,
+        )
+
+        # activate the ingestion plugin, as soon as the data is uploaded
+        # this will actually register the dataset in the datastore and the AS
+        request = self._generate_plugin_request(
+            dss=datastores["code"][0],
+            permId=permId,
+        )
+        resp = self.openbis._post_request(self.openbis.reg_v1, request)
+        if resp["rows"][0][0]["value"] == "OK":
+            permId = resp["rows"][0][2]["value"]
+            if permId is None or permId == "":
+                self.__dict__["is_new"] = False
+                if VERBOSE:
+                    print(
+                        "DataSet successfully created. Because you connected to an openBIS version older than 16.05.04, you cannot update the object."
+                    )
+            else:
+                new_dataset_data = self.openbis.get_dataset(
+                    permId, only_data=True
+                )
+                self._set_data(new_dataset_data)
+                if VERBOSE:
+                    print("DataSet successfully created.")
+                return self
+        else:
+            print(json.dumps(request))
+            raise ValueError(
+                "Error while creating the DataSet: "
+                + resp["rows"][0][1]["value"]
+            )
+
+    def _upload_v3(self, data_stores):
+        upload_id = str(uuid.uuid4())
+
+        # for uploading physical data, we first upload it to the session workspace
+        self.upload_files_v3(
+            upload_id=upload_id,
+            datastore_url=data_stores["downloadUrl"][0],
+            files=self.files,
+            folder="",
+            wait_until_finished=True,
+        )
+
+        param = {
+            "@type": "dss.dto.dataset.create.UploadedDataSetCreation",
+            "@id": "1",
+            "typeId": {
+                "@type": "as.dto.entitytype.id.EntityTypePermId",
+                "@id": "2",
+                "permId": self.type.code,
+                "entityKind": "DATA_SET"},
+
+            "properties": self.props.all_nonempty(),
+            "parentIds": [],
+            "uploadId": upload_id
+        }
+
+        if self.experiment is not None:
+            param["experimentId"] = {
+                "@type": "as.dto.experiment.id.ExperimentIdentifier",
+                "@id": "3",
+                "identifier": self.experiment.identifier
+            }
+        if self.sample is not None:
+            param["sampleId"] = {
+                "@type": "as.dto.sample.id.SamplePermId",
+                "@id": "4",
+                "permId": self.sample.permId
+            }
+        parent_ids = self.parents
+        if parent_ids is None:
+            parent_ids = []
+        counter = 5
+        for parent_id in parent_ids:
+            param["parentIds"].append({
+                "@type": "as.dto.dataset.id.DataSetPermId",
+                "@id": str(counter),
+                "permId": parent_id
+            })
+            counter += 1
+
+        request = {
+            "method": "createUploadedDataSet",
+            "params": [self.openbis.token, param]
+        }
+
+        resp = self.openbis._post_request(self.openbis.dss_v3, request)
+        if "permId" in resp:
+            permId = resp["permId"]
+            if permId is None or permId == "":
+                self.__dict__["is_new"] = False
+                if VERBOSE:
+                    print(
+                        "DataSet successfully created. Because you connected to an openBIS version older than 16.05.04, you cannot update the object."
+                    )
+            else:
+                new_dataset_data = self.openbis.get_dataset(
+                    permId, only_data=True
+                )
+                self._set_data(new_dataset_data)
+                if VERBOSE:
+                    print("DataSet successfully created.")
+                return self
+        else:
+            print(json.dumps(request))
+            raise ValueError(
+                "Error while creating the DataSet: "
+                + resp["rows"][0][1]["value"]
+            )
+
     def zipit(self, file_or_folder, zipf):
         """Takes a directory or a file, and a zipfile instance. For every file that is encountered,
         we issue the write() method to add that file to the zipfile.
@@ -972,13 +1055,12 @@ class DataSet(
                     os.path.join(realpath[len(head) + 1:], filename),
                 )
 
-    def upload_files(
+    def upload_files_v1(
         self, datastore_url=None, files=None, folder=None, wait_until_finished=False
     ):
         if datastore_url is None:
             datastore_url = self.openbis._get_dss_url()
-
         if files is None:
             raise ValueError("Please provide a filename.")
@@ -1058,12 +1140,72 @@ class DataSet(
         # return files with full path in session workspace
         return self.files_in_wsp
 
+    def upload_files_v3(
+        self, upload_id, datastore_url=None, files=None, folder=None, wait_until_finished=False
+    ):
+        if datastore_url is None:
+            datastore_url = self.openbis._get_dss_url()
+        if files is None:
+            raise ValueError("Please provide a filename.")
+
+        if folder is None:
+            # create a unique foldername
+            folder = time.strftime("%Y-%m-%d_%H-%M-%S")
+
+        if isinstance(files, str):
+            files = [files]
+
+        # define a queue to handle the upload threads
+        with DataSetUploadQueue(multipart=True) as queue:
+
+            real_files = []
+            for filename in files:
+                if os.path.isdir(filename):
+                    pardir = os.path.join(filename, os.pardir)
+                    for root, dirs, files in os.walk(os.path.expanduser(filename)):
+                        path = os.path.relpath(root, pardir)
+                        for file in files:
+                            real_files.append((path, os.path.join(root, file)))
+                else:
+                    real_files.append(("", os.path.join(filename)))
+
+            # compose the upload-URL and put URL and filename in the upload queue
+            files_in_wsp = []
+            for directory, filename in real_files:
+                file_in_wsp = os.path.join(folder, os.path.basename(filename))
+                files_in_wsp += [file_in_wsp]
+
+                fol = os.path.join(folder, directory)
+
+                upload_url = (
+                    datastore_url + "/datastore_server/store_share_file_upload"
+                    "?dataSetType={}"
+                    "&folderPath={}"
+                    "&ignoreFilePath={}"
+                    "&uploadID={}"
+                    "&sessionID={}"
+                ).format(self.type.code, fol, False, upload_id, self.openbis.token)
+                queue.put(
+                    [upload_url, filename,
+                     self.openbis.verify_certificates])
+
+            # wait until all files have uploaded
+            if wait_until_finished:
+                queue.join()
+
+            # return files with full path in session workspace
+            return files_in_wsp
+
 
 class DataSetUploadQueue:
-    def __init__(self, workers=20):
+    """Structure for uploading files to openBIS in separate threads.
+    It works as a queue where each item is a single file upload. It allows uploading files via the
+    v1 and v3 APIs; v3 uses a multipart upload, whereas v1 sends the file in the request body."""
+    def __init__(self, workers=20, multipart=False):
         # maximum files to be uploaded at once
         self.upload_queue = Queue()
         self.workers = workers
+        self.multipart = multipart
 
         # define number of threads and start them
         for t in range(workers):
@@ -1097,17 +1239,22 @@
                 break
 
             upload_url, filename, verify_certificates = queue_item
-            filesize = os.path.getsize(filename)
+            file_size = os.path.getsize(filename)
 
-            # upload the file to our DSS session workspace
-            with open(filename, "rb") as f:
-                resp = requests.post(upload_url, data=f, verify=verify_certificates)
+            if self.multipart is True:
+                file = {filename: open(filename, "rb")}
+                resp = requests.post(upload_url, files=file, verify=verify_certificates)
                 resp.raise_for_status()
-                data = resp.json()
-                if filesize != int(data["size"]):
-                    raise ValueError(
-                        f'size of file uploaded: {filesize} != data received: {int(data["size"])}'
-                    )
+            else:
+                # upload the file to our DSS session workspace
+                with open(filename, "rb") as f:
+                    resp = requests.post(upload_url, data=f, verify=verify_certificates)
+                    resp.raise_for_status()
+                    data = resp.json()
+                    if file_size != int(data["size"]):
+                        raise ValueError(
+                            f'size of file uploaded: {file_size} != data received: {int(data["size"])}'
+                        )
 
             # Tell the queue that we are done
             self.upload_queue.task_done()
@@ -1117,6 +1264,7 @@ class ZipBuffer(object):
     """A file-like object for zipfile.ZipFile to write into.
    zipfile invokes the write method to store its zipped content.
    We will send this content directly to the session_workspace as a POST request.
+    Used by the V1 API only.
    """

     def __init__(self, openbis_obj, host, filename):
@@ -1165,6 +1313,7 @@
 
 
 class DataSetDownloadQueue:
+    """Special queue structure for multithreaded downloading of files using the V1 API."""
     def __init__(self, workers=20, collect_files_with_wrong_length=False):
         self.collect_files_with_wrong_length = collect_files_with_wrong_length
         # maximum files to be downloaded at once
diff --git a/api-openbis-python3-pybis/src/python/pybis/pybis.py b/api-openbis-python3-pybis/src/python/pybis/pybis.py
index 1d1f13cd2be4f06ccfe63d2caf56892abd3db772..a48ba586b9a59bda23525c47b79d161dadf9f91e 100644
--- a/api-openbis-python3-pybis/src/python/pybis/pybis.py
+++ b/api-openbis-python3-pybis/src/python/pybis/pybis.py
@@ -979,6 +979,7 @@
         self.as_v3 = "/openbis/openbis/rmi-application-server-v3.json"
         self.as_v1 = "/openbis/openbis/rmi-general-information-v1.json"
         self.reg_v1 = "/openbis/openbis/rmi-query-v1.json"
+        self.dss_v3 = "/datastore_server/rmi-data-store-server-v3.json"
         self.verify_certificates = verify_certificates
         if not verify_certificates:
             urllib3.disable_warnings()
@@ -4300,7 +4301,7 @@
         fetch_options["propertyAssignments"] = get_fetchoption_for_entity(
             "propertyAssignments"
         )
-        if self.get_server_information().api_version > "3.3":
+        if self.get_server_information().is_version_greater_than(3, 3):
             fetch_options["validationPlugin"] = get_fetchoption_for_entity("plugin")
 
         request = {
@@ -5412,6 +5413,12 @@
     def is_openbis_1806(self):
         return (self.get_major_version() == 3) and (self.get_minor_version() >= 5)
 
+    def is_version_greater_than(self, major: int, minor: int):
+        """Checks whether the server API version is greater than the provided one."""
+        current_major = self.get_major_version()
+        current_minor = self.get_minor_version()
+        return (current_major == major and current_minor > minor) or current_major > major
+
     def _repr_html_(self):
         html = """
         <table border="1" class="dataframe">
diff --git a/api-openbis-python3-pybis/src/python/tests/test_dataset.py b/api-openbis-python3-pybis/src/python/tests/test_dataset.py
index deb8d621ed1f6c2a24d2b23ee80c4dd485d3a18b..616833729b97569d1e6f0d562b14761415d10e9e 100644
--- a/api-openbis-python3-pybis/src/python/tests/test_dataset.py
+++ b/api-openbis-python3-pybis/src/python/tests/test_dataset.py
@@ -12,15 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from pybis.things import Things
-
-import json
-import random
-import re
 import os
+import re
+import time
 
 import pytest
-import time
+from pybis.things import Things
 
 
 def test_get_datasets(space):
@@ -83,11 +80,11 @@
     assert dataset_by_permId.registrationDate is not None
     # check date format: 2019-03-22 11:36:40
     assert (
-        re.search(
-            r"^\d{4}\-\d{2}\-\d{2} \d{2}\:\d{2}\:\d{2}$",
-            dataset_by_permId.registrationDate,
-        )
-        is not None
+            re.search(
+                r"^\d{4}\-\d{2}\-\d{2} \d{2}\:\d{2}\:\d{2}$",
+                dataset_by_permId.registrationDate,
+            )
+            is not None
     )
 
     # delete datasets
@@ -125,6 +122,7 @@
 
     dataset.delete("dataset creation test on {}".format(timestamp))
 
+
 def test_things_initialization(space):
     data_frame_result = [1, 2, 3]
     objects_result = [4, 5, 6]
@@ -136,9 +134,9 @@
     def data_frame(attrs):
         return data_frame_result
 
     def objects():
         return objects_result
 
     things = Things(
-        openbis_obj = None,
-        entity = 'dataset',
-        identifier_name = 'permId',
+        openbis_obj=None,
+        entity='dataset',
+        identifier_name='permId',
         start_with=0,
         count=10,
         totalCount=10,
@@ -154,4 +152,60 @@
     assert things.objects == objects_result
 
     assert things.is_df_initialised()
-    assert things.is_objects_initialised()
\ No newline at end of file
+    assert things.is_objects_initialised()
+
+
+def test_create_new_dataset_v1(space):
+    """Create a dataset and upload a file using the upload scheme from before API version 3.6."""
+    openbis_instance = space.openbis
+
+    testfile_path = os.path.join(os.path.dirname(__file__), "testdir/testfile")
+
+    # This is a hack to force the old upload path for testing.
+    openbis_instance.get_server_information()._info["api-version"] = "3.5"
+
+    dataset = openbis_instance.new_dataset(
+        type="RAW_DATA",
+        experiment="/DEFAULT/DEFAULT/DEFAULT",
+        files=[testfile_path],
+        props={"$name": "some good name"},
+    )
+    dataset.save()
+
+    assert dataset.permId is not None
+    assert dataset.file_list == ["original/testfile"]
+
+
+def test_create_new_dataset_v3_single_file(space):
+    openbis_instance = space.openbis
+
+    testfile_path = os.path.join(os.path.dirname(__file__), "testdir/testfile")
+
+    dataset = openbis_instance.new_dataset(
+        type="RAW_DATA",
+        experiment="/DEFAULT/DEFAULT/DEFAULT",
+        files=[testfile_path],
+        props={"$name": "some good name"},
+    )
+    dataset.save()
+
+    assert dataset.permId is not None
+    assert dataset.file_list == ["original/testfile"]
+
+
+def test_create_new_dataset_v3_directory(space):
+    openbis_instance = space.openbis
+
+    testfile_path = os.path.join(os.path.dirname(__file__), "testdir")
+
+    dataset = openbis_instance.new_dataset(
+        type="RAW_DATA",
+        experiment="/DEFAULT/DEFAULT/DEFAULT",
+        files=[testfile_path],
+        props={"$name": "some good name"},
+    )
+    dataset.save()
+
+    assert dataset.permId is not None
+    assert dataset.file_list == ["testdir/testfile"]
+
diff --git a/api-openbis-python3-pybis/src/python/tests/testfile b/api-openbis-python3-pybis/src/python/tests/testdir/testfile
similarity index 100%
rename from api-openbis-python3-pybis/src/python/tests/testfile
rename to api-openbis-python3-pybis/src/python/tests/testdir/testfile
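
Note (not part of the patch): a minimal usage sketch of the new upload path, assuming a reachable openBIS server, valid credentials, and that the RAW_DATA dataset type and the /DEFAULT/DEFAULT/DEFAULT experiment exist, as in the tests above; the server URL and credentials below are placeholders. With this change, DataSet.save() checks ServerInformation.is_version_greater_than(3, 5): on newer servers it uploads the files via upload_files_v3() (a multipart upload carrying an uploadID) and registers them through createUploadedDataSet on the DSS v3 endpoint; otherwise it falls back to the v1 session-workspace/ingestion-plugin path in _upload_v1().

    from pybis import Openbis

    o = Openbis("https://example.openbis.ch", verify_certificates=False)  # placeholder URL
    o.login("username", "password", save_token=True)                      # placeholder credentials

    ds = o.new_dataset(
        type="RAW_DATA",
        experiment="/DEFAULT/DEFAULT/DEFAULT",
        files=["testdir/testfile"],
        props={"$name": "some good name"},
    )
    ds.save()  # picks _upload_v3() or _upload_v1() depending on the server API version
    print(ds.permId, ds.file_list)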