Skip to content
Snippets Groups Projects
pybis.py 140 KiB
Newer Older
  • Learn to ignore specific revisions
  •             sub_criteria.append(_subcriteria_for_is_finished(is_finished))
            if properties is not None:
                for prop in properties:
                    sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
    
    
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.experiment.search.ExperimentSearchCriteria",
                "operator": "AND"
            }
    
            fetchopts = fetch_option['experiment']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for option in ['tags', 'properties', 'registrator', 'modifier', 'project']:
                fetchopts[option] = fetch_option[option]
    
    
            request = {
                "method": "searchExperiments",
    
            }
            resp = self._post_request(self.as_v3, request)
    
            if len(resp['objects']) == 0:
                raise ValueError("No experiments found!")
    
            objects = resp['objects']
            parse_jackson(objects)
    
            experiments = DataFrame(objects)
    
            experiments['registrationDate'] = experiments['registrationDate'].map(format_timestamp)
            experiments['modificationDate'] = experiments['modificationDate'].map(format_timestamp)
            experiments['project'] = experiments['project'].map(extract_code)
    
            experiments['registrator'] = experiments['registrator'].map(extract_person)
            experiments['modifier'] = experiments['modifier'].map(extract_person)
            experiments['identifier'] = experiments['identifier'].map(extract_identifier)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            experiments['permId'] = experiments['permId'].map(extract_permid)
    
            experiments['type'] = experiments['type'].map(extract_code)
    
    
            attrs = ['identifier', 'permId', 'project', 'type',
    
                     'registrator', 'registrationDate', 'modifier', 'modificationDate']
    
            if props is not None:
                for prop in props:
    
                    experiments[prop.upper()] = experiments['properties'].map(lambda x: x.get(prop.upper(), ''))
    
            return Things(self, 'experiment', exps, 'identifier')
    
    
        def get_datasets(self,
                         code=None, type=None, withParents=None, withChildren=None, status=None,
    
                         sample=None, experiment=None, project=None, tags=None, props=None, **properties
    
    
            sub_criteria = []
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
            if type:
                sub_criteria.append(_subcriteria_for_type(type, 'DataSet'))
            if withParents:
                sub_criteria.append(_subcriteria_for_permid(withParents, 'DataSet', 'Parents'))
            if withChildren:
                sub_criteria.append(_subcriteria_for_permid(withChildren, 'DataSet', 'Children'))
    
    
            if sample:
                sub_criteria.append(_subcriteria_for_code(sample, 'Sample'))
            if experiment:
                sub_criteria.append(_subcriteria_for_code(experiment, 'Experiment'))
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if project:
                exp_crit = _subcriteria_for_code(experiment, 'Experiment')
                proj_crit = _subcriteria_for_code(project, 'Project')
                exp_crit['criteria'] = []
                exp_crit['criteria'].append(proj_crit)
                sub_criteria.append(exp_crit)
    
            if tags:
                sub_criteria.append(_subcriteria_for_tags(tags))
    
            if status:
                sub_criteria.append(_subcriteria_for_status(status))
    
            if properties is not None:
                for prop in properties:
                    sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
    
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.dataset.search.DataSetSearchCriteria",
                "operator": "AND"
            }
    
            fetchopts = {
    
                "containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
                "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"}
    
            for option in ['tags', 'properties', 'sample', 'experiment', 'physicalData']:
    
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchDataSets",
    
            }
            resp = self._post_request(self.as_v3, request)
    
            if len(resp['objects']) == 0:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError("no datasets found!")
    
    
            objects = resp['objects']
            parse_jackson(objects)
    
            datasets = DataFrame(objects)
            datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
            datasets['modificationDate'] = datasets['modificationDate'].map(format_timestamp)
            datasets['experiment'] = datasets['experiment'].map(extract_nested_identifier)
            datasets['sample'] = datasets['sample'].map(extract_nested_identifier)
            datasets['type'] = datasets['type'].map(extract_code)
            datasets['permId'] = datasets['code']
            datasets['location'] = datasets['physicalData'].map(lambda x: x.get('location') if x else '')
    
    
            attrs = ['permId', 'properties', 'type', 'experiment', 'sample', 'registrationDate', 'modificationDate',
                     'location']
    
            if props is not None:
                for prop in props:
                    datasets[prop.upper()] = datasets['properties'].map(lambda x: x.get(prop.upper(), ''))
                    attrs.append(prop.upper())
    
    
            return Things(self, 'dataset', datasets[attrs], 'permId')
    
        def get_experiment(self, expId, withAttachments=False, only_data=False):
    
            """ Returns an experiment object for a given identifier (expId).
            """
    
    
                "@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
                "type": {
                    "@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
                },
    
            search_request = search_request_for_identifier(expId, 'experiment')
    
            for option in ['tags', 'properties', 'attachments', 'project', 'samples']:
    
                fetchopts[option] = fetch_option[option]
    
            if withAttachments:
                fetchopts['attachments'] = fetch_option['attachmentsWithContent']
    
    
            request = {
    
                "method": "getExperiments",
                "params": [
    
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No such experiment: %s" % expId)
    
    
            for id in resp:
                if only_data:
                    return resp[id]
                else:
                    return Experiment(
                        openbis_obj = self,
                        type = self.get_experiment_type(resp[expId]["type"]["code"]),
                        data = resp[id]
                    )
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def new_experiment(self, type, code, project, props=None, **kwargs):
    
            """ Creates a new experiment of a given experiment type.
            """
    
            return Experiment(
                openbis_obj = self, 
                type = self.get_experiment_type(type), 
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                project = project,
    
                data = None,
                props = props,
                code = code, 
                **kwargs
            )
    
    
        def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
            params = {
                "experimentId": {
                    "permId": experimentId,
                    "@type": "as.dto.experiment.id.ExperimentPermId"
                },
                "@type": "as.dto.experiment.update.ExperimentUpdate"
            }
            if properties is not None:
    
                params["properties"] = properties
    
            if tagIds is not None:
                params["tagIds"] = tagIds
            if attachments is not None:
                params["attachments"] = attachments
    
            request = {
                "method": "updateExperiments",
                "params": [
                    self.token,
    
        def create_sample(self, space_ident, code, type,
                          project_ident=None, experiment_ident=None, properties=None, attachments=None, tags=None):
    
    
            tagIds = _create_tagIds(tags)
            typeId = _create_typeId(type)
            projectId = _create_projectId(project_ident)
            experimentId = _create_experimentId(experiment_ident)
    
            if properties is None:
                properties = {}
    
            request = {
                "method": "createSamples",
                "params": [
                    self.token,
                    [
                        {
                            "properties": properties,
                            "code": code,
    
                            "projectId": projectId,
                            "experimentId": experimentId,
                            "tagIds": tagIds,
                            "attachments": attachments,
    
                            "@type": "as.dto.sample.create.SampleCreation",
    
                        }
                    ]
                ],
            }
            resp = self._post_request(self.as_v3, request)
            return self.get_sample(resp[0]['permId'])
    
    
        create_object = create_sample # Alias
    
    
        def create_external_data_management_system(self, code, label, address, address_type='FILE_SYSTEM'):
    
            """Create an external DMS.
            :param code: An openBIS code for the external DMS.
            :param label: A human-readable label.
            :param address: The address for accessing the external DMS. E.g., a URL.
    
            :param address_type: One of OPENBIS, URL, or FILE_SYSTEM
    
            :return:
            """
            request = {
                "method": "createExternalDataManagementSystems",
                "params": [
                    self.token,
                    [
                        {
                            "code": code,
                            "label": label,
                            "addressType": address_type,
                            "address": address,
                            "@type": "as.dto.externaldms.create.ExternalDmsCreation",
                        }
                    ]
                ],
            }
            resp = self._post_request(self.as_v3, request)
            return self.get_external_data_management_system(resp[0]['permId'])
    
    
        def update_sample(self, sampleId, space=None, project=None, experiment=None,
    
                          parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
    
            params = {
                "sampleId": {
                    "permId": sampleId,
                    "@type": "as.dto.sample.id.SamplePermId"
                },
                "@type": "as.dto.sample.update.SampleUpdate"
            }
    
            if space is not None:
                params['spaceId'] = space
            if project is not None:
                params['projectId'] = project
    
                params["properties"] = properties
    
            if tagIds is not None:
                params["tagIds"] = tagIds
            if attachments is not None:
                params["attachments"] = attachments
    
            request = {
                "method": "updateSamples",
                "params": [
                    self.token,
    
        update_object = update_sample # Alias
    
    
        def delete_entity(self, entity, permid, reason, capitalize=True):
    
            """Deletes Spaces, Projects, Experiments, Samples and DataSets
            """
    
    
            if capitalize:
                entity_capitalized = entity.capitalize()
            else:
                entity_capitalized = entity
    
            entity_type = "as.dto.{}.id.{}PermId".format(entity.lower(), entity_capitalized)
    
                "method": "delete" + entity_capitalized + 's',
    
                "params": [
                    self.token,
                    [
                        {
                            "permId": permid,
                            "@type": entity_type
                        }
                    ],
                    {
                        "reason": reason,
    
                        "@type": "as.dto.{}.delete.{}DeletionOptions".format(entity.lower(), entity_capitalized)
    
                    }
                ]
            }
            self._post_request(self.as_v3, request)
    
        def get_deletions(self):
            request = {
                "method": "searchDeletions",
                "params": [
                    self.token,
                    {},
                    {
                        "deletedObjects": {
                            "@type": "as.dto.deletion.fetchoptions.DeletedObjectFetchOptions"
                        }
                    }
                ]
            }
            resp = self._post_request(self.as_v3, request)
            objects = resp['objects']
            parse_jackson(objects)
    
    
            for value in objects:
                del_objs = extract_deletion(value)
                if len(del_objs) > 0:
                    new_objs.append(*del_objs)
    
            return DataFrame(new_objs)
    
        def new_project(self, space, code, description=None, **kwargs):
            return Project(self, None, space=space, code=code, description=description, **kwargs)
    
        def _gen_fetchoptions(self, options):
            fo = {}
            for option in options:
                fo[option] = fetch_option[option]
            return fo
    
    
        def get_project(self, projectId, only_data=False):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            options = ['space', 'registrator', 'modifier', 'attachments']
    
            if is_identifier(projectId) or is_permid(projectId):
    
                request = self._create_get_request(
                    'getProjects', 'project', projectId, options
                )
                resp = self._post_request(self.as_v3, request)
    
                if only_data:
                    return resp[projectId]
    
    
                return Project(self, resp[projectId])
    
            else:
                search_criteria = _gen_search_criteria({
                    'project': 'Project',
                    'operator': 'AND',
                    'code': projectId
                })
                fo = self._gen_fetchoptions(options)
                request = {
                    "method": "searchProjects",
                    "params": [self.token, search_criteria, fo]
                }
                resp = self._post_request(self.as_v3, request)
    
                if len(resp['objects']) == 0:
                    raise ValueError("No such project: %s" % projectId)
    
                if only_data:
                    return resp['objects'][0]
    
    
                return Project(self, resp['objects'][0])
    
        def get_projects(self, space=None, code=None):
    
            """ Get a list of all available projects (DataFrame object).
            """
    
            sub_criteria = []
            if space:
    
                sub_criteria.append(_subcriteria_for_code(space, 'space'))
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
    
    
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.project.search.ProjectSearchCriteria",
                "operator": "AND"
            }
    
    
            fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
            for option in ['registrator', 'modifier', 'leader']:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                fetchopts[option] = fetch_option[option]
    
    
            request = {
                "method": "searchProjects",
    
            }
    
            resp = self._post_request(self.as_v3, request)
            if resp is not None:
                objects = resp['objects']
    
    
                projects = DataFrame(objects)
                if len(projects) is 0:
                    raise ValueError("No projects found!")
    
    
                projects['registrationDate'] = projects['registrationDate'].map(format_timestamp)
                projects['modificationDate'] = projects['modificationDate'].map(format_timestamp)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                projects['leader'] = projects['leader'].map(extract_person)
    
                projects['registrator'] = projects['registrator'].map(extract_person)
                projects['modifier'] = projects['modifier'].map(extract_person)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                projects['permId'] = projects['permId'].map(extract_permid)
    
                projects['identifier'] = projects['identifier'].map(extract_identifier)
    
                pros = projects[['identifier', 'permId', 'leader', 'registrator', 'registrationDate',
                                 'modifier', 'modificationDate']]
    
                return Things(self, 'project', pros, 'identifier')
    
            else:
                raise ValueError("No projects found!")
    
    
        def _create_get_request(self, method_name, entity, permids, options):
    
    
            if not isinstance(permids, list):
                permids = [permids]
    
    
            type = "as.dto.{}.id.{}".format(entity.lower(), entity.capitalize())
    
            search_params = []
            for permid in permids:
                # decide if we got a permId or an identifier
                match = re.match('/', permid)
                if match:
                    search_params.append(
    
                        {"identifier": permid, "@type": type + 'Identifier'}
    
                    search_params.append(
    
                        {"permId": permid, "@type": type + 'PermId'}
    
                    )
    
            fo = {}
            for option in options:
                fo[option] = fetch_option[option]
    
            request = {
                "method": method_name,
                "params": [
                    self.token,
                    search_params,
                    fo
                ],
            }
            return request
    
    
        def get_terms(self, vocabulary=None):
    
            """ Returns information about vocabulary, including its controlled vocabulary
    
            search_request = {}
            if vocabulary is not None:
    
                search_request = _gen_search_criteria({
                    "vocabulary": "VocabularyTerm",
                    "criteria": [{
    
                        "vocabulary": "Vocabulary",
                        "code": vocabulary
                    }]
                })
    
                "vocabulary": {"@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions"},
    
                "@type": "as.dto.vocabulary.fetchoptions.VocabularyTermFetchOptions"
            }
    
            request = {
                "method": "searchVocabularyTerms",
    
                "params": [self.token, search_request, fetch_options]
    
            }
            resp = self._post_request(self.as_v3, request)
    
    
        def get_tags(self):
            """ Returns a DataFrame of all 
            """
            request = {
                "method": "searchTags",
    
            }
            resp = self._post_request(self.as_v3, request)
    
            objects = DataFrame(resp['objects'])
            objects['registrationDate'] = objects['registrationDate'].map(format_timestamp)
            return objects[['code', 'registrationDate']]
    
        
        def _search_semantic_annotations(self, criteria):
    
            fetch_options = {
                "@type": "as.dto.semanticannotation.fetchoptions.SemanticAnnotationFetchOptions",
                "entityType": {"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"},
                "propertyType": {"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"},
                "propertyAssignment": {
                    "@type": "as.dto.property.fetchoptions.PropertyAssignmentFetchOptions",
                    "entityType" : {
                        "@type" : "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
                    },
                    "propertyType" : {
                        "@type" : "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
                    }
                }
            }
    
            request = {
                "method": "searchSemanticAnnotations",
                "params": [self.token, criteria, fetch_options]
            }
    
            resp = self._post_request(self.as_v3, request)
            
            if resp is not None:
                objects = resp['objects']
                
                if len(objects) is 0:
                    raise ValueError("No semantic annotations found!")
                
                parse_jackson(objects)
                
                for object in objects:
                    object['permId'] = object['permId']['permId']
                    if object.get('entityType') is not None:
                        object['entityType'] = object['entityType']['code']
                    elif object.get('propertyType') is not None:
                        object['propertyType'] = object['propertyType']['code']
                    elif object.get('propertyAssignment') is not None:
                        object['entityType'] = object['propertyAssignment']['entityType']['code']
                        object['propertyType'] = object['propertyAssignment']['propertyType']['code']
                    object['creationDate'] = format_timestamp(object['creationDate'])
                    
                return objects
            else:
                raise ValueError("No semantic annotations found!")
    
        def get_semantic_annotations(self):
            """ Get a list of all available semantic annotations (DataFrame object).
            """
    
            objects = self._search_semantic_annotations({})
            attrs = ['permId', 'entityType', 'propertyType', 'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId', 'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId', 'creationDate']
            annotations = DataFrame(objects)
            return Things(self, 'semantic_annotation', annotations[attrs], 'permId')
        
        def get_semantic_annotation(self, permId, only_data = False):
    
            criteria = {
                "@type" : "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria",
                "criteria" : [{
                    "@type" : "as.dto.common.search.PermIdSearchCriteria",
                    "fieldValue" : {
                        "@type" : "as.dto.common.search.StringEqualToValue",
                        "value" : permId
                    }
                }]
            }
    
            objects = self._search_semantic_annotations(criteria)
            object = objects[0]
    
            if only_data:
                return object
            else:
                return SemanticAnnotation(self, isNew=False, **object)    
        
    
        def get_sample_types(self, type=None):
            """ Returns a list of all available sample types
            """
            return self._get_types_of(
                "searchSampleTypes",
                "Sample",
    
        get_object_types = get_sample_types # Alias
    
    
        def get_sample_type(self, type):
            try:
                return self._get_types_of(
    
                    "Sample",
                    type,
                    ["generatedCodePrefix"]
                )
            except Exception:
                raise ValueError("no such sample type: {}".format(type))
    
    
        get_object_type = get_sample_type # Alias
    
    
        def get_experiment_types(self, type=None):
            """ Returns a list of all available experiment types
            """
            return self._get_types_of(
    
                "searchExperimentTypes",
                "Experiment",
    
                    "searchExperimentTypes",
                    "Experiment",
    
                    type
                )
            except Exception:
                raise ValueError("No such experiment type: {}".format(type))
    
        def get_material_types(self, type=None):
            """ Returns a list of all available material types
            """
            return self._get_types_of("searchMaterialTypes", "Material", type)
    
        def get_material_type(self, type):
            try:
                return self._get_types_of("searchMaterialTypes", "Material", type)
            except Exception:
                raise ValueError("No such material type: {}".format(type))
    
        def get_dataset_types(self, type=None):
            """ Returns a list (DataFrame object) of all currently available dataset types
            """
    
            return self._get_types_of("searchDataSetTypes", "DataSet", type, optional_attributes=['kind'])
    
                return self._get_types_of("searchDataSetTypes", "DataSet", type, optional_attributes=['kind'])
    
            except Exception:
                raise ValueError("No such dataSet type: {}".format(type))
    
    
        def _get_types_of(self, method_name, entity, type_name=None, additional_attributes=[], optional_attributes=[]):
    
            """ Returns a list of all available types of an entity.
            If the name of the entity-type is given, it returns a PropertyAssignments object
    
            if type_name is not None:
                search_request = _gen_search_criteria({
    
                    entity.lower(): entity + "Type",
    
                    "code": type_name
    
                    "@type": "as.dto.{}.fetchoptions.{}TypeFetchOptions".format(
    
                        entity.lower(), entity
    
                fetch_options['propertyAssignments'] = fetch_option['propertyAssignments']
    
            request = {
                "method": method_name,
    
                "params": [self.token, search_request, fetch_options],
    
            }
            resp = self._post_request(self.as_v3, request)
    
            if type_name is not None and len(resp['objects']) == 1:
    
                return PropertyAssignments(self, resp['objects'][0])
    
            if len(resp['objects']) >= 1:
                types = DataFrame(resp['objects'])
    
                types['modificationDate'] = types['modificationDate'].map(format_timestamp)
    
                attributes = self._get_attributes(type_name, types, additional_attributes, optional_attributes)
    
                return Things(self, entity.lower() + '_type', types[attributes])
    
    
                raise ValueError("Nothing found!")
    
        def _get_attributes(self, type_name, types, additional_attributes, optional_attributes):
            attributes = ['code', 'description'] + additional_attributes
            attributes += [attribute for attribute in optional_attributes if attribute in types]
            attributes += ['modificationDate']
            if type_name is not None:
                attributes += ['propertyAssignments']
            return attributes
    
    
            """ checks whether a session is still active. Returns true or false.
            """
    
            This method is useful to check if a token is still valid or if it has timed out,
            requiring the user to login again.
    
            :return: Return True if the token is valid, False if it is not valid.
            """
    
            request = {
                "method": "isSessionActive",
    
            resp = self._post_request(self.as_v1, request)
    
            return resp
    
        def get_dataset(self, permid, only_data=False):
    
            """fetch a dataset and some metadata attached to it:
            - properties
            - sample
            - parents
            - children
            - containers
            - dataStore
            - physicalData
            - linkedData
            :return: a DataSet object
            """
    
            criteria = [{
                "permId": permid,
                "@type": "as.dto.dataset.id.DataSetPermId"
            }]
    
            fetchopts = {
    
                "parents": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
                "children": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
                "containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
                "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"},
    
            for option in ['tags', 'properties', 'dataStore', 'physicalData', 'linkedData',
    
                           'experiment', 'sample']:
                fetchopts[option] = fetch_option[option]
    
            request = {
    
                "method": "getDataSets",
    
            resp = self._post_request(self.as_v3, request)
    
            if resp is None or len(resp) == 0:
    
                raise ValueError('no such dataset found: ' + permid)
    
            for permid in resp:
                if only_data:
                    return resp[permid]
                else:
                    return DataSet(
                        self, 
                        type=self.get_dataset_type(resp[permid]["type"]["code"]),
                        data=resp[permid]
                    )
    
        def get_sample(self, sample_ident, only_data=False, withAttachments=False):
    
            """Retrieve metadata for the sample.
            Get metadata for the sample and any directly connected parents of the sample to allow access
            to the same information visible in the ELN UI. The metadata will be on the file system.
            :param sample_identifiers: A list of sample identifiers to retrieve.
            """
    
            search_request = search_request_for_identifier(sample_ident, 'sample')
    
            fetchopts = {"type": {"@type": "as.dto.sample.fetchoptions.SampleTypeFetchOptions"}}
    
            for option in ['tags', 'properties', 'attachments', 'space', 'experiment', 'registrator', 'dataSets']:
                fetchopts[option] = fetch_option[option]
    
            if withAttachments:
                fetchopts['attachments'] = fetch_option['attachmentsWithContent']
    
    
            for key in ['parents','children','container','components']:
                fetchopts[key] = {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"}
    
    
            sample_request = {
                "method": "getSamples",
                "params": [
                    self.token,
    
            resp = self._post_request(self.as_v3, sample_request)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if resp is None or len(resp) == 0:
    
                raise ValueError('no such sample found: ' + sample_ident)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            else:
    
                for sample_ident in resp:
    
                    if only_data:
                        return resp[sample_ident]
                    else:
                        return Sample(self, self.get_sample_type(resp[sample_ident]["type"]["code"]), resp[sample_ident])
    
        def get_external_data_management_system(self, permId, only_data=False):
    
            """Retrieve metadata for the external data management system.
    
            :param permId: A permId for an external DMS.
    
            :param only_data: Return the result data as a hash-map, not an object.
            """
    
            request = {
                "method": "getExternalDataManagementSystems",
                "params": [
                    self.token,
                    [{
                        "@type": "as.dto.externaldms.id.ExternalDmsPermId",
    
                    }],
                    {},
                ],
            }
    
            resp = self._post_request(self.as_v3, request)
            parse_jackson(resp)
    
            if resp is None or len(resp) == 0:
    
                raise ValueError('no such external DMS found: ' + permId)
    
            else:
                for ident in resp:
                    if only_data:
                        return resp[ident]
                    else:
                        return ExternalDMS(self, resp[ident])
    
    
        def new_space(self, **kwargs):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """ Creates a new space in the openBIS instance.
    
            return Space(self, None, **kwargs)
    
    
        def new_analysis(self, name, description=None, sample=None, dss_code=None, result_files=None,
    
                         notebook_files=None, parents=None):
    
            """ An analysis contains the Jupyter notebook file(s) and some result files.
                Technically this method involves uploading files to the session workspace
                and activating the dropbox aka dataset ingestion service "jupyter-uploader-api"
    
            """
    
            if dss_code is None:
                dss_code = self.get_datastores()['code'][0]
    
            # if a sample identifier was given, use it as a string.
            # if a sample object was given, take its identifier
    
            sampleId = self.sample_to_sample_id(sample)
    
    
            parentIds = []
            if parents is not None:
                if not isinstance(parents, list):
                    parants = [parents]
                for parent in parents:
                    parentIds.append(parent.permId)
    
            datastore_url = self._get_dss_url(dss_code)
    
            # upload the files
    
            data_sets = []
            if notebook_files is not None:
                notebooks_folder = os.path.join(folder, 'notebook_files')
                self.upload_files(
    
                    "dataSetType": "JUPYTER_NOTEBOOk",
    
                    "sessionWorkspaceFolder": notebooks_folder,
    
                    "fileNames": notebook_files,
                    "properties": {}
    
                })
            if result_files is not None:
                results_folder = os.path.join(folder, 'result_files')
                self.upload_files(
    
                    files=result_files,
                    folder=results_folder,
                    wait_until_finished=True
                )
                data_sets.append({
    
                    "dataSetType": "JUPYTER_RESULT",
                    "sessionWorkspaceFolder": results_folder,
                    "fileNames": result_files,
                    "properties": {}
    
            # register the files in openBIS
    
            request = {
    
                "method": "createReportFromAggregationService",
                "params": [
                    self.token,
                    dss_code,
    
                    {
                        "sampleId": sampleId,
                        "parentIds": parentIds,
                        "containers": [{
                            "dataSetType": "JUPYTER_CONTAINER",
                            "properties": {
                                "NAME": name,
                                "DESCRIPTION": description
                            }
                        }],
                        "dataSets": data_sets,
                    }
                ],
    
            resp = self._post_request(self.reg_v1, request)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            try:
                if resp['rows'][0][0]['value'] == 'OK':
                    return resp['rows'][0][1]['value']
            except:
                return resp
    
        def new_git_data_set(self, data_set_type, path, commit_id, repository_id, dms, sample=None, experiment=None, properties={},
    
                             dss_code=None, parents=None, data_set_code=None, contents=[]):
    
            """ Create a link data set.
            :param data_set_type: The type of the data set
    
            :param path: The path to the git repository
            :param commit_id: The git commit id
    
            :param repository_id: The git repository id - same for copies
    
            :param dms: An external data managment system object or external_dms_id
            :param sample: A sample object or sample id.
    
            :param dss_code: Code for the DSS -- defaults to the first dss if none is supplied.
    
            :param properties: Properties for the data set.
            :param parents: Parents for the data set.
    
            :param data_set_code: A data set code -- used if provided, otherwise generated on the server
    
            :param contents: A list of dicts that describe the contents:
                {'file_length': [file length],
                 'crc32': [crc32 checksum],
                 'directory': [is path a directory?]
                 'path': [the relative path string]}
    
            return pbds.GitDataSetCreation(self, data_set_type, path, commit_id, repository_id, dms, sample, experiment,
    
                                           properties, dss_code, parents, data_set_code, contents).new_git_data_set()
    
    def new_content_copy(self, path, commit_id, repository_id, edms_id, data_set_id):
        """
        Create a content copy in an existing link data set.
        :param path: path of the new content copy
        :param commit_id: commit id of the new content copy
        :param repository_id: repository id of the content copy
        :param edms_id: Id of the external data managment system of the content copy
        :param data_set_id: Id of the data set to which the new content copy belongs
        """
        return pbds.GitDataSetUpdate(self, path, commit_id, repository_id, edms_id, data_set_id).new_content_copy()
    
    
            """Take sample which may be a string or object and return an identifier for it."""
    
            return Openbis._object_to_object_id(sample, "as.dto.sample.id.SampleIdentifier", "as.dto.sample.id.SamplePermId");
    
        @staticmethod
        def experiment_to_experiment_id(experiment):
            """Take experiment which may be a string or object and return an identifier for it."""
            return Openbis._object_to_object_id(experiment, "as.dto.experiment.id.ExperimentIdentifier", "as.dto.experiment.id.SamplePermId");
    
        @staticmethod
        def _object_to_object_id(obj, identifierType, permIdType):
            object_id = None
            if isinstance(obj, str):
                if (is_identifier(obj)):
                    object_id = {
                        "identifier": obj,
                        "@type": identifierType