        def delete_sample(self, permid, reason):
            """ Deletes a given sample.
            """
    
            sample_delete_request = {
                "method": "deleteSamples",
                "params": [
                    self.token,
                    [
                        {
                            "permId": permid,
                            "@type": "as.dto.sample.id.SamplePermId"
                        }
                    ],
                    {
                        "reason": reason,
                        "@type": "as.dto.sample.delete.SampleDeletionOptions"
                    }
                ],
            }
    
            resp = self._post_request(self.as_v3, sample_delete_request)
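
        # A minimal usage sketch (assuming an authenticated Openbis instance `o`;
        # the permId and reason are illustrative):
        #
        #   o.delete_sample('20170101083045123-42', reason='created by mistake')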
    
        def new_space(self, name, description=None):
    
            """ Creates a new space in the openBIS instance. Returns a list of all spaces
            """
    
            request = {
                "method": "createSpaces",
                "params": [
                    self.token,
                    [ {
                        "@id": 0,
                        "code": name,
                        "description": description,
                        "@type": "as.dto.space.create.SpaceCreation"
                    } ]
                ],
            }
    
            resp = self._post_request(self.as_v3, request)
    
            return self.get_spaces(refresh=True)
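
        # Usage sketch (assuming an authenticated Openbis instance `o`; the space
        # name is illustrative):
        #
        #   spaces = o.new_space('MY_NEW_SPACE', description='space for tests')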
    
    
    
        def new_analysis(self, name, description=None, sample=None, dss_code=None,
                         result_files=None, notebook_files=None, parents=None):
    
    
            """ An analysis contains the Jupyter notebook file(s) and some result files.
                Technically this method involves uploading files to the session workspace
                and activating the dropbox aka dataset ingestion service "jupyter-uploader-api"
    
            """
    
            if dss_code is None:
                dss_code = self.get_datastores()['code'][0]
    
            # if a sample identifier was given, use it as a string.
            # if a sample object was given, take its identifier
            # TODO: handle permId's 
            sample_identifier = None
            if isinstance(sample, str):
                sample_identifier = sample
            else:
                sample_identifier = sample.ident
            
    
            datastore_url = self._get_dss_url(dss_code)
    
            # upload the files into a unique folder in the session workspace
            folder = time.strftime('%Y-%m-%d_%H-%M-%S')

            data_sets = []
            if notebook_files is not None:
                notebooks_folder = os.path.join(folder, 'notebook_files')
                self.upload_files(
                    datastore_url = datastore_url,
                    files=notebook_files,
                    folder= notebooks_folder, 
                    wait_until_finished=True
                )
                data_sets.append({
                    "dataSetType" : "JUPYTER_NOTEBOOk",
                    "sessionWorkspaceFolder": notebooks_folder,
                    "fileNames" : notebook_files,
                    "properties" : {}
                })
            if result_files is not None:
                results_folder = os.path.join(folder, 'result_files')
                self.upload_files(
                    datastore_url = datastore_url,
                    files=result_files,
                    folder=results_folder,
                    wait_until_finished=True
                )
                data_sets.append({
                    "dataSetType" : "JUPYTER_RESULT",
                    "sessionWorkspaceFolder" : results_folder,
                    "fileNames" : result_files,
                    "properties" : {}
                })
    
    
            # register the files in openBIS via the ingestion service
            request = {
                "method": "createReportFromAggregationService",
                "params": [
                    self.token,
                    dss_code,
                    "jupyter-uploader-api",
                    {
                        "sample" : {"identifier": sample_identifier},
                        "containers" : [
                            {
                                "dataSetType" : "JUPYTER_CONTAINER",
                                "properties" : {
                                    "NAME" : name,
                                    "DESCRIPTION" : description
                                }
                            }
                        ],
                        "dataSets" : data_sets,
                        "parents" : parents,
                    }
                ],
            }

            resp = self._post_request(self.reg_v1, request)
    
            try:
                if resp['rows'][0][0]['value'] == 'OK':
                    return resp['rows'][0][1]['value']
            except (KeyError, IndexError, TypeError):
                return resp
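
        # Usage sketch (assuming an authenticated Openbis instance `o`; the sample
        # identifier and file names are illustrative):
        #
        #   o.new_analysis(
        #       name='yeast growth analysis',
        #       description='notebook and results of run 3',
        #       sample='/MY_SPACE/MY_SAMPLE',
        #       notebook_files=['analysis.ipynb'],
        #       result_files=['results.csv'],
        #   )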
    
        def new_sample(self, sample_name, space_name, sample_type, tags=None, **kwargs):

            """ Creates a new sample of a given sample type. sample_name, space_name and
            sample_type are mandatory arguments.
            """

            if tags is None:
                tags = []
            if isinstance(tags, str):
                tags = [tags]
            tag_ids = []
            for tag in tags:
                tag_dict = {
                    "code":tag,
                    "@type":"as.dto.tag.id.TagCode"
                }
                tag_ids.append(tag_dict)
    
    
            sample_create_request = {
                "method":"createSamples",
                "params":[
                    self.token,
                    [
                        {
                            "properties":{},
                            "typeId":{
                                "permId": sample_type,
                                "@type":"as.dto.entitytype.id.EntityTypePermId"
                            },
                            "code": sample_name,
                            "spaceId":{
                                "permId": space_name,
                                "@type":"as.dto.space.id.SpacePermId"
                            },
                            "tagIds":tag_ids,
                            "@type":"as.dto.sample.create.SampleCreation",
                            "experimentId":None,
                            "containerId":None,
                            "componentIds":None,
                            "parentIds":None,
                            "childIds":None,
                            "attachments":None,
                            "creationId":None,
                            "autoGeneratedCode":None
                        }
                    ]
                ],
            }
            resp = self._post_request(self.as_v3, sample_create_request)
    
            if 'permId' in resp[0]:
                return self.get_sample(resp[0]['permId'])
            else:
                raise ValueError("error while trying to fetch sample from server: " + str(resp))
    
        def _get_dss_url(self, dss_code=None):
            """ internal method to get the downloadURL of a datastore.
            """
    
            dss = self.get_datastores()
            if dss_code is None:
                return dss['downloadUrl'][0]
            else:
                return dss[dss['code'] == dss_code]['downloadUrl'][0]
    
        def upload_files(self, datastore_url=None, files=None, folder=None, wait_until_finished=False):

            if datastore_url is None:
                datastore_url = self._get_dss_url()
    
            if files is None:
                raise ValueError("Please provide a filename.")
    
    
            if folder is None:
                # create a unique foldername
                folder = time.strftime('%Y-%m-%d_%H-%M-%S')
    
    
            if isinstance(files, str):
                files = [files]
    
            self.files = files
            self.startByte = 0
            self.endByte   = 0
        
            # define a queue to handle the upload threads
            queue = DataSetUploadQueue()
    
            real_files = []
            for filename in files:
                if os.path.isdir(filename):
                    real_files.extend([os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn])
                else:
                    real_files.append(filename)
    
            # compose the upload URL and put it, together with the file name, in the upload queue
            files_in_wsp = []
            for filename in real_files:
                file_in_wsp = os.path.join(folder, filename)
                files_in_wsp.append(file_in_wsp)

                upload_url = (
                    datastore_url + '/session_workspace_file_upload'
                    + '?filename=' + os.path.join(folder, filename)
                    + '&id=1'
                    + '&startByte=0&endByte=0'
                    + '&sessionID=' + self.token
                )
                queue.put([upload_url, filename, self.verify_certificates])
    
            # wait until all files have uploaded
            if wait_until_finished:
                queue.join()
    
            # return the files with their full paths in the session workspace
            return files_in_wsp
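
        # Usage sketch (assuming an authenticated Openbis instance `o`; the file
        # name is illustrative):
        #
        #   files_in_wsp = o.upload_files(files='data/measurements.csv',
        #                                 wait_until_finished=True)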
    
    
    class DataSetUploadQueue:
       
        def __init__(self, workers=20):
            # maximum number of files to be uploaded at once
            self.upload_queue = Queue()

            # start the worker threads
            for _ in range(workers):
                t = Thread(target=self.upload_file)
                t.daemon = True
                t.start()
    
    
        def put(self, things):
            """ expects a list [url, filename] which is put into the upload queue
            """
            self.upload_queue.put(things)
    
    
        def join(self):
            """ needs to be called if you want to wait for all uploads to be finished
            """
            self.upload_queue.join()
    
    
        def upload_file(self):
            while True:
                # get the next item in the queue
                upload_url, filename, verify_certificates = self.upload_queue.get()
    
    
                filesize = os.path.getsize(filename)
    
    
                # upload the file to our DSS session workspace
                with open(filename, 'rb') as f:
                    resp = requests.post(upload_url, data=f, verify=verify_certificates)
                    resp.raise_for_status()
    
                    data = resp.json()
                    assert filesize == int(data['size'])
    
    
                # Tell the queue that we are done
                self.upload_queue.task_done()
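
    # A minimal sketch of how the upload queue is used internally (the URL, file
    # name and certificate flag are illustrative):
    #
    #   queue = DataSetUploadQueue(workers=4)
    #   queue.put([upload_url, 'data.csv', True])
    #   queue.join()  # block until every queued upload has finished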
    
    class DataSetDownloadQueue:
        
        def __init__(self, workers=20):
            # maximum number of files to be downloaded at once
            self.download_queue = Queue()

            # start the worker threads
            for _ in range(workers):
                t = Thread(target=self.download_file)
                t.daemon = True
                t.start()
    
    
        def put(self, things):
            """ expects a list [url, filename] which is put into the download queue
            """
            self.download_queue.put(things)
    
    
        def join(self):
            """ needs to be called if you want to wait for all downloads to be finished
            """
            self.download_queue.join()
    
    
        def download_file(self):
            while True:
    
                url, filename, file_size, verify_certificates = self.download_queue.get()
    
                # create the necessary directory structure if they don't exist yet
                os.makedirs(os.path.dirname(filename), exist_ok=True)
    
                # request the file in streaming mode
    
                r = requests.get(url, stream=True, verify=verify_certificates)
    
                with open(filename, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024): 
                        if chunk: # filter out keep-alive new chunks
                            f.write(chunk)
    
    
                assert os.path.getsize(filename) == int(file_size)
    
        """ DataSet are openBIS objects that contain the actual files.
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            super(DataSet, self).__init__(*args, **kwargs)
            self.__dict__ = self
            self.permid = self["permId"]["permId"]
    
            self.openbis = openbis_obj
    
            if self['physicalData'] is None:
                self.shareId = None
                self.location = None
            else:
                self.shareId = self['physicalData']['shareId']
                self.location = self['physicalData']['location']
    
        def download(self, files=None, wait_until_finished=True, workers=10):
            """ Downloads the actual files and puts them, by default, in the folder
            <hostname>/<permId>/ underneath the current working directory.
            If no files are specified, all files of the dataset are downloaded.
            Files are downloaded in parallel, using 10 workers by default. If you want to wait
            until all files are downloaded, set the wait_until_finished option to True.
            """

            if files is None:
                files = self.file_list()
            elif isinstance(files, str):
                files = [files]
    
    
            base_url = self['dataStore']['downloadUrl'] + '/datastore_server/' + self.permid + '/'
    
            queue = DataSetDownloadQueue(workers=workers)
    
            # get the file list and start the download
            for filename in files:
                file_info = self.get_file_list(start_folder=filename)
                file_size = file_info[0]['fileSize']

                download_url = base_url + filename + '?sessionID=' + self.openbis.token
                filename = os.path.join(self.openbis.hostname, self.permid, filename)

                queue.put([download_url, filename, file_size, self.openbis.verify_certificates])
    
    
            # wait until all files have downloaded
            if wait_until_finished:
                queue.join()
    
    
            print("Files downloaded to: %s" % os.path.join(self.openbis.hostname, self.permid))
    
            """ Returns an array of the parents of the given dataset. Returns an empty array if no
            parents were found.
            """
    
            for item in self['parents']:
    
                parent = self.openbis.get_dataset(item['code'])
                if parent is not None:
                    parents.append(parent)
            return parents
    
    
            """ Returns an array of the children of the given dataset. Returns an empty array if no
            children were found.
            """
    
            for item in self['children']:
    
                child = self.openbis.get_dataset(item['code'])
                if child is not None:
                    children.append(child)
            return children
    
    
    
        def file_list(self):
            files = []
            for file in self.get_file_list(recursive=True):
                if not file['isDirectory']:
                    files.append(file['pathInDataSet'])
            return files
    
    
        def get_files(self, start_folder='/'):
    
            """ Returns a DataFrame of all files in this dataset
            """
    
            def createRelativePath(pathInDataSet):
                if self.shareId is None:
                    return ''
                else:
                    return os.path.join(self.shareId, self.location, pathInDataSet)
                
    
            files = self.get_file_list(start_folder=start_folder)
    
            df = DataFrame(files)
            df['relativePath'] = df['pathInDataSet'].map(createRelativePath)
    
            df['crc32Checksum'] = df['crc32Checksum'].fillna(0.0).astype(int)
    
            return df[['isDirectory', 'pathInDataSet', 'fileSize', 'crc32Checksum']]
    
        def get_file_list(self, recursive=True, start_folder="/"):
    
            """ Lists all files of a given dataset. You can specifiy a start_folder other than "/".
            By default, all directories and their containing files are listed recursively. You can
            turn off this option by setting recursive=False.
            """
    
            request = {
                "method" : "listFilesForDataSet",
                "params" : [
                    self.openbis.token,
                    self.permid,
                    start_folder,
                    recursive,
                ],
                "id": "1",
                "jsonrpc": "2.0"
            }

            resp = requests.post(
                self["dataStore"]["downloadUrl"] + '/datastore_server/rmi-dss-api-v1.json',
                json.dumps(request),
                verify=self.openbis.verify_certificates
            )

            if resp.ok:
                data = resp.json()
                if 'error' in data:
                    raise ValueError('Error from openBIS: ' + data['error'])
                elif 'result' in data:
                    return data['result']
                else:
                    raise ValueError('request to openBIS returned neither result nor error')
            else:
                raise ValueError('internal error while performing post request')
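
        # Usage sketch (assuming `ds` is a DataSet object):
        #
        #   files = ds.get_file_list(recursive=True)
        #   paths = [f['pathInDataSet'] for f in files if not f['isDirectory']]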
    
        """ A Sample is one of the most commonly used objects in openBIS.
    
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            super(Sample, self).__init__(*args, **kwargs)
            self.__dict__ = self
            self.openbis = openbis_obj

            self.permid = self.permId['permId']
            self.ident = self.identifier['identifier']
    
    
    
        def delete(self, reason):
            self.openbis.delete_sample(self.permid, reason)
    
        def get_datasets(self):
            objects = self.dataSets
            cache = {}
            for obj in objects:
                for key in obj.keys():
                    if key == 'type':
                        if isinstance(obj[key], dict):
                            cache[ obj[key]['@id'] ] = obj[key]
                        else:
                            if obj[key] in cache:
                                obj[key] = cache[ obj[key] ]
            datasets = DataFrame(objects)
            datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
            datasets['properties'] = datasets['properties'].map(extract_properties)
            datasets['type'] = datasets['type'].map(extract_code)
            return datasets[['code', 'properties', 'type', 'registrationDate']]
    
        def get_parents(self):
            parents = []
            for item in self['parents']:
                parent = self.openbis.get_sample(item['permId']['permId'])
                if parent is not None:
                    parents.append(parent)
            return parents
    
    
        def get_children(self):
            children = []
            for item in self['children']:
                child = self.openbis.get_sample(item['permId']['permId'])
                if child is not None:
                    children.append(child)
            return children
    
    class Space(dict):
        """ managing openBIS spaces
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            super(Space, self).__init__(*args, **kwargs)
            self.__dict__ = self
            self.openbis = openbis_obj
    
        def get_samples(self, *args, **kwargs):
    
            """ Lists all samples in a given space. A pandas DataFrame object is returned.
            """
    
    
            return self.openbis.get_samples(space=self.code, *args, **kwargs)
    
    
        @property
        def projects(self):
            return self.openbis.get_projects(space=self.code)
        
        def new_project(self, **kwargs):
            return self.openbis.new_project(space=self.code, **kwargs)
    
        @property
        def experiments(self):
            return self.openbis.get_experiments(space=self.code)
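
        # Usage sketch (assuming an authenticated Openbis instance `o` and an
        # existing space; the space code is illustrative):
        #
        #   space = o.get_space('MY_SPACE')
        #   samples = space.get_samples()
        #   projects = space.projects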
    
    
    class Things():
        """An object that contains a DataFrame object about an entity  available in openBIS.
           
        """
    
        def __init__(self, openbis_obj, what, df, identifier_name='code'):
            self.openbis = openbis_obj
            self.what = what
            self.df = df
            self.identifier_name = identifier_name
    
        def _repr_html_(self):
            return self.df._repr_html_()
    
        def __getitem__(self, key):
            if self.df is not None and len(self.df) > 0:
                row = None
                if isinstance(key, int):
                    # get thing by rowid
                    row = self.df.loc[[key]]
                else:
                    # get thing by code
                    row = self.df[self.df[self.identifier_name]==key.upper()]
    
                if row is not None:
                    # invoke the openbis.get_what() method
                    return getattr(self.openbis, 'get_'+self.what)(row[self.identifier_name].values[0])
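
        # Usage sketch: a Things object can be indexed by row number or by
        # identifier (assuming `spaces` was returned by o.get_spaces()):
        #
        #   spaces[0]            # fetch the first space by row id
        #   spaces['MY_SPACE']   # fetch a space by its code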
    
    
    class Experiment(dict):
        """ managing openBIS experiments
        """
    
        def __init__(self, openbis_obj, *args, **kwargs):
            super(Experiment, self).__init__(*args, **kwargs)
            self.__dict__ = self
            self.openbis = openbis_obj
    
        def __repr__(self):
            data = {}
            data["identifier"] = self['identifier']['identifier']
            data["permId"] = self['permId']['permId']
            data["properties"] = self['properties']
            return repr(data)