Skip to content
Snippets Groups Projects
pybis.py 42.2 KiB
Newer Older
  • Learn to ignore specific revisions
  •     def put(self, things):
            """ expects a list [url, filename] which is put into the upload queue
            """
            self.upload_queue.put(things)
    
    
        def join(self):
            """Block until every queued upload has been marked done
            (standard Queue.join semantics).
            """
            self.upload_queue.join()
    
    
        def upload_file(self):
            """Worker loop: take [upload_url, filename, verify_certificates]
            items off the queue and POST each file to the DSS session
            workspace. Runs forever; intended to live in a daemon thread.
            """
            while True:
                upload_url, filename, verify_certificates = self.upload_queue.get()

                # stream the file body straight from disk into the POST
                with open(filename, 'rb') as payload:
                    response = requests.post(upload_url, data=payload,
                                             verify=verify_certificates)
                    response.raise_for_status()

                # tell the queue this work item is finished
                self.upload_queue.task_done()
    
    class DataSetDownloadQueue:
        """Thread pool that downloads dataset files in parallel.

        Work items are [url, filename, verify_certificates] lists.
        Worker threads are daemonic, so they die with the main program.
        """

        def __init__(self, workers=20):
            # queue of pending downloads; workers block on get()
            self.download_queue = Queue()

            # spawn the daemon worker threads
            for _ in range(workers):
                worker = Thread(target=self.download_file)
                worker.daemon = True
                worker.start()


        def put(self, things):
            """ expects a list [url, filename, verify_certificates] which is put into the download queue
            """
            self.download_queue.put(things)


        def join(self):
            """ needs to be called if you want to wait for all downloads to be finished
            """
            self.download_queue.join()


        def download_file(self):
            """Worker loop: fetch queue items and stream each URL to disk."""
            while True:
                url, filename, verify_certificates = self.download_queue.get()

                try:
                    # create the necessary directory structure if it doesn't exist yet
                    os.makedirs(os.path.dirname(filename), exist_ok=True)

                    # request the file in streaming mode
                    r = requests.get(url, stream=True, verify=verify_certificates)
                    # fail loudly on HTTP errors instead of silently writing the
                    # error page to disk (mirrors the upload worker's behaviour)
                    r.raise_for_status()

                    with open(filename, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=1024):
                            if chunk:  # filter out keep-alive new chunks
                                f.write(chunk)
                finally:
                    # always mark the task done so join() cannot deadlock
                    # even when a download raises
                    self.download_queue.task_done()
    
    
    
        """ DataSet are openBIS objects that contain the actual files.
        """
    
    
        def __init__(self, openbis_obj, permid, data):
            """Hold the connection, the dataset permId and its raw JSON metadata."""
            self.openbis = openbis_obj
            self.permid  = permid
            self.data    = data

            # V1 datastore-server JSON-RPC endpoint (used for file listings)
            self.v1_ds = '/datastore_server/rmi-dss-api-v1.json'

            # base URL of the DSS instance that physically serves the files
            self.downloadUrl = data['dataStore']['downloadUrl']
    
    
        def download(self, files=None, wait_until_finished=False, workers=10):
            """ download the actual files and put them by default in the following folder:
            <hostname>/<dataset permId>/<path in dataset>

            If no files are specified, all files of a given dataset are downloaded.
            Files are usually downloaded in parallel, using 10 workers by default. If you want to wait until
            all the files are downloaded, set the wait_until_finished option to True.
            """
            # NOTE: the opening triple-quote above was previously unterminated,
            # which swallowed the whole method body into the docstring.
            if files is None:
                files = self.file_list()
            elif isinstance(files, str):
                files = [files]

            base_url = self.downloadUrl + '/datastore_server/' + self.permid + '/'

            queue = DataSetDownloadQueue(workers=workers)

            # get file list and start download
            for filename in files:
                download_url = base_url + filename + '?sessionID=' + self.openbis.token
                # local target path: <hostname>/<permId>/<path in dataset>
                filename = os.path.join(self.openbis.hostname, self.permid, filename)
                queue.put([download_url, filename, self.openbis.verify_certificates])

            # wait until all files have downloaded
            if wait_until_finished:
                queue.join()
    
    
            """ Returns an array of the parents of the given dataset. Returns an empty array if no
            parents were found.
            """
    
            parents = []
            for item in self.data['parents']:
                parent = self.openbis.get_dataset(item['code'])
                if parent is not None:
                    parents.append(parent)
            return parents
    
        def get_children(self):
            """ Returns an array of the children of the given dataset. Returns an empty array if no
            children were found.
            """
            resolved = [
                self.openbis.get_dataset(entry['code'])
                for entry in self.data['children']
            ]
            # drop entries that could not be resolved to a dataset
            return [child for child in resolved if child is not None]
    
    
    
        def file_list(self):
            """Return the dataset-relative paths of all regular files
            (directories are excluded), walking the dataset recursively.
            """
            return [
                entry['pathInDataSet']
                for entry in self.get_file_list(recursive=True)
                if not entry['isDirectory']
            ]
    
    
        def get_file_list(self, recursive=True, start_folder="/"):
            """ Lists all files of a given dataset. You can specify a start_folder other than "/".
            By default, all directories and their containing files are listed recursively. You can
            turn off this option by setting recursive=False.
            """
            # NOTE(review): the request literal and the response handling were
            # truncated in the source; reconstructed from the visible error
            # messages and the openBIS V1 listFilesForDataSet contract — confirm.
            request = {
                "method": "listFilesForDataSet",
                "params": [
                    self.openbis.token,
                    self.permid,
                    start_folder,
                    recursive,
                ],
                "id": "1",
                "jsonrpc": "2.0",
            }

            resp = requests.post(
                self.downloadUrl + self.v1_ds,
                json.dumps(request),
                verify=self.openbis.verify_certificates
            )

            if resp.ok:
                data = resp.json()
                if 'error' in data:
                    raise ValueError('Error from openBIS: ' + data['error'])
                elif 'result' in data:
                    return data['result']
                else:
                    raise ValueError('request to openBIS did not return either result nor error')
            else:
                raise ValueError('internal error while performing post request')
    
        """ A Sample is one of the most commonly used objects in openBIS.
    
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            """Wrap the raw sample dict and keep a reference to the connection."""
            super(Sample, self).__init__(*args, **kwargs)
            # attrdict trick: dict keys become readable as attributes
            self.__dict__ = self

            # keep the connection object — delete() and get_datasets()
            # both read self.openbis, but it was never stored before
            self.openbis = openbis_obj

            self.permid = self.permId['permId']
            self.ident = self.identifier['identifier']

            # placeholder, not populated here — presumably filled elsewhere; confirm
            self.datasets = None
    
    
    
        def delete(self, permid, reason):
            """Delete the sample identified by *permid*, recording *reason*
            in openBIS (delegates to the connection object).
            """
            self.openbis.delete_sample(permid, reason)
    
    
        def get_datasets(self):
            """Resolve every entry of this sample's dataSets list into a
            DataSet object via the openBIS connection.
            """
            return [
                self.openbis.get_dataset(entry['permId']['permId'])
                for entry in self.dataSets
            ]
    
                parent = self.openbis.get_sample(item['permId']['permId'])
    
                if parent is not None:
                    parents.append(parent)
            return parents
    
    
        def get_children(self):
            """Return the child samples of this sample; empty list if none resolve."""
            children = []
            # NOTE(review): the loop header was missing in the source; presumably
            # the raw sample dict exposes children under self.children — confirm.
            for item in self.children:
                child = self.openbis.get_sample(item['permId']['permId'])

                if child is not None:
                    children.append(child)
            return children
    
    
    
    class Space(dict):
        """ managing openBIS spaces
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            """Wrap the raw space dict and keep a reference to the openBIS connection."""
            super(Space, self).__init__(*args, **kwargs)
            # attrdict trick: the instance's __dict__ IS the dict itself, so
            # every dict key becomes readable/writable as an attribute
            self.__dict__ = self
            self.openbis = openbis_obj
            # re-assignment is a functional no-op, but attribute lookup raises
            # AttributeError if the 'code' key is missing — acts as a sanity check
            self.code = self.code
    
            """ Lists all samples in a given space. A pandas DataFrame object is returned.
            """
    
            fields = ['spaceCode','permId', 'identifier','experimentIdentifierOrNull']
    
            request = {
                "method": "searchForSamples",
                "params": [
                    self.openbis.token,
                    {
                    "matchClauses": [
                        {
                        "@type": "AttributeMatchClause",
                        "fieldType": "ATTRIBUTE",
                        "attribute": "SPACE",
                        "desiredValue": self.code,
                        }
                    ],
                        "subCriterias": [],
                        "operator": "MATCH_ALL_CLAUSES"
                    },
                    [
                        "PROPERTIES",
                        "PARENTS"
                    ]
                ],
                "id": "1",
                "jsonrpc": "2.0"
            }
    
            resp = self.openbis._post_request(self.openbis.as_v1, request)
    
            if resp is not None and len(resp) > 0:
                datasets = DataFrame(resp)[fields]