# Excerpt of pybis.py (pyBIS client for openBIS), captured from a GitLab web
# view; the navigation chrome that preceded the code has been removed.
  •             return self.get_sample(resp[0]['permId'])
            else:
                raise ValueError("error while trying to fetch sample from server: " + str(resp))
    
        def _get_dss_url(self, dss_code=None):
            """ internal method to get the downloadURL of a datastore.

            :param dss_code: code of a specific datastore; when None the first
                             datastore's downloadUrl is returned
            :return: the downloadUrl string of the matching datastore
            """
            dss = self.get_datastores()
            if dss_code is None:
                # no code given — fall back to the first datastore
                return dss['downloadUrl'][0]
            # FIX: this lookup was unreachable (indented under the return above);
            # select the row whose 'code' column matches dss_code
            return dss[dss['code'] == dss_code]['downloadUrl'][0]
    
        def upload_files(self, datastore_url=None, files=None, folder=None, wait_until_finished=False):
            """ Uploads files and/or folders to the session workspace of a datastore.

            NOTE(review): this method was damaged in the capture — the
            `if datastore_url is None:` guard, the `upload_url = (` opener and the
            final return statement were missing and have been reconstructed;
            verify against upstream pybis.

            :param datastore_url: base URL of the datastore; resolved via
                                  _get_dss_url() when None
            :param files: a filename or a list of filenames/directories
                          (directories are walked recursively)
            :param folder: target folder inside the session workspace; a unique
                           timestamp-based name is generated when None
            :param wait_until_finished: block until all upload threads are done
            :return: list of file paths relative to the session workspace root
            :raises ValueError: when no files are given
            """
            if datastore_url is None:
                datastore_url = self._get_dss_url()

            if files is None:
                raise ValueError("Please provide a filename.")

            if folder is None:
                # create a unique foldername
                folder = time.strftime('%Y-%m-%d_%H-%M-%S')

            if isinstance(files, str):
                files = [files]

            self.files = files
            self.startByte = 0
            self.endByte   = 0

            # define a queue to handle the upload threads
            queue = DataSetUploadQueue()

            # expand directories into the individual files they contain
            real_files = []
            for filename in files:
                if os.path.isdir(filename):
                    real_files.extend(
                        [os.path.join(dp, f)
                         for dp, dn, fn in os.walk(os.path.expanduser(filename))
                         for f in fn]
                    )
                else:
                    real_files.append(os.path.join(filename))

            # compose the upload-URL and put URL and filename in the upload queue
            files_in_wsp = []
            for filename in real_files:
                file_in_wsp = os.path.join(folder, filename)
                files_in_wsp.append(file_in_wsp)
                upload_url = (
                    datastore_url + '/session_workspace_file_upload'
                    + '?filename=' + os.path.join(folder, filename)
                    + '&id=1'
                    + '&startByte=0&endByte=0'
                    + '&sessionID=' + self.token
                )
                queue.put([upload_url, filename, self.verify_certificates])

            # wait until all files have uploaded
            if wait_until_finished:
                queue.join()

            # return files with full path in session workspace
            return files_in_wsp
    
    
    class DataSetUploadQueue:
        """ Thread pool that uploads files to a DSS session workspace.

        Jobs are [upload_url, filename, verify_certificates] lists; `workers`
        daemon threads consume them concurrently.
        """

        def __init__(self, workers=20):
            # maximum files to be uploaded at once
            self.upload_queue = Queue()

            # define number of threads and start them
            # (FIX: the loop variable previously shadowed the Thread object)
            for _ in range(workers):
                worker = Thread(target=self.upload_file)
                worker.daemon = True
                worker.start()

        def put(self, things):
            """ expects a list [url, filename] which is put into the upload queue
            """
            self.upload_queue.put(things)

        def join(self):
            """ needs to be called if you want to wait for all uploads to be finished
            """
            self.upload_queue.join()

        def upload_file(self):
            """ Worker loop: take the next job from the queue and POST the file. """
            while True:
                # get the next item in the queue
                upload_url, filename, verify_certificates = self.upload_queue.get()

                try:
                    # upload the file to our DSS session workspace
                    with open(filename, 'rb') as f:
                        resp = requests.post(upload_url, data=f, verify=verify_certificates)
                        resp.raise_for_status()
                finally:
                    # FIX: always mark the task done, even on failure — otherwise
                    # a single failed upload makes join() block forever
                    self.upload_queue.task_done()
    
    class DataSetDownloadQueue:
        """ Thread pool that downloads dataset files from a DSS.

        Jobs are [url, filename, verify_certificates] lists; `workers` daemon
        threads consume them concurrently.
        """

        def __init__(self, workers=20):
            # maximum files to be downloaded at once
            self.download_queue = Queue()

            # define number of threads
            # (FIX: the loop variable previously shadowed the Thread object)
            for _ in range(workers):
                worker = Thread(target=self.download_file)
                worker.daemon = True
                worker.start()

        def put(self, things):
            """ expects a list [url, filename] which is put into the download queue
            """
            self.download_queue.put(things)

        def join(self):
            """ needs to be called if you want to wait for all downloads to be finished
            """
            self.download_queue.join()

        def download_file(self):
            """ Worker loop: stream the next URL from the queue into its target file. """
            while True:
                url, filename, verify_certificates = self.download_queue.get()

                try:
                    # create the necessary directory structure if they don't exist yet
                    # (FIX: os.makedirs('') raises when filename has no directory part)
                    dirname = os.path.dirname(filename)
                    if dirname:
                        os.makedirs(dirname, exist_ok=True)

                    # request the file in streaming mode
                    r = requests.get(url, stream=True, verify=verify_certificates)

                    with open(filename, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=1024):
                            if chunk:  # filter out keep-alive new chunks
                                f.write(chunk)
                finally:
                    # FIX: always mark the task done, even on failure — otherwise
                    # a single failed download makes join() block forever
                    self.download_queue.task_done()
    
    
    
        """ DataSet are openBIS objects that contain the actual files.
        """
    
    
        def __init__(self, openbis_obj, permid, data):
            """ Wraps one raw dataSet dictionary returned by openBIS.

            :param openbis_obj: the Openbis connection this dataset came from
            :param permid: permanent identifier of the dataset
            :param data: raw dataSet dictionary from the openBIS JSON response
            """
            self.openbis = openbis_obj
            self.permid  = permid
            self.data    = data

            # endpoint of the datastore server (DSS) JSON-RPC API, v1
            self.v1_ds = '/datastore_server/rmi-dss-api-v1.json'

            self.downloadUrl = self.data['dataStore']['downloadUrl']

            # physicalData may be None (e.g. for non-physical datasets);
            # shareId/location are only available when it is present
            if self.data['physicalData'] is None:
                self.shareId = None
                self.location = None
            else:
                self.shareId = self.data['physicalData']['shareId']
                self.location = self.data['physicalData']['location']
    
        def download(self, files=None, wait_until_finished=False, workers=10):
            """ download the actual files and put them by default in the following folder:
            <hostname>/<dataset permId>/

            If no files are specified, all files of a given dataset are downloaded.
            Files are usually downloaded in parallel, using 10 workers by default. If you want to wait until
            all the files are downloaded, set the wait_until_finished option to True.
            """
            # FIX: the docstring above was never closed in the captured version,
            # swallowing the code below; also `files == None` -> `files is None`.
            if files is None:
                files = self.file_list()
            elif isinstance(files, str):
                files = [files]

            base_url = self.downloadUrl + '/datastore_server/' + self.permid + '/'

            queue = DataSetDownloadQueue(workers=workers)

            # get file list and start download
            for filename in files:
                download_url = base_url + filename + '?sessionID=' + self.openbis.token
                filename = os.path.join(self.openbis.hostname, self.permid, filename)
                queue.put([download_url, filename, self.openbis.verify_certificates])

            # wait until all files have downloaded
            if wait_until_finished:
                queue.join()

            print("Files downloaded to: %s" % os.path.join(self.openbis.hostname, self.permid))
    
    
    
            """ Returns an array of the parents of the given dataset. Returns an empty array if no
            parents were found.
            """
    
            parents = []
            for item in self.data['parents']:
                parent = self.openbis.get_dataset(item['code'])
                if parent is not None:
                    parents.append(parent)
            return parents
    
        def get_children(self):
            """ Returns an array of the children of the given dataset. Returns an empty array if no
            children were found.
            """
            # resolve every child code through the connection, then keep only
            # the ones the server actually returned
            resolved = [self.openbis.get_dataset(entry['code'])
                        for entry in self.data['children']]
            return [child for child in resolved if child is not None]
    
        def file_list(self):
            """ Returns a flat list of all file paths (pathInDataSet) in this dataset.

            Directories are descended into recursively, but directory entries
            themselves are skipped — only real files are returned.
            """
            files = []
            for file in self.get_file_list(recursive=True):
                # FIX(idiom): was `if dir: pass / else: append` — inverted the test
                if not file['isDirectory']:
                    files.append(file['pathInDataSet'])
            return files
    
    
    # (VCS attribution lines from the web capture removed)
    
        def get_files(self):
            """ Returns a DataFrame of all files in this dataset
            """

            def _relative_path(path_in_dataset):
                # without a shareId there is no physical location to resolve against
                if self.shareId is None:
                    return ''
                return os.path.join(self.shareId, self.location, path_in_dataset)

            entries = DataFrame(self.get_file_list())
            entries['relativePath'] = entries['pathInDataSet'].map(_relative_path)
            return entries[['isDirectory', 'pathInDataSet', 'fileSize', 'crc32Checksum']]
    
        def get_file_list(self, recursive=True, start_folder="/"):
            """ Lists all files of a given dataset. You can specifiy a start_folder other than "/".
            By default, all directories and their containing files are listed recursively. You can
            turn off this option by setting recursive=False.
            """
            # NOTE(review): the tail of the request dict and the response-status
            # handling were lost in the capture and are reconstructed here;
            # verify against upstream pybis.
            request = {
                "method": "listFilesForDataSet",
                "params": [
                    self.openbis.token,
                    self.permid,
                    start_folder,
                    recursive,
                ],
                "id": "1",
                "jsonrpc": "2.0",
            }

            resp = requests.post(
                self.downloadUrl + self.v1_ds,
                json.dumps(request),
                verify=self.openbis.verify_certificates
            )

            if resp.ok:
                data = resp.json()
                if 'error' in data:
                    raise ValueError('Error from openBIS: ' + data['error'])
                elif 'result' in data:
                    return data['result']
                else:
                    raise ValueError('request to openBIS did not return either result nor error')
            else:
                raise ValueError('internal error while performing post request')
    
        """ A Sample is one of the most commonly used objects in openBIS.
    
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            """ Builds a Sample from a raw openBIS sample dictionary.

            The payload is exposed both as mapping keys and as attributes
            (via self.__dict__ = self).
            """
            super(Sample, self).__init__(*args, **kwargs)
            self.__dict__ = self
            # FIX: openbis_obj was accepted but never stored, although
            # delete() reads self.openbis — keep the connection handle
            self.openbis = openbis_obj
            self.permid = self.permId['permId']
            self.ident = self.identifier['identifier']
    
    
    
    # (VCS attribution lines from the web capture removed)
        def delete(self, reason):
            """ Deletes this sample on the openBIS server, recording `reason` for the deletion. """
            connection = self.openbis
            connection.delete_sample(self.permid, reason)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            objects = self.dataSets
            cache = {}
            for obj in objects:
                for key in obj.keys():
                    if key in ('type'):
                        if isinstance(obj[key], dict):
                            cache[ obj[key]['@id'] ] = obj[key]
                        else:
                            if obj[key] in cache:
                                obj[key] = cache[ obj[key] ]
            datasets = DataFrame(objects)
            datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
            datasets['properties'] = datasets['properties'].map(extract_properties)
            datasets['type'] = datasets['type'].map(extract_code)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            #return datasets[['code','properties', 'type', 'registrationDate']]
    
                parent = self.openbis.get_sample(item['permId']['permId'])
    
                if parent is not None:
                    parents.append(parent)
            return parents
    
    
        def get_children(self):
            """ Returns the child samples of this sample as a list of Sample objects. """
            # NOTE(review): the loop header was lost in the capture;
            # `self.children` is inferred from the symmetric get_parents() —
            # verify against upstream pybis.
            children = []
            for item in self.children:
                child = self.openbis.get_sample(item['permId']['permId'])
                if child is not None:
                    children.append(child)
            return children
    
    
    
    class Space(dict):
        """ managing openBIS spaces
        """
        # FIX: VCS-attribution lines from the web capture were embedded in the
        # class body and broke the syntax; they have been removed.

        def __init__(self, openbis_obj, *args, **kwargs):
            super(Space, self).__init__(*args, **kwargs)
            # expose the dict keys as attributes as well
            self.__dict__ = self
            self.openbis = openbis_obj
            self.code = self.code

        def get_samples(self, *args, **kwargs):
            """ Lists all samples in a given space. A pandas DataFrame object is returned.
            """
            return self.openbis.get_samples(space=self.code, *args, **kwargs)