Skip to content
Snippets Groups Projects
pybis.py 113 KiB
Newer Older
  • Learn to ignore specific revisions
  • Swen Vermeul's avatar
    Swen Vermeul committed
            fo = {
                "@type": foType
            }
    
            for option in options:
                fo[option] = fetch_option[option]
    
            request = {
                "method": method_name,
                "params": [
                    self.token,
                    search_params,
                    fo
                ],
            }
            return request
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_terms(self, vocabulary=None, start_with=None, count=None):
    
            """ Returns information about existing vocabulary terms. 
            If a vocabulary code is provided, it only returns the terms of that vocabulary.
    
            search_request = {}
            if vocabulary is not None:
    
                search_request = _gen_search_criteria({
                    "vocabulary": "VocabularyTerm",
                    "criteria": [{
    
                        "vocabulary": "Vocabulary",
                        "code": vocabulary
                    }]
                })
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_request["@type"] = "as.dto.vocabulary.search.VocabularyTermSearchCriteria"
    
            fetchopts = fetch_option['vocabularyTerm']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
            request = {
                "method": "searchVocabularyTerms",
    
                "params": [self.token, search_request, fetchopts]
    
            }
            resp = self._post_request(self.as_v3, request)
    
            attrs = 'code vocabularyCode label description registrationDate modificationDate official ordinal'.split()
    
    
            if len(resp['objects']) == 0:
                terms = DataFrame(columns=attrs)
            else:
                objects = resp['objects']
    
                terms = DataFrame(objects)
    
                terms['vocabularyCode'] = terms['permId'].map(extract_attr('vocabularyCode'))
    
                terms['registrationDate'] = terms['registrationDate'].map(format_timestamp)
                terms['modificationDate'] = terms['modificationDate'].map(format_timestamp)
    
    
            return Things(
                openbis_obj = self,
                entity = 'term',
                df = terms[attrs],
                identifier_name='code',
                additional_identifier='vocabularyCode',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
            
    
        def new_term(self, code, vocabularyCode, label=None, description=None):
            return VocabularyTerm(
    
                self, data=None,
                code=code, vocabularyCode=vocabularyCode,
    
                label=label, description=description
            )
    
    
        def get_term(self, code, vocabularyCode=None, only_data=False):
            entity_def = get_definition_for_entity('VocabularyTerm')
            search_request = {
                "code": code,
                "vocabularyCode": vocabularyCode,
                "@type": "as.dto.vocabulary.id.VocabularyTermPermId"
            }
            fetchopts = get_fetchoption_for_entity('VocabularyTerm')
            for opt in ['registrator']:
                fetchopts[opt] = get_fetchoption_for_entity(opt)
            
            request = {
                "method": 'getVocabularyTerms',
                "params": [
                    self.token,
                    [search_request],
                    fetchopts
                ],
            }
            resp = self._post_request(self.as_v3, request)
    
            if resp is None or len(resp) == 0:
                raise ValueError("no VocabularyTerm found with code='{}' and vocabularyCode='{}'".format(code, vocabularyCode))
            else:
                parse_jackson(resp)
                for ident in resp:
                    if only_data:
                        return resp[ident]
                    else:
                        return VocabularyTerm(self, resp[ident])
    
    
    
        def _get_any_entity(self, identifier, entity, only_data=False, method=None):
    
    
            entity_def = get_definition_for_entity(entity)
    
            # guess the method to fetch an entity
            if method is None:
                method = 'get' + entity + 's'
    
    
            search_request = _type_for_id(identifier, entity)
    
            fetchopts = get_fetchoption_for_entity(entity)
            
            request = {
                "method": method,
                "params": [
                    self.token,
                    [search_request],
                    fetchopts
                ],
            }
            resp = self._post_request(self.as_v3, request)
    
            if resp is None or len(resp) == 0:
                raise ValueError('no {} found with identifier: {}'.format(entity, identifier))
            else:
                parse_jackson(resp)
                for ident in resp:
                    if only_data:
                        return resp[ident]
                    else:
                        # get a dynamic instance of that class
                        klass = globals()[entity]
                        return klass(self, resp[ident])
    
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_vocabularies(self, code=None, start_with=None, count=None):
    
            """ Returns information about vocabulary
            """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            sub_criteria = []
            if code:
                sub_criteria.append(_criteria_for_code(code))
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.vocabulary.search.VocabularySearchCriteria",
                "operator": "AND"
            }
    
    
            fetchopts = fetch_option['vocabulary']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
            for option in ['registrator']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchVocabularies",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "params": [self.token, criteria, fetchopts]
    
            }
            resp = self._post_request(self.as_v3, request)
    
            attrs = 'code description managedInternally internalNameSpace chosenFromList urlTemplate registrator registrationDate modificationDate'.split()
    
            if len(resp['objects']) == 0:
                vocs = DataFrame(columns=attrs)
            else:
                objects = resp['objects']
                parse_jackson(resp)
                vocs = DataFrame(objects)
                vocs['registrationDate'] = vocs['registrationDate'].map(format_timestamp)
                vocs['modificationDate'] = vocs['modificationDate'].map(format_timestamp)
                vocs['registrator']      = vocs['registrator'].map(extract_person)
    
    
            return Things(
                openbis_obj = self,
                entity = 'vocabulary',
                df = vocs[attrs],
                identifier_name = 'code',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
    
    
        def get_vocabulary(self, code, only_data=False):
            """ Returns the details of a given vocabulary (including vocabulary terms)
            """
    
            return self._get_any_entity(code, 'Vocabulary', only_data=only_data, method='getVocabularies')
    
        def new_tag(self, code, description=None):
            """ Creates a new tag (for this user)
            """
            return Tag(self, code=code, description=description)
    
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_tags(self, code=None, start_with=None, count=None):
    
    
            search_criteria = get_search_type_for_entity('tag', 'AND')
    
            criteria = []
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts = fetch_option['tag']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts['from'] = start_with
            fetchopts['count'] = count
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for option in ['owner']:
                fetchopts[option] = fetch_option[option]
    
            if code:
                criteria.append(_criteria_for_code(code))
            search_criteria['criteria'] = criteria
    
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts
                ]
    
            resp = self._post_request(self.as_v3, request)
    
            return self._tag_list_for_response(response=resp['objects'], totalCount=resp['totalCount'])
    
    
        def get_tag(self, permId, only_data=False):
    
            just_one = True
            identifiers = []
            if isinstance(permId, list):
                just_one = False
                for ident in permId:
                    identifiers.append(_type_for_id(ident, 'tag'))
            else:
                identifiers.append(_type_for_id(permId, 'tag'))
    
            fetchopts = fetch_option['tag']
            for option in ['owner']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "getTags",
                "params": [
                    self.token,
    
            resp = self._post_request(self.as_v3, request)
    
    
            if just_one:
                if len(resp) == 0:
                    raise ValueError('no such tag found: {}'.format(permId))
    
    
                parse_jackson(resp)
                for permId in resp:
                    if only_data:
                        return resp[permId]
                    else:
                        return Tag(self, data=resp[permId])
    
                return self._tag_list_for_response( response=list(resp.values()) )
    
        def _tag_list_for_response(self, response, totalCount=0):
    
    
            parse_jackson(response)
            attrs = ['permId', 'code', 'description', 'owner', 'private', 'registrationDate']
            if len(response) == 0:
                tags = DataFrame(columns = attrs)
            else: 
                tags = DataFrame(response)
                tags['registrationDate'] = tags['registrationDate'].map(format_timestamp)
                tags['permId']           = tags['permId'].map(extract_permid)
                tags['description']      = tags['description'].map(lambda x: '' if x is None else x)
                tags['owner']            = tags['owner'].map(extract_person)
    
    
            return Things(
                openbis_obj = self,
                entity = 'tag',
                df = tags[attrs],
                identifier_name ='permId',
    
                totalCount = totalCount,
    
        def search_semantic_annotations(self, 
            permId=None, entityType=None, propertyType=None, only_data=False
        ):
    
            """ Get a list of semantic annotations for permId, entityType, propertyType or 
            property type assignment (DataFrame object).
            :param permId: permId of the semantic annotation.
            :param entityType: entity (sample) type to search for.
            :param propertyType: property type to search for
            :param only_data: return result as plain data object.
            :return:  Things of DataFrame objects or plain data object
            """
    
    
            criteria = []
            typeCriteria = []
    
            if permId is not None:
                criteria.append({
                    "@type" : "as.dto.common.search.PermIdSearchCriteria",
                    "fieldValue" : {
                        "@type" : "as.dto.common.search.StringEqualToValue",
                        "value" : permId
                    }
                })
    
            if entityType is not None:
                typeCriteria.append({
                    "@type" : "as.dto.entitytype.search.EntityTypeSearchCriteria",
                    "criteria" : [_criteria_for_code(entityType)]
                })
    
            if propertyType is not None:
                typeCriteria.append({
                    "@type" : "as.dto.property.search.PropertyTypeSearchCriteria",
                    "criteria" : [_criteria_for_code(propertyType)]
                })
    
            if entityType is not None and propertyType is not None:
                criteria.append({
                    "@type" : "as.dto.property.search.PropertyAssignmentSearchCriteria",
                    "criteria" : typeCriteria
                })
            else:
                criteria += typeCriteria
    
            saCriteria = {
                "@type" : "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria",
                "criteria" : criteria
            }
    
            objects = self._search_semantic_annotations(saCriteria)
    
            if only_data:
                return objects
    
    
            attrs = ['permId', 'entityType', 'propertyType', 'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId', 'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId', 'creationDate']
            if len(objects) == 0:
                annotations = DataFrame(columns=attrs)
    
            else:
                annotations = DataFrame(objects)
    
            return Things(
                openbis_obj = self,
                entity = 'semantic_annotation',
                df = annotations[attrs],
                identifier_name = 'permId',
            )
    
        def _search_semantic_annotations(self, criteria):
    
            fetch_options = {
                "@type": "as.dto.semanticannotation.fetchoptions.SemanticAnnotationFetchOptions",
                "entityType": {"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"},
                "propertyType": {"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"},
                "propertyAssignment": {
                    "@type": "as.dto.property.fetchoptions.PropertyAssignmentFetchOptions",
                    "entityType" : {
                        "@type" : "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
                    },
                    "propertyType" : {
                        "@type" : "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
                    }
                }
            }
    
            request = {
                "method": "searchSemanticAnnotations",
                "params": [self.token, criteria, fetch_options]
            }
    
            resp = self._post_request(self.as_v3, request)
    
                objects = resp['objects']
                parse_jackson(objects)
                
                for object in objects:
                    object['permId'] = object['permId']['permId']
                    if object.get('entityType') is not None:
                        object['entityType'] = object['entityType']['code']
                    elif object.get('propertyType') is not None:
                        object['propertyType'] = object['propertyType']['code']
                    elif object.get('propertyAssignment') is not None:
                        object['entityType'] = object['propertyAssignment']['entityType']['code']
                        object['propertyType'] = object['propertyAssignment']['propertyType']['code']
                    object['creationDate'] = format_timestamp(object['creationDate'])
                    
                return objects
    
    
        def get_semantic_annotations(self):
            """ Get a list of all available semantic annotations (DataFrame object).
            """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            objects = self._search_semantic_annotations({
                "@type": "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria"
            })
    
            attrs = ['permId', 'entityType', 'propertyType', 'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId', 'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId', 'creationDate']
    
            if len(objects) == 0:
                annotations = DataFrame(columns=attrs)
            else:
                annotations = DataFrame(objects)
    
            return Things(
                openbis_obj = self,
                entity = 'semantic_annotation',
                df = annotations[attrs],
                identifier_name = 'permId',
            )
    
        def get_semantic_annotation(self, permId, only_data = False):
            objects = self.search_semantic_annotations(permId=permId, only_data=True)
    
            if len(objects) == 0:
                raise ValueError("Semantic annotation with permId " + permId +  " not found.")
    
            object = objects[0]
            if only_data:
                return object
            else:
    
                return SemanticAnnotation(self, isNew=False, **object)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_plugins(self, start_with=None, count=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
            criteria = []
            search_criteria = get_search_type_for_entity('plugin', 'AND')
            search_criteria['criteria'] = criteria
    
            fetchopts = fetch_option['plugin']
            for option in ['registrator']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchPlugins",
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts,
                ],
            }
            resp = self._post_request(self.as_v3, request)
            attrs = ['name', 'description', 'pluginType', 'pluginKind',
            'entityKinds', 'registrator', 'registrationDate', 'permId']
    
            if len(resp['objects']) == 0:
                plugins = DataFrame(columns=attrs)
            else:
                objects = resp['objects']
                parse_jackson(objects)
    
                plugins = DataFrame(objects)
                plugins['permId'] = plugins['permId'].map(extract_permid)
                plugins['registrator'] = plugins['registrator'].map(extract_person)
                plugins['registrationDate'] = plugins['registrationDate'].map(format_timestamp)
                plugins['description'] = plugins['description'].map(lambda x: '' if x is None else x)
                plugins['entityKinds'] = plugins['entityKinds'].map(lambda x: '' if x is None else x)
    
    
            return Things(
                openbis_obj = self,
                entity = 'plugin',
                df = plugins[attrs],
                identifier_name = 'name',
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
        def get_plugin(self, permId, only_data=False, with_script=True):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts = fetch_option['plugin']
            options = ['registrator']
            if with_script:
                options.append('script')
    
            for option in options:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "getPlugins",
                "params": [
                    self.token,
                    [search_request],
                    fetchopts
                ],
            }
    
            resp = self._post_request(self.as_v3, request)
            parse_jackson(resp)
    
            if resp is None or len(resp) == 0:
                raise ValueError('no such plugin found: ' + permId)
            else:
                for permId in resp:
                    if only_data:
                        return resp[permId]
                    else:
                        return Plugin(self, data=resp[permId])
    
    
        def new_plugin(self, pluginType= "MANAGED_PROPERTY", pluginKind = "JYTHON", **kwargs):
    
            """ Note: not functional yet. Creates a new Plugin in openBIS. The attribute pluginKind must be one of
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            the following:
            DYNAMIC_PROPERTY, MANAGED_PROPERTY, ENTITY_VALIDATION;
    
            Usage::
                o.new_plugin(
                    name = 'name of plugin',
                    description = '...',
                    pluginType  = "ENTITY_VALIDATION",
                    script      = "def a():\n  pass",
                    available   = True,
                    entityKind  = None
                )
            """
    
            if pluginType not in [
                'DYNAMIC_PROPERTY', 'MANAGED_PROPERTY', 'ENTITY_VALIDATION'
            ]:
                raise ValueError(
                    "pluginType must be one of the following: DYNAMIC_PROPERTY, MANAGED_PROPERTY, ENTITY_VALIDATION")
            return Plugin(self, pluginType=pluginType, pluginKind=pluginKind, **kwargs) 
    
            
    
        def new_property_type(self, **kwargs):
            pass
    
        def get_property_type(self, code):
            pass
    
        def get_property_types(self, start_with=None, count=None, **search_args):
            fetchopts = fetch_option['propertyType']
            fetchopts['from'] = start_with
            fetchopts['count'] = count
            search_criteria = get_search_criteria('propertyType', **search_args)
    
            request = {
                "method": "searchPropertyTypes",
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts,
                ],
            }
            attrs = openbis_definitions('propertyType')['attrs']
            resp = self._post_request(self.as_v3, request)
            if len(resp['objects']) == 0:
                experiments = DataFrame(columns=attrs)
            else:
                objects = resp['objects']
                parse_jackson(objects)
                df = DataFrame(objects)
                df['registrationDate'] = df['registrationDate'].map(format_timestamp)
                df['registrator'] = df['registrator'].map(extract_person)
    
            return Things(
                openbis_obj = self,
                entity = 'propertyType',
                df = df[attrs],
                start_with = start_with,
                count = count,
                totalCount = resp.get('totalCount'),
            )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_sample_types(self, type=None, start_with=None, count=None):
    
            """ Returns a list of all available sample types
            """
            return self._get_types_of(
    
                method_name         = "searchSampleTypes",
                entity              = "Sample",
                type_name           = type,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                optional_attributes = ["generatedCodePrefix"],
                start_with          = start_with,
                count               = count,
    
        get_object_types = get_sample_types # Alias
    
    
                method_name         = "searchSampleTypes",
                entity              = "Sample",
                type_name           = type,
                optional_attributes = ["generatedCodePrefix", "validationPluginId"]
            )
    
            return SampleType(self, data=entityType.data)
    
        get_object_type = get_sample_type # Alias
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_experiment_types(self, type=None, start_with=None, count=None):
    
            """ Returns a list of all available experiment types
            """
            return self._get_types_of(
    
                method_name         = "searchExperimentTypes",
                entity              = "Experiment",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                type_name           = type,
                start_with          = start_with,
                count               = count,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        get_collection_types = get_experiment_types  # Alias
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    method_name     = "searchExperimentTypes",
                    entity          = "Experiment",
                    type_name       = type
    
    Swen Vermeul's avatar
    Swen Vermeul committed
               raise ValueError("No such experiment type: {}".format(type))
        get_collection_type = get_experiment_type  # Alias
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_material_types(self, type=None, start_with=None, count=None):
    
            """ Returns a list of all available material types
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            return self._get_types_of(
    
                method_name         = "searchMaterialTypes",
                entity              = "Material",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                type_name           = type,
                start_with          = start_with,
                count               = count,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            )
    
                return self._get_types_of(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    method_name     = "searchMaterialTypes", 
                    entity          = "Material", 
                    type_name       = type
    
            except Exception:
                raise ValueError("No such material type: {}".format(type))
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_dataset_types(self, type=None, start_with=None, count=None):
    
            """ Returns a list (DataFrame object) of all currently available dataset types
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            return self._get_types_of(
    
                "searchDataSetTypes", "DataSet", type,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                optional_attributes=['kind'],
                start_with=start_with,
                count=count,
            )
    
            return self._get_types_of(
                method_name         = "searchDataSetTypes", 
                entity              = "DataSet",
                type_name           = type,
                optional_attributes = ['kind']
            )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _get_types_of(
    
            self, method_name, entity, type_name=None,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            start_with=None, count=None,
            additional_attributes=None, optional_attributes=None
        ):
    
            """ Returns a list of all available types of an entity.
            If the name of the entity-type is given, it returns a PropertyAssignments object
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if additional_attributes is None:
                additional_attributes = []
    
            if optional_attributes is None:
                optional_attributes = []
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_request = {
                "@type": "as.dto.{}.search.{}TypeSearchCriteria".format(
                    entity.lower(), entity
                )
            }
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetch_options = {
                "@type": "as.dto.{}.fetchoptions.{}TypeFetchOptions".format(
                    entity.lower(), entity
                )
            }
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetch_options['from'] = start_with
            fetch_options['count'] = count
    
            if type_name is not None:
                search_request = _gen_search_criteria({
    
                    entity.lower(): entity + "Type",
    
                    "code": type_name
    
                fetch_options['propertyAssignments'] = fetch_option['propertyAssignments']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if self.get_server_information().api_version > '3.3':
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    fetch_options['validationPlugin'] = fetch_option['plugin']
    
            request = {
                "method": method_name,
    
                "params": [self.token, search_request, fetch_options],
    
            }
            resp = self._post_request(self.as_v3, request)
    
            if type_name is not None:
                if len(resp['objects']) == 1:
                    return EntityType(
                        openbis_obj = self,
                        data        = resp['objects'][0]
                    )
                elif len(resp['objects']) == 0:
                    raise ValueError("No such {} type: {}".format(entity, type_name))
                else:
                    raise ValueError("There is more than one entry for entity={} and type={}".format(entity, type_name))
    
            types = []
            attrs = self._get_attributes(type_name, types, additional_attributes, optional_attributes)
            if len(resp['objects']) == 0:
                types = DataFrame(columns=attrs)
    
                objects = resp['objects']
                parse_jackson(objects)
                types = DataFrame(objects)
                types['modificationDate'] = types['modificationDate'].map(format_timestamp)
    
            return Things(
                openbis_obj = self,
                entity = entity.lower() + '_type',
                df = types[attrs],
                start_with = start_with,
                count = count,
    
                totalCount = resp.get('totalCount'),
    
        def _get_attributes(self, type_name, types, additional_attributes, optional_attributes):
            attributes = ['code', 'description'] + additional_attributes
            attributes += [attribute for attribute in optional_attributes if attribute in types]
            attributes += ['modificationDate']
            if type_name is not None:
                attributes += ['propertyAssignments']
            return attributes
    
    
            """ checks whether a session is still active. Returns true or false.
            """
    
            This method is useful to check if a token is still valid or if it has timed out,
            requiring the user to login again.
    
            :return: Return True if the token is valid, False if it is not valid.
            """
    
            request = {
                "method": "isSessionActive",
    
            try:
                resp = self._post_request(self.as_v1, request)
            except Exception as e:
                return False
    
    
            return resp
    
        def get_dataset(self, permIds, only_data=False, props=None):
    
            """fetch a dataset and some metadata attached to it:
            - properties
            - sample
            - parents
            - children
            - containers
            - dataStore
            - physicalData
            - linkedData
            :return: a DataSet object
            """
    
            just_one = True
            identifiers = []
            if isinstance(permIds, list):
                just_one = False
                for permId in permIds:
                    identifiers.append(
                        _type_for_id(permId, 'dataset')
                    )
            else:
                identifiers.append(
                    _type_for_id(permIds, 'dataset')
                )
    
            for option in ['tags', 'properties', 'dataStore', 'physicalData', 'linkedData',
    
                           'experiment', 'sample', 'registrator', 'modifier']:
    
                fetchopts[option] = fetch_option[option]
    
            request = {
    
                "method": "getDataSets",
    
            resp = self._post_request(self.as_v3, request)
    
            if just_one:
                if len(resp) == 0:
                    raise ValueError('no such dataset found: {}'.format(permIds))
    
                parse_jackson(resp)
    
                for permId in resp:
                    if only_data:
                        return resp[permId]
                    else:
                        return DataSet(
    
                            openbis_obj = self,
    
                            type = self.get_dataset_type(resp[permId]["type"]["code"]),
                            data = resp[permId]
                        )
            else:
                return self._dataset_list_for_response(response=list(resp.values()), props=props)
    
        def _dataset_list_for_response(
            self, response, props=None, 
            start_with=None, count=None, totalCount=0
        ):
    
            """returns a Things object, containing a DataFrame plus some additional information
            """
    
            parse_jackson(response)
    
            attrs = ['permId', 'type', 'experiment', 'sample',
    
                     'registrationDate', 'modificationDate',
                     'location', 'status', 'presentInArchive', 'size',
                     'properties'
                    ]
    
            if len(response) == 0:
                datasets = DataFrame(columns=attrs)
            else:
                datasets = DataFrame(response)
                datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
                datasets['modificationDate'] = datasets['modificationDate'].map(format_timestamp)
                datasets['experiment'] = datasets['experiment'].map(extract_nested_identifier)
                datasets['sample'] = datasets['sample'].map(extract_nested_identifier)
                datasets['type'] = datasets['type'].map(extract_code)
                datasets['permId'] = datasets['code']
    
                datasets['size'] = datasets['physicalData'].map(lambda x: x.get('size') if x else '')
                datasets['status'] = datasets['physicalData'].map(lambda x: x.get('status') if x else '')
                datasets['presentInArchive'] = datasets['physicalData'].map(lambda x: x.get('presentInArchive') if x else '')
    
                datasets['location'] = datasets['physicalData'].map(lambda x: x.get('location') if x else '')
    
            if props is not None:
    
                if isinstance(props, str):
                    props = [props]
    
                for prop in props:
                    datasets[prop.upper()] = datasets['properties'].map(lambda x: x.get(prop.upper(), ''))
                    attrs.append(prop.upper())
    
    
            return Things(
                openbis_obj = self,
                entity = 'dataset',
                df = datasets[attrs],
                identifier_name = 'permId',
                start_with=start_with,
                count=count,
    
                totalCount=totalCount,
    
    
    
        def get_sample(self, sample_ident, only_data=False, withAttachments=False, props=None):
    
            """Retrieve metadata for the sample.
            Get metadata for the sample and any directly connected parents of the sample to allow access
            to the same information visible in the ELN UI. The metadata will be on the file system.
            :param sample_identifiers: A list of sample identifiers to retrieve.
            """
    
            just_one = True
            identifiers = []
            if isinstance(sample_ident, list):
                just_one = False
                for ident in sample_ident:
                    identifiers.append(
                        _type_for_id(ident, 'sample')
                    )
            else:
                identifiers.append(
                    _type_for_id(sample_ident, 'sample')
                )
    
            fetchopts = {"type": {"@type": "as.dto.sample.fetchoptions.SampleTypeFetchOptions"}}
    
            options = ['tags', 'properties', 'attachments', 'space', 'experiment', 'registrator', 'modifier', 'dataSets']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if self.get_server_information().project_samples_enabled:
    
                options.append('project')
            for option in options:
    
                fetchopts[option] = fetch_option[option]
    
            if withAttachments:
                fetchopts['attachments'] = fetch_option['attachmentsWithContent']
    
    
            for key in ['parents','children','container','components']:
                fetchopts[key] = {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"}
    
                "method": "getSamples",
                "params": [
                    self.token,
    
            resp = self._post_request(self.as_v3, request)
    
            if just_one:
                if len(resp) == 0:
                    raise ValueError('no such sample found: {}'.format(sample_ident))
    
                parse_jackson(resp)
    
                for sample_ident in resp:
    
                        return Sample(
                            openbis_obj = self,
                            type = self.get_sample_type(resp[sample_ident]["type"]["code"]),
                            data = resp[sample_ident]
                        )
    
                return self._sample_list_for_response(
                    response=list(resp.values()),
                    props=props,
                )
    
        def _sample_list_for_response(
            self, response, props=None,
            start_with=None, count=None, totalCount=0
        ):
    
            """returns a Things object, containing a DataFrame plus some additional information
            """
    
            parse_jackson(response)
            attrs = ['identifier', 'permId', 'experiment', 'sample_type',
                     'registrator', 'registrationDate', 'modifier', 'modificationDate']
            if len(response) == 0:
                samples = DataFrame(columns=attrs)
            else:
                samples = DataFrame(response)
                samples['registrationDate'] = samples['registrationDate'].map(format_timestamp)
                samples['modificationDate'] = samples['modificationDate'].map(format_timestamp)
                samples['registrator'] = samples['registrator'].map(extract_person)
                samples['modifier'] = samples['modifier'].map(extract_person)
                samples['identifier'] = samples['identifier'].map(extract_identifier)
                samples['permId'] = samples['permId'].map(extract_permid)
                samples['experiment'] = samples['experiment'].map(extract_nested_identifier)
                samples['sample_type'] = samples['type'].map(extract_nested_permid)
    
            if props is not None:
    
                if isinstance(props, str):
                    props = [props]
    
                for prop in props:
                    samples[prop.upper()] = samples['properties'].map(lambda x: x.get(prop.upper(), ''))
                    attrs.append(prop.upper())
    
    
            return Things(
                openbis_obj = self,
                entity = 'sample',
                df = samples[attrs],
                identifier_name = 'identifier',
                start_with=start_with,
                count=count,
    
                totalCount=totalCount,
    
        def get_external_data_management_system(self, permId, only_data=False):
    
            """Retrieve metadata for the external data management system.
    
            :param permId: A permId for an external DMS.
    
            :param only_data: Return the result data as a hash-map, not an object.
            """
    
            request = {
                "method": "getExternalDataManagementSystems",
                "params": [
                    self.token,
                    [{
                        "@type": "as.dto.externaldms.id.ExternalDmsPermId",