Skip to content
Snippets Groups Projects
pybis.py 149 KiB
Newer Older
  • Learn to ignore specific revisions
  •                 if attr in ['project']:
                        experiments[attr] = experiments[attr].map(extract_nested_identifier)
                    if attr in ['space']:
                        experiments[attr] = experiments[attr].map(extract_code)
    
    
                experiments['registrationDate'] = experiments['registrationDate'].map(format_timestamp)
                experiments['modificationDate'] = experiments['modificationDate'].map(format_timestamp)
                experiments['project'] = experiments['project'].map(extract_code)
                experiments['registrator'] = experiments['registrator'].map(extract_person)
                experiments['modifier'] = experiments['modifier'].map(extract_person)
                experiments['identifier'] = experiments['identifier'].map(extract_identifier)
                experiments['permId'] = experiments['permId'].map(extract_permid)
                experiments['type'] = experiments['type'].map(extract_code)
    
                    if prop == '*':
                        # include all properties in dataFrame.
                        # expand the dataFrame by adding new columns
                        columns = []
                        for i, experiment in enumerate(response):
                            for prop_name, val in experiment.get('properties',{}).items():
                                experiments.loc[i, prop_name.upper()] = val
                                columns.append(prop_name.upper())
    
                        display_attrs += set(columns)
                        continue
    
    
                        # property name is provided
                        for i, experiment in enumerate(response):
                            val = experiment.get('properties',{}).get(prop,'') or experiment.get('properties',{}).get(prop.upper(),'')
                            experiments.loc[i, prop.upper()] = val
                        display_attrs.append(prop.upper())
    
            return Things(
                openbis_obj = self,
                entity = 'experiment',
    
                identifier_name ='identifier',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
        get_collections = get_experiments  # Alias
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_datasets(
    
            self, code=None, type=None, withParents=None, withChildren=None,
    
            status=None, sample=None, experiment=None, collection=None, project=None,
    
            tags=None, attrs=None, props=None, **properties
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        ):
    
            """Returns a DataFrame of all dataSets for a given project/experiment/sample (or any combination)
            Filters:
            --------
            project      -- a project code or a project object
            experiment   -- an experiment code or an experiment object
            sample       -- a sample code/permId or a sample/object
            collection   -- same as experiment
            tags         -- only return dataSets with the specified tags
            type         -- a dataSetType code
    
            Paging:
            -------
            start_with   -- default=None
            count        -- number of dataSets that should be fetched. default=None.
    
            Include:
            --------
            withParents  -- the list of parent's permIds in a column 'parents'
            withChildren -- the list of children's permIds in a column 'children'
            attrs        -- list of all desired attributes. Examples:
                            project, experiment, sample: just return their identifier
                            space.code, project.code, experiment.code
                            registrator.email, registrator.firstName
                            type.generatedCodePrefix
            props        -- list of all desired properties. Returns an empty string if
                            a) property is not present
                            b) property is not defined for this dataSetType
            """
    
            if 'object' in properties:
                sample = properties['object']
            if collection is not None:
                experiment = collection
    
    
            sub_criteria = []
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
            if type:
    
                sub_criteria.append(_subcriteria_for_code(type, 'dataSetType'))
    
                sub_criteria.append(_subcriteria_for(withParents, 'dataSet', 'Parents'))
    
                sub_criteria.append(_subcriteria_for(withChildren, 'dataSet', 'Children'))
    
            if sample:
    
                sub_criteria.append(_subcriteria_for(sample, 'sample'))
    
            if experiment:
    
                sub_criteria.append(_subcriteria_for(experiment, 'experiment'))
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if project:
    
                exp_crit = _subcriteria_for(experiment, 'experiment')
                proj_crit = _subcriteria_for(project, 'project')
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                exp_crit['criteria'].append(proj_crit)
                sub_criteria.append(exp_crit)
    
            if tags:
                sub_criteria.append(_subcriteria_for_tags(tags))
    
            if status:
                sub_criteria.append(_subcriteria_for_status(status))
    
            if properties is not None:
                for prop in properties:
                    sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
    
            search_criteria = get_search_type_for_entity('dataset')
    
            search_criteria['criteria'] = sub_criteria
            search_criteria['operator'] = 'AND'
    
            fetchopts = get_fetchoptions('dataSet', including=['type'])
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
            if kind:
                kind = kind.upper()
                if kind not in ['PHYSICAL_DATA', 'CONTAINER', 'LINK']:
                    raise ValueError("unknown dataSet kind: {}. It should be one of the following: PHYSICAL_DATA, CONTAINER or LINK".format(kind))
                fetchopts['kind'] = kind
                raise NotImplementedError('you cannot search for dataSet kinds yet')
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if attrs is None: attrs = []
    
            options = self._get_fetchopts_for_attrs(attrs)
            for option in ['tags', 'properties', 'physicalData']+options:
    
            # get fetch options for projects and spaces
            # via experiment, if requested
            for attr in attrs:
                if any([entity in attr for entity in ['space','project']]):
                    fetchopts['experiment'] = fetch_option['experiment']
                    fetchopts['experiment']['project'] = fetch_option['project']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            
    
            }
            resp = self._post_request(self.as_v3, request)
    
            return self._dataset_list_for_response(
                response=resp['objects'],
    
                props=props,
                start_with=start_with,
                count=count,
    
                totalCount=resp['totalCount'],
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_experiment(self, code, withAttachments=False, only_data=False):
            """ Returns an experiment object for a given identifier (code).
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_request = _type_for_id(code, 'experiment')
    
            for option in ['tags', 'properties', 'attachments', 'project', 'samples', 'registrator', 'modifier']:
    
                fetchopts[option] = fetch_option[option]
    
            if withAttachments:
                fetchopts['attachments'] = fetch_option['attachmentsWithContent']
    
    
            request = {
    
                "method": "getExperiments",
                "params": [
    
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError(f"No such experiment: {code}")
    
            for id in resp:
                if only_data:
                    return resp[id]
                else:
                    return Experiment(
                        openbis_obj = self,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        type = self.get_experiment_type(resp[code]["type"]["code"]),
    
                        data = resp[id]
                    )
    
        get_collection = get_experiment  # Alias
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def new_experiment(self, type, code, project, props=None, **kwargs):
    
            """ Creates a new experiment of a given experiment type.
            """
    
            return Experiment(
    
                openbis_obj = self,
                type = self.get_experiment_type(type),
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                project = project,
    
                data = None,
                props = props,
    
        new_collection = new_experiment  # Alias
    
    
    
        def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
            params = {
                "experimentId": {
                    "permId": experimentId,
                    "@type": "as.dto.experiment.id.ExperimentPermId"
                },
                "@type": "as.dto.experiment.update.ExperimentUpdate"
            }
            if properties is not None:
    
                params["properties"] = properties
    
            if tagIds is not None:
                params["tagIds"] = tagIds
            if attachments is not None:
                params["attachments"] = attachments
    
            request = {
                "method": "updateExperiments",
                "params": [
                    self.token,
    
        update_collection = update_experiment  # Alias
    
        def create_external_data_management_system(self, code, label, address, address_type='FILE_SYSTEM'):
    
            """Create an external DMS.
            :param code: An openBIS code for the external DMS.
            :param label: A human-readable label.
            :param address: The address for accessing the external DMS. E.g., a URL.
    
            :param address_type: One of OPENBIS, URL, or FILE_SYSTEM
    
            :return:
            """
            request = {
                "method": "createExternalDataManagementSystems",
                "params": [
                    self.token,
                    [
                        {
                            "code": code,
                            "label": label,
                            "addressType": address_type,
                            "address": address,
                            "@type": "as.dto.externaldms.create.ExternalDmsCreation",
                        }
                    ]
                ],
            }
            resp = self._post_request(self.as_v3, request)
            return self.get_external_data_management_system(resp[0]['permId'])
    
    
        def update_sample(self, sampleId, space=None, project=None, experiment=None,
    
                          parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
    
            params = {
                "sampleId": {
                    "permId": sampleId,
                    "@type": "as.dto.sample.id.SamplePermId"
                },
                "@type": "as.dto.sample.update.SampleUpdate"
            }
    
            if space is not None:
                params['spaceId'] = space
            if project is not None:
                params['projectId'] = project
    
                params["properties"] = properties
    
            if tagIds is not None:
                params["tagIds"] = tagIds
            if attachments is not None:
                params["attachments"] = attachments
    
            request = {
                "method": "updateSamples",
                "params": [
                    self.token,
    
        update_object = update_sample # Alias
    
    
        def delete_entity(self, entity, id, reason, id_name='permId'):
    
            """Deletes Spaces, Projects, Experiments, Samples and DataSets
            """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            type = get_type_for_entity(entity, 'delete')
            method = get_method_for_entity(entity, 'delete')
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "method": method,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            "@type": type
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        "@type": type
    
            resp = self._post_request(self.as_v3, request)
    
    
        def delete_openbis_entity(self, entity, objectId, reason='No reason given'):
            method = get_method_for_entity(entity, 'delete')
            delete_options = get_type_for_entity(entity, 'delete')
            delete_options['reason'] = reason
    
            request = {
               "method": method,
               "params": [
                    self.token,
                    [ objectId ],
                    delete_options
                ]
            }
            resp = self._post_request(self.as_v3, request)
            return
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_deletions(self, start_with=None, count=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_criteria = {
                "@type": "as.dto.deletion.search.DeletionSearchCriteria"
            }
    
            fetchopts = fetch_option['deletion']
            fetchoptsDeleted = fetch_option['deletedObjects']
            fetchoptsDeleted['from'] = start_with
            fetchoptsDeleted['count'] = count
            fetchopts['deletedObjects'] = fetchoptsDeleted
    
            request = {
                "method": "searchDeletions",
                "params": [
                    self.token,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    search_criteria,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    fetchopts,
    
                ]
            }
            resp = self._post_request(self.as_v3, request)
            objects = resp['objects']
            parse_jackson(objects)
    
    
            for value in objects:
                del_objs = extract_deletion(value)
                if len(del_objs) > 0:
                    new_objs.append(*del_objs)
    
            return DataFrame(new_objs)
    
        def new_project(self, space, code, description=None, **kwargs):
            return Project(self, None, space=space, code=code, description=description, **kwargs)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _gen_fetchoptions(self, options, foType):
            fo = {
                "@type": foType
            }
    
            for option in options:
                fo[option] = fetch_option[option]
            return fo
    
    
        def get_project(self, projectId, only_data=False):
    
            """Returns a Project object for a given identifier, code or permId.
            """
    
            project = not only_data and self._object_cache(entity='project',code=projectId)
            if project:
                return project
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            options = ['space', 'registrator', 'modifier', 'attachments']
    
            if is_identifier(projectId) or is_permid(projectId):
    
                request = self._create_get_request(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    'getProjects', 'project', projectId, options,
                    "as.dto.project.fetchoptions.ProjectFetchOptions"
    
                )
                resp = self._post_request(self.as_v3, request)
    
                if only_data:
                    return resp[projectId]
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    openbis_obj=self, 
                    type=None,
                    data=resp[projectId]
                )
    
                if self.use_cache:
                    self._object_cache(entity='project', code=projectId, value=project)
                return project
    
            else:
                search_criteria = _gen_search_criteria({
                    'project': 'Project',
                    'operator': 'AND',
                    'code': projectId
                })
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                fo = self._gen_fetchoptions(options, foType="as.dto.project.fetchoptions.ProjectFetchOptions")
    
                request = {
                    "method": "searchProjects",
                    "params": [self.token, search_criteria, fo]
                }
                resp = self._post_request(self.as_v3, request)
    
                if len(resp['objects']) == 0:
                    raise ValueError("No such project: %s" % projectId)
    
                if only_data:
                    return resp['objects'][0]
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    openbis_obj=self, 
                    type=None,
                    data=resp['objects'][0]
                )
    
                if self.use_cache:
                    self._object_cache(entity='project', code=projectId, value=project)
                return project
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_projects(
            self, space=None, code=None,
            start_with=None, count=None,
        ):
    
            """ Get a list of all available projects (DataFrame object).
            """
    
            sub_criteria = []
            if space:
    
                sub_criteria.append(_subcriteria_for_code(space, 'space'))
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
    
    
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.project.search.ProjectSearchCriteria",
                "operator": "AND"
            }
    
    
            fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
            for option in ['registrator', 'modifier', 'leader']:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                fetchopts[option] = fetch_option[option]
    
    
            request = {
                "method": "searchProjects",
    
            }
            resp = self._post_request(self.as_v3, request)
    
    
            attrs = ['identifier', 'permId', 'leader', 'registrator', 'registrationDate', 'modifier', 'modificationDate']
            if len(resp['objects']) == 0:
                projects = DataFrame(columns=attrs)        
            else:
                objects = resp['objects']
                parse_jackson(objects)
    
                projects = DataFrame(objects)
    
                projects['registrationDate'] = projects['registrationDate'].map(format_timestamp)
                projects['modificationDate'] = projects['modificationDate'].map(format_timestamp)
                projects['leader'] = projects['leader'].map(extract_person)
                projects['registrator'] = projects['registrator'].map(extract_person)
                projects['modifier'] = projects['modifier'].map(extract_person)
                projects['permId'] = projects['permId'].map(extract_permid)
                projects['identifier'] = projects['identifier'].map(extract_identifier)
    
    
            return Things(
                openbis_obj = self,
                entity = 'project',
                df = projects[attrs],
                identifier_name = 'identifier',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _create_get_request(self, method_name, entity, permids, options, foType):
    
    
            if not isinstance(permids, list):
                permids = [permids]
    
    
            type = "as.dto.{}.id.{}".format(entity.lower(), entity.capitalize())
    
            search_params = []
            for permid in permids:
                # decide if we got a permId or an identifier
                match = re.match('/', permid)
                if match:
                    search_params.append(
    
                        {"identifier": permid, "@type": type + 'Identifier'}
    
                    search_params.append(
    
                        {"permId": permid, "@type": type + 'PermId'}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fo = {
                "@type": foType
            }
    
            for option in options:
                fo[option] = fetch_option[option]
    
            request = {
                "method": method_name,
                "params": [
                    self.token,
                    search_params,
                    fo
                ],
            }
            return request
    
    
        def clear_cache(self, entity=None):
            """Empty the internal object cache
            If you do not specify any entity, the complete cache is cleared.
            As entity, you can specify either:
            space, project, vocabulary, term, sampleType, experimentType, dataSetType
            """
            if entity:
                self.cache[entity] = {}
            else:
                self.cache = {}
    
        def _object_cache(self, entity=None, code=None, value=None):
    
            # return the value, if no value provided
            if value is None:
                if entity in self.cache:
                    return self.cache[entity].get(code)
            else:
                if entity not in self.cache:
                    self.cache[entity] = {}
    
                self.cache[entity][code] = value
    
    
        def get_terms(self, vocabulary=None, start_with=None, count=None):
    
            """ Returns information about existing vocabulary terms. 
            If a vocabulary code is provided, it only returns the terms of that vocabulary.
    
            if self.use_cache and vocabulary is not None and start_with is None and count is None:
                voc = self._object_cache(entity='term',code=vocabulary)
                if voc:
                    return voc
    
            search_request = {}
            if vocabulary is not None:
    
                search_request = _gen_search_criteria({
                    "vocabulary": "VocabularyTerm",
                    "criteria": [{
    
                        "vocabulary": "Vocabulary",
                        "code": vocabulary
                    }]
                })
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_request["@type"] = "as.dto.vocabulary.search.VocabularyTermSearchCriteria"
    
            fetchopts = fetch_option['vocabularyTerm']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
            request = {
                "method": "searchVocabularyTerms",
    
                "params": [self.token, search_request, fetchopts]
    
            }
            resp = self._post_request(self.as_v3, request)
    
            attrs = 'code vocabularyCode label description registrationDate modificationDate official ordinal'.split()
    
    
            if len(resp['objects']) == 0:
                terms = DataFrame(columns=attrs)
            else:
                objects = resp['objects']
    
                terms = DataFrame(objects)
    
                terms['vocabularyCode'] = terms['permId'].map(extract_attr('vocabularyCode'))
    
                terms['registrationDate'] = terms['registrationDate'].map(format_timestamp)
                terms['modificationDate'] = terms['modificationDate'].map(format_timestamp)
    
    
                openbis_obj = self,
                entity = 'term',
                df = terms[attrs],
                identifier_name='code',
                additional_identifier='vocabularyCode',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
            if self.use_cache and vocabulary is not None and start_with is None and count is None:
                self._object_cache(entity='term',code=vocabulary,value=things)
    
            
    
        def new_term(self, code, vocabularyCode, label=None, description=None):
            return VocabularyTerm(
    
                self, data=None,
                code=code, vocabularyCode=vocabularyCode,
    
                label=label, description=description
            )
    
    
    
        def get_term(self, code, vocabularyCode, only_data=False):
            entity_def = get_definition_for_entity('vocabularyTerm')
    
            search_request = {
                "code": code,
                "vocabularyCode": vocabularyCode,
                "@type": "as.dto.vocabulary.id.VocabularyTermPermId"
            }
    
            fetchopts = get_fetchoption_for_entity('vocabularyTerm')
    
            for opt in ['registrator']:
                fetchopts[opt] = get_fetchoption_for_entity(opt)
            
            request = {
                "method": 'getVocabularyTerms',
                "params": [
                    self.token,
                    [search_request],
                    fetchopts
                ],
            }
            resp = self._post_request(self.as_v3, request)
    
            if resp is None or len(resp) == 0:
                raise ValueError("no VocabularyTerm found with code='{}' and vocabularyCode='{}'".format(code, vocabularyCode))
            else:
                parse_jackson(resp)
                for ident in resp:
                    if only_data:
                        return resp[ident]
                    else:
                        return VocabularyTerm(self, resp[ident])
    
    
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_vocabularies(self, code=None, start_with=None, count=None):
    
            """ Returns information about vocabulary
            """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            sub_criteria = []
            if code:
                sub_criteria.append(_criteria_for_code(code))
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.vocabulary.search.VocabularySearchCriteria",
                "operator": "AND"
            }
    
    
            fetchopts = fetch_option['vocabulary']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
            for option in ['registrator']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchVocabularies",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "params": [self.token, criteria, fetchopts]
    
            }
            resp = self._post_request(self.as_v3, request)
    
    
            attrs = 'code description managedInternally chosenFromList urlTemplate registrator registrationDate modificationDate'.split()
    
    
            if len(resp['objects']) == 0:
                vocs = DataFrame(columns=attrs)
            else:
                objects = resp['objects']
                parse_jackson(resp)
                vocs = DataFrame(objects)
                vocs['registrationDate'] = vocs['registrationDate'].map(format_timestamp)
                vocs['modificationDate'] = vocs['modificationDate'].map(format_timestamp)
                vocs['registrator']      = vocs['registrator'].map(extract_person)
    
    
            return Things(
                openbis_obj = self,
                entity = 'vocabulary',
                df = vocs[attrs],
                identifier_name = 'code',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
        def get_vocabulary(self, code, only_data=False):
    
            """ Returns the details of a given vocabulary (including vocabulary terms)
            """
    
            code = str(code).upper()
            voc = not only_data and self._object_cache(entity='vocabulary',code=code)
            if voc:
                return voc
    
            entity = 'vocabulary'
            method_name = get_method_for_entity(entity, 'get')
            objectIds = _type_for_id(code.upper(), entity)
            fetchopts = fetch_option[entity]
            
            request = {
                "method": method_name,
                "params": [
                    self.token,
                    [objectIds],
                    fetchopts
                ],
            }
            resp = self._post_request(self.as_v3, request)
    
            if len(resp) == 0:
                raise ValueError('no {} found with identifier: {}'.format(entity, code))
            else:
                parse_jackson(resp)
                for ident in resp:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    data = resp[ident]
                    if only_data:
                        return data
                    vocabulary = Vocabulary( openbis_obj=self, data=data)
    
                    if self.use_cache:
                        self._object_cache(entity='vocabulary', code=code, value=vocabulary)
                    return vocabulary
    
        def new_tag(self, code, description=None):
            """ Creates a new tag (for this user)
            """
            return Tag(self, code=code, description=description)
    
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_tags(self, code=None, start_with=None, count=None):
    
    
            search_criteria = get_search_type_for_entity('tag', 'AND')
    
            criteria = []
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts = fetch_option['tag']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for option in ['owner']:
                fetchopts[option] = fetch_option[option]
    
            if code:
                criteria.append(_criteria_for_code(code))
            search_criteria['criteria'] = criteria
    
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts
                ]
    
            resp = self._post_request(self.as_v3, request)
    
            return self._tag_list_for_response(response=resp['objects'], totalCount=resp['totalCount'])
    
    
        def get_tag(self, permId, only_data=False):
    
            just_one = True
            identifiers = []
            if isinstance(permId, list):
                just_one = False
                for ident in permId:
                    identifiers.append(_type_for_id(ident, 'tag'))
            else:
    
                tag = not only_data and self._object_cache(entity='tag',code=permId)
                if tag:
                    return tag
    
                identifiers.append(_type_for_id(permId, 'tag'))
    
            fetchopts = fetch_option['tag']
            for option in ['owner']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "getTags",
                "params": [
                    self.token,
    
            resp = self._post_request(self.as_v3, request)
    
    
            if just_one:
                if len(resp) == 0:
                    raise ValueError('no such tag found: {}'.format(permId))
    
    
                parse_jackson(resp)
                for permId in resp:
                    if only_data:
                        return resp[permId]
                    else:
    
                        tag = Tag(self, data=resp[permId])
                        if self.use_cache:
                            self._object_cache(entity='tag',code=permId,value=tag)
                        return tag
    
                return self._tag_list_for_response( response=list(resp.values()) )
    
        def _tag_list_for_response(self, response, totalCount=0):
    
    
            parse_jackson(response)
            attrs = ['permId', 'code', 'description', 'owner', 'private', 'registrationDate']
            if len(response) == 0:
                tags = DataFrame(columns = attrs)
            else: 
                tags = DataFrame(response)
                tags['registrationDate'] = tags['registrationDate'].map(format_timestamp)
                tags['permId']           = tags['permId'].map(extract_permid)
                tags['description']      = tags['description'].map(lambda x: '' if x is None else x)
                tags['owner']            = tags['owner'].map(extract_person)
    
    
            return Things(
                openbis_obj = self,
                entity = 'tag',
                df = tags[attrs],
                identifier_name ='permId',
    
                totalCount = totalCount,
    
        def search_semantic_annotations(self, 
            permId=None, entityType=None, propertyType=None, only_data=False
        ):
    
            """ Get a list of semantic annotations for permId, entityType, propertyType or 
            property type assignment (DataFrame object).
            :param permId: permId of the semantic annotation.
            :param entityType: entity (sample) type to search for.
            :param propertyType: property type to search for
            :param only_data: return result as plain data object.
            :return:  Things of DataFrame objects or plain data object
            """
    
    
            criteria = []
            typeCriteria = []
    
            if permId is not None:
                criteria.append({
                    "@type" : "as.dto.common.search.PermIdSearchCriteria",
                    "fieldValue" : {
                        "@type" : "as.dto.common.search.StringEqualToValue",
                        "value" : permId
                    }
                })
    
            if entityType is not None:
                typeCriteria.append({
                    "@type" : "as.dto.entitytype.search.EntityTypeSearchCriteria",
                    "criteria" : [_criteria_for_code(entityType)]
                })
    
            if propertyType is not None:
                typeCriteria.append({
                    "@type" : "as.dto.property.search.PropertyTypeSearchCriteria",
                    "criteria" : [_criteria_for_code(propertyType)]
                })
    
            if entityType is not None and propertyType is not None:
                criteria.append({
                    "@type" : "as.dto.property.search.PropertyAssignmentSearchCriteria",
                    "criteria" : typeCriteria
                })
            else:
                criteria += typeCriteria
    
            saCriteria = {
                "@type" : "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria",
                "criteria" : criteria
            }
    
            objects = self._search_semantic_annotations(saCriteria)
    
            if only_data:
                return objects
    
    
            attrs = ['permId', 'entityType', 'propertyType', 'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId', 'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId', 'creationDate']
            if len(objects) == 0:
                annotations = DataFrame(columns=attrs)
    
            else:
                annotations = DataFrame(objects)
    
            return Things(
                openbis_obj = self,
                entity = 'semantic_annotation',
                df = annotations[attrs],
                identifier_name = 'permId',
            )
    
        def _search_semantic_annotations(self, criteria):
    
            fetch_options = {
                "@type": "as.dto.semanticannotation.fetchoptions.SemanticAnnotationFetchOptions",
                "entityType": {"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"},
                "propertyType": {"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"},
                "propertyAssignment": {
                    "@type": "as.dto.property.fetchoptions.PropertyAssignmentFetchOptions",
                    "entityType" : {
                        "@type" : "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
                    },
                    "propertyType" : {
                        "@type" : "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
                    }
                }
            }
    
            request = {
                "method": "searchSemanticAnnotations",
                "params": [self.token, criteria, fetch_options]
            }
    
            resp = self._post_request(self.as_v3, request)
    
                objects = resp['objects']
                parse_jackson(objects)
                
                for object in objects:
                    object['permId'] = object['permId']['permId']
                    if object.get('entityType') is not None:
                        object['entityType'] = object['entityType']['code']
                    elif object.get('propertyType') is not None:
                        object['propertyType'] = object['propertyType']['code']
                    elif object.get('propertyAssignment') is not None:
                        object['entityType'] = object['propertyAssignment']['entityType']['code']
                        object['propertyType'] = object['propertyAssignment']['propertyType']['code']
                    object['creationDate'] = format_timestamp(object['creationDate'])
                    
                return objects
    
    
        def get_semantic_annotations(self):
            """ Get a list of all available semantic annotations (DataFrame object).
            """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            objects = self._search_semantic_annotations({
                "@type": "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria"
            })
    
            attrs = ['permId', 'entityType', 'propertyType', 'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId', 'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId', 'creationDate']
    
            if len(objects) == 0:
                annotations = DataFrame(columns=attrs)
            else:
                annotations = DataFrame(objects)
    
            return Things(
                openbis_obj = self,
                entity = 'semantic_annotation',
                df = annotations[attrs],
                identifier_name = 'permId',
            )
    
        def get_semantic_annotation(self, permId, only_data = False):
            objects = self.search_semantic_annotations(permId=permId, only_data=True)
    
            if len(objects) == 0:
                raise ValueError("Semantic annotation with permId " + permId +  " not found.")
    
                return SemanticAnnotation(self, isNew=False, **obj)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_plugins(self, start_with=None, count=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
            criteria = []
            search_criteria = get_search_type_for_entity('plugin', 'AND')
            search_criteria['criteria'] = criteria
    
            fetchopts = fetch_option['plugin']
            for option in ['registrator']:
                fetchopts[option] = fetch_option[option]
    
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
            request = {
                "method": "searchPlugins",