Skip to content
Snippets Groups Projects
pybis.py 174 KiB
Newer Older
  • Learn to ignore specific revisions
  •             raise ValueError("login to openBIS failed")
            else:
                self.token = result
    
            # Request just 1 permId
            request = {
                "method": "createPermIdStrings",
                "params": [self.token, 1],
            }
            resp = self._post_request(self.as_v3, request)
            if resp is not None:
                return resp[0]
            else:
                raise ValueError("Could not create permId")
    
        def get_datastores(self):
    
            """ Get a list of all available datastores. Usually there is only one, but in some cases
    
            there might be multiple servers. If you upload a file, you need to specifiy the datastore you want
    
            the file uploaded to.
            """
    
    
            request = {
                "method": "listDataStores",
    
            }
            resp = self._post_request(self.as_v1, request)
            if resp is not None:
    
                return DataFrame(resp)[['code', 'downloadUrl', 'hostUrl']]
    
                raise ValueError("No datastore found!")
    
    
        def new_person(self, userId, space=None):
            """ creates an openBIS person
            """
    
            try:
                person = self.get_person(userId=userId)
            except Exception:
                return Person(self, userId=userId, space=space) 
    
            raise ValueError(
                "There already exists a user with userId={}".format(userId)
            )
    
    
    
        def new_group(self, code, description=None, users=None):
            """ creates an openBIS person
            """
            return Group(self, code=code, description=description, users=users)
    
    
    
        def get_group(self, groupId, only_data=False):
            """ Get an openBIS AuthorizationGroup. Returns a Group object.
    
            ids = [{
                "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
                "permId": groupId
            }]
    
            fetchopts = {}
            for option in ['roleAssignments', 'users', 'registrator']:
                fetchopts[option] = fetch_option[option]
    
    
            fetchopts['users']['space'] = fetch_option['space']
    
    
            request = {
                "method": "getAuthorizationGroups",
                "params": [
                    self.token,
                    ids,
                    fetchopts
                ]
            }
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No group found!")
    
            for permid in resp:
                group = resp[permid]
                parse_jackson(group)
    
                if only_data:
                    return group
                else:
                    return Group(self, data=group)
    
    
        def get_role_assignments(self, **search_args):
    
            """ Get the assigned roles for a given group, person or space
            """
            search_criteria = get_search_type_for_entity('roleAssignment', 'AND')
    
            allowed_search_attrs = ['role', 'roleLevel', 'user', 'group', 'person', 'space']
    
    
            sub_crit = []
            for attr in search_args:
                if attr in allowed_search_attrs:
                    if attr == 'space':
                        sub_crit.append(
                            _subcriteria_for_code(search_args[attr], 'space')
                        )
    
                        userId = ''
                        if isinstance(search_args[attr], str):
                            userId = search_args[attr]
                        else:
                            userId = search_args[attr].userId
    
                        sub_crit.append(
                            _subcriteria_for_userId(userId)    
                        )
                    elif attr == 'group':
    
                        groupId = ''
                        if isinstance(search_args[attr], str):
                            groupId = search_args[attr]
                        else:
                            groupId = search_args[attr].code
    
                        sub_crit.append(
    
                            _subcriteria_for_permid(groupId, 'AuthorizationGroup')
    
                        )
                    elif attr == 'role':
                        # TODO
                        raise ValueError("not yet implemented")
                    elif attr == 'roleLevel':
                        # TODO
                        raise ValueError("not yet implemented")
                    else:
                        pass
                else:
    
                    raise ValueError("unknown search argument {}".format(attr))
    
    
            search_criteria['criteria'] = sub_crit
    
            fetchopts = {}
    
            for option in ['roleAssignments', 'space', 'project', 'user', 'authorizationGroup','registrator']:
    
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchRoleAssignments",
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts
                ]
            }
    
            resp = self._post_request(self.as_v3, request)
            if len(resp['objects']) == 0:
                raise ValueError("No assigned roles found!")
    
            objects = resp['objects']
            parse_jackson(objects)
            roles = DataFrame(objects)
    
    
            roles['techId'] = roles['id'].map(extract_id)
    
            roles['user'] = roles['user'].map(extract_userId)
            roles['group'] = roles['authorizationGroup'].map(extract_code)
            roles['space'] = roles['space'].map(extract_code)
            p = Things(
    
                self, entity='role_assignment', 
                df=roles[['techId', 'role', 'roleLevel', 'user', 'group', 'space', 'project']],
                identifier_name='techId'
    
        def get_role_assignment(self, techId, only_data=False):
    
            """ Fetches one assigned role by its techId.
            """
    
            fetchopts = {}
            for option in ['roleAssignments', 'space', 'project', 'user', 'authorizationGroup','registrator']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "getRoleAssignments",
                "params": [
                    self.token,
                    [{
                        "techId": techId,
                        "@type": "as.dto.roleassignment.id.RoleAssignmentTechId"
                    }],
                    fetchopts
                ]
            }
    
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No assigned role found for techId={}".format(techId))
    
            for id in resp:
                data = resp[id]
                parse_jackson(data)
    
                if only_data:
                    return data
                else:
                    return RoleAssignment(self, data=data)
    
    
        def assign_role(self, role, **args):
            """ general method to assign a role to either
                - a person
                - a group
            The scope is either
                - the whole instance
                - a space
                - a project
            """
             
            userId = None
            groupId = None
            spaceId = None
            projectId = None
    
            for arg in args:
                if arg in ['person', 'group', 'space', 'project']:
                    permId = args[arg] if isinstance(args[arg],str) else args[arg].permId
                    if arg == 'person':
                        userId = {
                            "permId": permId,
                            "@type": "as.dto.person.id.PersonPermId"
                        }
                    elif arg == 'group':
                        groupId = {
                            "permId": permId,
                            "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId"
                        }
                    elif arg == 'space':
                        spaceId = {
                            "permId": permId,
                            "@type": "as.dto.space.id.SpacePermId"
                        }
                    elif arg == 'project':
                        projectId = {
                            "permId": permId,
                            "@type": "as.dto.project.id.ProjectPermId"
                        }
    
            request = {
                "method": "createRoleAssignments",
                "params": [
                    self.token, 
                    [
    	            {
                            "role": role,
                            "userId": userId,
    		        "authorizationGroupId": groupId,
                            "spaceId": spaceId,
    		        "projectId": projectId,
    		        "@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
    	            }
    	        ]
    	    ]
    	}
            resp = self._post_request(self.as_v3, request)
            return
    
    
    
        def get_groups(self, **search_args):
            """ Get openBIS AuthorizationGroups. Returns a «Things» object.
    
            Usage::
                groups = e.get.groups()
                groups[0]             # select first group
                groups['GROUP_NAME']  # select group with this code
                for group in groups:
                    ...               # a Group object
                groups.df             # get a DataFrame object of the group list
                print(groups)         # print a nice ASCII table (eg. in IPython)
                groups                # HTML table (in a Jupyter notebook)
    
            """
    
            criteria = []
            for search_arg in ['code', 'description', 'registrator']:
                if search_arg in search_args:
                    pass
                    #sub_crit = get_search_type_for_entity(search_arg)
                    #criteria.append(sub_crit)
    
            search_criteria = get_search_type_for_entity('authorizationGroup')
            search_criteria['criteria'] = criteria
    
            search_criteria['operator'] = 'AND'
    
                    
            fetchopts = fetch_option['authorizationGroup']
    
            for option in ['roleAssignments', 'registrator', 'users']:
    
                fetchopts[option] = fetch_option[option]
            request = {
                "method": "searchAuthorizationGroups",
                "params": [
                    self.token,
    
                    fetchopts
                ],
            }
            resp = self._post_request(self.as_v3, request)
            if len(resp['objects']) == 0:
    
                raise ValueError("No groups found!")
    
    
            objects = resp['objects']
            parse_jackson(objects)
    
    
            groups = DataFrame(objects)
    
    
            groups['permId'] = groups['permId'].map(extract_permid)
            groups['registrator'] = groups['registrator'].map(extract_person)
    
            groups['users'] = groups['users'].map(extract_userId)
    
            groups['registrationDate'] = groups['registrationDate'].map(format_timestamp)
            groups['modificationDate'] = groups['modificationDate'].map(format_timestamp)
    
                self, entity='group', 
                df=groups[['permId', 'code', 'description', 'users', 'registrator', 'registrationDate', 'modificationDate']],
    
                identifier_name='permId'
            )
            return p
    
    
        def get_persons(self, **search_args):
            """ Get openBIS users
            """
    
    
            search_criteria = get_search_criteria('person', **search_args)
    
            fetchopts = {}
            for option in ['space']:
                fetchopts[option] = fetch_option[option]
            request = {
                "method": "searchPersons",
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts
                ],
            }
            resp = self._post_request(self.as_v3, request)
            if len(resp['objects']) == 0:
                raise ValueError("No persons found!")
    
            objects = resp['objects']
            parse_jackson(objects)
    
            persons = DataFrame(resp['objects'])
            persons['permId'] = persons['permId'].map(extract_permid)
            persons['registrationDate'] = persons['registrationDate'].map(format_timestamp)
            persons['space'] = persons['space'].map(extract_nested_permid)
            p = Things(
                self, entity='person', 
                df=persons[['permId', 'userId', 'firstName', 'lastName', 'email', 'space', 'registrationDate', 'active']],
                identifier_name='permId'
            )
            return p
    
        def get_person(self, userId, only_data=False):
            """ Get a person (user)
            """
             
            ids = [{
                "@type": "as.dto.person.id.PersonPermId",
                "permId": userId
            }]
    
            fetchopts = {}
    
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "getPersons",
                "params": [
                    self.token,
                    ids,
                    fetchopts,
                ],
            }
            
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No person found!")
    
    
            for permid in resp:
                person = resp[permid]
                parse_jackson(person)
    
                if only_data:
                    return person
                else:
                    return Person(self, data=person)
    
    
    
        def get_spaces(self, code=None):
    
            """ Get a list of all available spaces (DataFrame object). To create a sample or a
            dataset, you need to specify in which space it should live.
            """
    
            search_criteria = _subcriteria_for_code(code, 'space')
            fetchopts = {}
    
            request = {
                "method": "searchSpaces",
    
                           search_criteria,
                           fetchopts,
    
            }
            resp = self._post_request(self.as_v3, request)
            if resp is not None:
                spaces = DataFrame(resp['objects'])
    
                spaces['registrationDate'] = spaces['registrationDate'].map(format_timestamp)
                spaces['modificationDate'] = spaces['modificationDate'].map(format_timestamp)
    
                sp = Things(
                    self,
                    'space',
                    spaces[['code', 'description', 'registrationDate', 'modificationDate']]
                )
                return sp
    
                raise ValueError("No spaces found!")
    
    
        def get_space(self, code, only_data=False):
            """ Returns a Space object for a given identifier.
    
            fetchopts = {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"}
    
            for option in ['registrator']:
                fetchopts[option] = fetch_option[option]
    
    
                "method": "getSpaces",
                "params": [
                    self.token,
                    [{
    
                        "@type": "as.dto.space.id.SpacePermId"
                    }],
                    fetchopts
    
            resp = self._post_request(self.as_v3, request)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if len(resp) == 0:
    
                raise ValueError("No such space: %s" % code)
    
            for permid in resp:
                if only_data:
                    return resp[permid]
                else:
                    return Space(self, data=resp[permid])
    
        def get_samples(self, code=None, permId=None, space=None, project=None, experiment=None, type=None,
    
                        withParents=None, withChildren=None, tags=None, props=None, **properties):
    
            """ Get a list of all samples for a given space/project/experiment (or any combination)
            """
    
            sub_criteria = []
            if space:
    
                sub_criteria.append(_gen_search_criteria({
    
                    "space": "Space",
                    "operator": "AND",
                    "code": space
    
                exp_crit = _subcriteria_for_code(experiment, 'experiment')
                proj_crit = _subcriteria_for_code(project, 'project')
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                exp_crit['criteria'] = []
                exp_crit['criteria'].append(proj_crit)
                sub_criteria.append(exp_crit)
    
                sub_criteria.append(_subcriteria_for_code(experiment, 'experiment'))
    
            if properties is not None:
                for prop in properties:
                    sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
    
            if type:
                sub_criteria.append(_subcriteria_for_code(type, 'sample_type'))
    
            if tags:
                sub_criteria.append(_subcriteria_for_tags(tags))
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
    
                sub_criteria.append(_common_search("as.dto.common.search.PermIdSearchCriteria", permId))
    
                if not isinstance(withParents, list):
                    withParents = [withParents]
                for parent in withParents:
                    sub_criteria.append(
    
                            "sample": "SampleParents",
                            "identifier": parent
                        })
                    )
    
                if not isinstance(withChildren, list):
                    withChildren = [withChildren]
                for child in withChildren:
                    sub_criteria.append(
    
                            "sample": "SampleChildren",
                            "identifier": child
                        })
                    )
    
    
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.sample.search.SampleSearchCriteria",
                "operator": "AND"
            }
    
            # build the various fetch options
    
            fetchopts = fetch_option['sample']
    
            for option in ['tags', 'properties', 'registrator', 'modifier', 'experiment']:
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchSamples",
    
            }
            resp = self._post_request(self.as_v3, request)
    
                raise ValueError("no samples found!")
    
            samples = DataFrame(objects)
            samples['registrationDate'] = samples['registrationDate'].map(format_timestamp)
            samples['modificationDate'] = samples['modificationDate'].map(format_timestamp)
    
            samples['registrator'] = samples['registrator'].map(extract_person)
            samples['modifier'] = samples['modifier'].map(extract_person)
            samples['identifier'] = samples['identifier'].map(extract_identifier)
    
            samples['permId'] = samples['permId'].map(extract_permid)
    
            samples['experiment'] = samples['experiment'].map(extract_nested_identifier)
            samples['sample_type'] = samples['type'].map(extract_nested_permid)
    
            attrs = ['identifier', 'permId', 'experiment', 'sample_type',
    
                     'registrator', 'registrationDate', 'modifier', 'modificationDate']
    
            if props is not None:
    
                    samples[prop.upper()] = samples['properties'].map(lambda x: x.get(prop.upper(), ''))
    
            return Things(self, 'sample', ss, 'identifier')
    
        get_objects = get_samples # Alias
    
    
        def get_experiments(self, code=None, type=None, space=None, project=None, tags=None, is_finished=None, props=None, **properties):
            """ Searches for all experiment which match the search criteria. Returns a
            «Things» object which can be used in many different situations.
    
            Usage::
                experiments = get_experiments(project='PROJECT_NAME', props=['NAME','FINISHED_FLAG'])
                experiments[0]  # returns first experiment
                experiments['/MATERIALS/REAGENTS/ANTIBODY_COLLECTION']
                for experiment in experiment:
                    # handle every experiment
                    ...
                experiments.df      # returns DataFrame object of the experiment list
                print(experiments)  # prints a nice ASCII table
    
                sub_criteria.append(_subcriteria_for_code(space, 'space'))
    
                sub_criteria.append(_subcriteria_for_code(project, 'project'))
    
            if code:
    
                sub_criteria.append(_criteria_for_code(code))
    
            if type:
                sub_criteria.append(_subcriteria_for_type(type, 'Experiment'))
            if tags:
                sub_criteria.append(_subcriteria_for_tags(tags))
            if is_finished is not None:
                sub_criteria.append(_subcriteria_for_is_finished(is_finished))
            if properties is not None:
                for prop in properties:
                    sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
    
            search_criteria = get_search_type_for_entity('experiment')
    
            search_criteria['criteria'] = sub_criteria
            search_criteria['operator'] = 'AND'
    
    
            fetchopts = fetch_option['experiment']
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for option in ['tags', 'properties', 'registrator', 'modifier', 'project']:
                fetchopts[option] = fetch_option[option]
    
    
            request = {
                "method": "searchExperiments",
    
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts,
                ],
    
            }
            resp = self._post_request(self.as_v3, request)
    
            if len(resp['objects']) == 0:
                raise ValueError("No experiments found!")
    
            objects = resp['objects']
            parse_jackson(objects)
    
            experiments = DataFrame(objects)
    
            experiments['registrationDate'] = experiments['registrationDate'].map(format_timestamp)
            experiments['modificationDate'] = experiments['modificationDate'].map(format_timestamp)
            experiments['project'] = experiments['project'].map(extract_code)
    
            experiments['registrator'] = experiments['registrator'].map(extract_person)
            experiments['modifier'] = experiments['modifier'].map(extract_person)
            experiments['identifier'] = experiments['identifier'].map(extract_identifier)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            experiments['permId'] = experiments['permId'].map(extract_permid)
    
            experiments['type'] = experiments['type'].map(extract_code)
    
    
            attrs = ['identifier', 'permId', 'project', 'type',
    
                     'registrator', 'registrationDate', 'modifier', 'modificationDate']
    
            if props is not None:
                for prop in props:
    
                    experiments[prop.upper()] = experiments['properties'].map(lambda x: x.get(prop.upper(), ''))
    
            return Things(self, 'experiment', exps, 'identifier')
    
    
        def get_datasets(self,
                         code=None, type=None, withParents=None, withChildren=None, status=None,
    
                         sample=None, experiment=None, project=None, tags=None, props=None, **properties
    
    
            sub_criteria = []
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
            if type:
                sub_criteria.append(_subcriteria_for_type(type, 'DataSet'))
            if withParents:
                sub_criteria.append(_subcriteria_for_permid(withParents, 'DataSet', 'Parents'))
            if withChildren:
                sub_criteria.append(_subcriteria_for_permid(withChildren, 'DataSet', 'Children'))
    
    
            if sample:
                sub_criteria.append(_subcriteria_for_code(sample, 'Sample'))
            if experiment:
                sub_criteria.append(_subcriteria_for_code(experiment, 'Experiment'))
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if project:
                exp_crit = _subcriteria_for_code(experiment, 'Experiment')
                proj_crit = _subcriteria_for_code(project, 'Project')
                exp_crit['criteria'] = []
                exp_crit['criteria'].append(proj_crit)
                sub_criteria.append(exp_crit)
    
            if tags:
                sub_criteria.append(_subcriteria_for_tags(tags))
    
            if status:
                sub_criteria.append(_subcriteria_for_status(status))
    
            if properties is not None:
                for prop in properties:
                    sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
    
            search_criteria = get_search_type_for_entity('dataset')
    
            search_criteria['criteria'] = sub_criteria
            search_criteria['operator'] = 'AND'
    
                "containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
                "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"}
    
            for option in ['tags', 'properties', 'sample', 'experiment', 'physicalData']:
    
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchDataSets",
    
            }
            resp = self._post_request(self.as_v3, request)
    
            if len(resp['objects']) == 0:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError("no datasets found!")
    
    
            objects = resp['objects']
            parse_jackson(objects)
    
            datasets = DataFrame(objects)
            datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
            datasets['modificationDate'] = datasets['modificationDate'].map(format_timestamp)
            datasets['experiment'] = datasets['experiment'].map(extract_nested_identifier)
            datasets['sample'] = datasets['sample'].map(extract_nested_identifier)
            datasets['type'] = datasets['type'].map(extract_code)
            datasets['permId'] = datasets['code']
            datasets['location'] = datasets['physicalData'].map(lambda x: x.get('location') if x else '')
    
    
            attrs = ['permId', 'properties', 'type', 'experiment', 'sample', 'registrationDate', 'modificationDate',
                     'location']
    
            if props is not None:
                for prop in props:
                    datasets[prop.upper()] = datasets['properties'].map(lambda x: x.get(prop.upper(), ''))
                    attrs.append(prop.upper())
    
    
            return Things(self, 'dataset', datasets[attrs], 'permId')
    
        def get_experiment(self, expId, withAttachments=False, only_data=False):
    
            """ Returns an experiment object for a given identifier (expId).
            """
    
    
                "@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
                "type": {
                    "@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
                },
    
            search_request = search_request_for_identifier(expId, 'experiment')
    
            for option in ['tags', 'properties', 'attachments', 'project', 'samples']:
    
                fetchopts[option] = fetch_option[option]
    
            if withAttachments:
                fetchopts['attachments'] = fetch_option['attachmentsWithContent']
    
    
            request = {
    
                "method": "getExperiments",
                "params": [
    
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No such experiment: %s" % expId)
    
    
            for id in resp:
                if only_data:
                    return resp[id]
                else:
                    return Experiment(
                        openbis_obj = self,
                        type = self.get_experiment_type(resp[expId]["type"]["code"]),
                        data = resp[id]
                    )
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def new_experiment(self, type, code, project, props=None, **kwargs):
    
            """ Creates a new experiment of a given experiment type.
            """
    
            return Experiment(
                openbis_obj = self, 
                type = self.get_experiment_type(type), 
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                project = project,
    
                data = None,
                props = props,
                code = code, 
                **kwargs
            )
    
    
        def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
            params = {
                "experimentId": {
                    "permId": experimentId,
                    "@type": "as.dto.experiment.id.ExperimentPermId"
                },
                "@type": "as.dto.experiment.update.ExperimentUpdate"
            }
            if properties is not None:
    
                params["properties"] = properties
    
            if tagIds is not None:
                params["tagIds"] = tagIds
            if attachments is not None:
                params["attachments"] = attachments
    
            request = {
                "method": "updateExperiments",
                "params": [
                    self.token,
    
        def create_sample(self, space_ident, code, type,
                          project_ident=None, experiment_ident=None, properties=None, attachments=None, tags=None):
    
    
            tagIds = _create_tagIds(tags)
            typeId = _create_typeId(type)
            projectId = _create_projectId(project_ident)
            experimentId = _create_experimentId(experiment_ident)
    
            if properties is None:
                properties = {}
    
            request = {
                "method": "createSamples",
                "params": [
                    self.token,
                    [
                        {
                            "properties": properties,
                            "code": code,
    
                            "projectId": projectId,
                            "experimentId": experimentId,
                            "tagIds": tagIds,
                            "attachments": attachments,
    
                            "@type": "as.dto.sample.create.SampleCreation",
    
                        }
                    ]
                ],
            }
            resp = self._post_request(self.as_v3, request)
            return self.get_sample(resp[0]['permId'])
    
    
        create_object = create_sample # Alias
    
    
        def create_external_data_management_system(self, code, label, address, address_type='FILE_SYSTEM'):
    
            """Create an external DMS.
            :param code: An openBIS code for the external DMS.
            :param label: A human-readable label.
            :param address: The address for accessing the external DMS. E.g., a URL.
    
            :param address_type: One of OPENBIS, URL, or FILE_SYSTEM
    
            :return:
            """
            request = {
                "method": "createExternalDataManagementSystems",
                "params": [
                    self.token,
                    [
                        {
                            "code": code,
                            "label": label,
                            "addressType": address_type,
                            "address": address,
                            "@type": "as.dto.externaldms.create.ExternalDmsCreation",
                        }
                    ]
                ],
            }
            resp = self._post_request(self.as_v3, request)
            return self.get_external_data_management_system(resp[0]['permId'])
    
    
        def update_sample(self, sampleId, space=None, project=None, experiment=None,
    
                          parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
    
            params = {
                "sampleId": {
                    "permId": sampleId,
                    "@type": "as.dto.sample.id.SamplePermId"
                },
                "@type": "as.dto.sample.update.SampleUpdate"
            }
    
            if space is not None:
                params['spaceId'] = space
            if project is not None:
                params['projectId'] = project
    
                params["properties"] = properties
    
            if tagIds is not None:
                params["tagIds"] = tagIds
            if attachments is not None:
                params["attachments"] = attachments
    
            request = {
                "method": "updateSamples",
                "params": [
                    self.token,
    
        update_object = update_sample # Alias
    
    
    
        def delete_entity(self, entity, id, reason, id_name='PermId'):
    
            """Deletes Spaces, Projects, Experiments, Samples and DataSets
            """
    
    
            entity_type = "as.dto.{}.id.{}{}{}".format(
                entity.lower(), entity, 
                id_name[0].upper(), id_name[1:]
            )
    
                "method": "delete{}s".format(entity),
    
                        "@type": "as.dto.{}.delete.{}DeletionOptions".format(
                            entity.lower(), entity)
    
        def get_deletions(self):
            request = {
                "method": "searchDeletions",
                "params": [
                    self.token,
                    {},
                    {
                        "deletedObjects": {
                            "@type": "as.dto.deletion.fetchoptions.DeletedObjectFetchOptions"
                        }
                    }
                ]
            }
            resp = self._post_request(self.as_v3, request)
            objects = resp['objects']
            parse_jackson(objects)
    
    
            for value in objects:
                del_objs = extract_deletion(value)
                if len(del_objs) > 0:
                    new_objs.append(*del_objs)
    
            return DataFrame(new_objs)
    
        def new_project(self, space, code, description=None, **kwargs):
            return Project(self, None, space=space, code=code, description=description, **kwargs)
    
        def _gen_fetchoptions(self, options):
            fo = {}
            for option in options:
                fo[option] = fetch_option[option]
            return fo
    
    
        def get_project(self, projectId, only_data=False):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            options = ['space', 'registrator', 'modifier', 'attachments']
    
            if is_identifier(projectId) or is_permid(projectId):
    
                request = self._create_get_request(
                    'getProjects', 'project', projectId, options
                )
                resp = self._post_request(self.as_v3, request)
    
                if only_data:
                    return resp[projectId]
    
    
                return Project(self, resp[projectId])
    
            else:
                search_criteria = _gen_search_criteria({
                    'project': 'Project',
                    'operator': 'AND',
                    'code': projectId
                })
                fo = self._gen_fetchoptions(options)
                request = {
                    "method": "searchProjects",
                    "params": [self.token, search_criteria, fo]
                }
                resp = self._post_request(self.as_v3, request)
    
                if len(resp['objects']) == 0:
                    raise ValueError("No such project: %s" % projectId)
    
                if only_data:
                    return resp['objects'][0]
    
    
                return Project(self, resp['objects'][0])
    
        def get_projects(self, space=None, code=None):
    
            """ Get a list of all available projects (DataFrame object).
            """
    
            sub_criteria = []
            if space:
    
                sub_criteria.append(_subcriteria_for_code(space, 'space'))
    
            if code:
                sub_criteria.append(_criteria_for_code(code))
    
    
            criteria = {
                "criteria": sub_criteria,
                "@type": "as.dto.project.search.ProjectSearchCriteria",
                "operator": "AND"
            }
    
    
            fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
            for option in ['registrator', 'modifier', 'leader']:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                fetchopts[option] = fetch_option[option]
    
    
            request = {
                "method": "searchProjects",
    
            }
    
            resp = self._post_request(self.as_v3, request)
    
            objects = resp['objects']
            if len(objects) == 0:
                raise ValueError("No projects found!")