Skip to content
Snippets Groups Projects
pybis.py 85.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    """
    pybis.py
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    Work with openBIS from Python.
    
    import requests
    
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    
    import json
    import re
    
    from urllib.parse import urlparse, urljoin, quote
    
    import zlib
    
    from texttable import Texttable
    
    from tabulate import tabulate
    
    from .utils import parse_jackson, check_datatype, split_identifier, format_timestamp, is_identifier, is_permid, nvl, VERBOSE
    
    from .utils import extract_permid, extract_code,extract_deletion,extract_identifier,extract_nested_identifier,extract_nested_permid,extract_property_assignments,extract_role_assignments,extract_person, extract_person_details,extract_id,extract_userId
    
    from .property import PropertyHolder, PropertyAssignments
    from .masterdata import Vocabulary
    from .openbis_object import OpenBisObject 
    from .definitions import fetch_option
    
    # import the various openBIS entities
    
    from .things import Things
    
    from .space import Space
    from .project import Project
    from .experiment import Experiment
    from .sample import Sample
    from .dataset import DataSet
    from .person import Person
    from .group import Group
    from .role_assignment import RoleAssignment
    from .tag import Tag
    from .semantic_annotation import SemanticAnnotation
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from .plugin import Plugin
    
    from pandas import DataFrame, Series
    
    # Verbosity thresholds, ordered from silent (0) to most verbose (7).
    LOG_NONE    = 0
    LOG_SEVERE  = 1
    LOG_ERROR   = 2
    LOG_WARNING = 3
    LOG_INFO    = 4
    LOG_ENTRY   = 5
    LOG_PARM    = 6
    LOG_DEBUG   = 7
    
    # Module-wide verbosity; code in this file compares it against the
    # thresholds above (e.g. ``DEBUG_LEVEL >= LOG_DEBUG``) before printing.
    DEBUG_LEVEL = LOG_NONE
    
    
    
    def get_search_type_for_entity(entity, operator=None):
        """Return a dictionary containing the correct search criteria type
        for a given entity.

        :param entity: key naming the entity or attribute to search for,
            e.g. ``'space'``, ``'sample'`` or ``'userId'``.
        :param operator: optional operator (e.g. ``'AND'``); it is only added
            to the result when it is not None.
        :return: a new dict with the ``@type`` of the search criteria and,
            if given, the ``operator``.
        :raises KeyError: if ``entity`` is not one of the known keys.

        Example::

            get_search_type_for_entity('space')
            # returns:
            {'@type': 'as.dto.space.search.SpaceSearchCriteria'}
        """
        search_criteria = {
            "space": "as.dto.space.search.SpaceSearchCriteria",
            "userId": "as.dto.person.search.UserIdSearchCriteria",
            "email": "as.dto.person.search.EmailSearchCriteria",
            "firstName": "as.dto.person.search.FirstNameSearchCriteria",
            "lastName": "as.dto.person.search.LastNameSearchCriteria",
            "project": "as.dto.project.search.ProjectSearchCriteria",
            "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
            "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
            "sample": "as.dto.sample.search.SampleSearchCriteria",
            "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
            "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
            "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
            "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
            "material": "as.dto.material.search.MaterialSearchCriteria",
            "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
            "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
            "tag": "as.dto.tag.search.TagSearchCriteria",
            "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
            "roleAssignment": "as.dto.roleassignment.search.RoleAssignmentSearchCriteria",
            "person": "as.dto.person.search.PersonSearchCriteria",
            "code": "as.dto.common.search.CodeSearchCriteria",
            "global": "as.dto.global.GlobalSearchObject",
            "plugin": "as.dto.plugin.search.PluginSearchCriteria",
        }

        sc = {"@type": search_criteria[entity]}
        if operator is not None:
            sc["operator"] = operator

        return sc
    
    
    def get_attrs_for_entity(entity):
        """Generator over the names of all searchable attributes of ``entity``.

        Only ``"person"`` is known; any other entity raises a KeyError as
        soon as the generator is consumed.
        """
        searchable_attrs = {
            "person": ['firstName','lastName','email','userId']
        }
        yield from searchable_attrs[entity]
    
    
    def search_request_for_identifier(ident, entity):
    
            search_request = {
                "identifier": ident.upper(),
    
                "@type": "as.dto.{}.id.{}Identifier".format(entity.lower(), entity.capitalize())
    
                "@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity.capitalize())
    
    def get_search_criteria(entity, **search_args):
        """Assemble the search criteria for ``entity`` from keyword arguments.

        Every searchable attribute of the entity (see ``get_attrs_for_entity``)
        that is present in ``search_args`` becomes one sub-criterion; the
        sub-criteria are combined with the "AND" operator. Keyword arguments
        that are not searchable attributes are ignored.
        """
        result = get_search_type_for_entity(entity)

        def _criterion_for(name):
            # one sub-criterion: the attribute's search type plus its value
            crit = get_search_type_for_entity(name)
            crit['fieldValue'] = get_field_value_search(name, search_args[name])
            return crit

        result['criteria'] = [
            _criterion_for(name)
            for name in get_attrs_for_entity(entity)
            if name in search_args
        ]
        result['operator'] = "AND"
        return result
    
    
    def crc32(fileName):
        """Compute the CRC32 checksum of a file and return it as a hex string.

        The file is read in binary mode, line by line, and each piece is fed
        into a running ``zlib.crc32`` value.

        Since Python3 the zlib module returns unsigned integers (2.7: signed
        int); the result is masked to 32 bits so both give the same digits.

        :param fileName: path of the file to checksum.
        :return: the checksum as lowercase hex digits, without ``0x`` prefix.
        """
        prev = 0
        # the context manager closes the file handle the bare open() leaked
        with open(fileName, "rb") as fh:
            for eachLine in fh:
                prev = zlib.crc32(eachLine, prev)
        # return as hex
        # NOTE(review): the return statement was missing in the reviewed text;
        # the exact hex format is an assumption -- confirm against callers.
        return "%x" % (prev & 0xFFFFFFFF)
    
    def _tagIds_for_tags(tags=None, action='Add'):
        """Build the update structure that adds, removes or sets tags.

        ``tags`` is a single tag code or a list of tag codes; when it is None
        nothing is built and None is returned.
        ``action`` is either 'Add', 'Remove' or 'Set'; it is capitalized and
        appended to the ``ListUpdateAction`` type name.
        """
        if tags is None:
            return None

        tag_list = tags if isinstance(tags, list) else [tags]
        tag_codes = [
            {"code": tag_code, "@type": "as.dto.tag.id.TagCode"}
            for tag_code in tag_list
        ]
        update_action = {
            "items": tag_codes,
            "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
        }
        return {
            "actions": [update_action],
            "@type": "as.dto.common.update.IdListUpdateValue"
        }
    
    
    def _list_update(ids=None, entity=None, action='Add'):
        """Create an action item to add, set or remove ids.

        :param ids: a single code or a list of codes; when None, nothing is
            built and None is returned.
        :param entity: entity name used to build the id type
            ``as.dto.<entity lowercased>.id.<entity>Code``.
        :param action: name of the list action; it is capitalized and
            appended to ``ListUpdateAction``.
        :return: the ``IdListUpdateValue`` dictionary, or None if ``ids`` is
            None.
        """
        if ids is None:
            return None
        if not isinstance(ids, list):
            ids = [ids]

        # a distinct loop name: the original rebound ``ids`` while iterating it
        items = [
            {
                "code": code,
                "@type": "as.dto.{}.id.{}Code".format(entity.lower(), entity)
            }
            for code in ids
        ]

        list_update = {
            "actions": [
                {
                    "items": items,
                    "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
                }
            ],
            "@type": "as.dto.common.update.IdListUpdateValue"
        }
        # the structure was built but never handed back to the caller
        return list_update
    
    def get_field_value_search(field, value, comparison="StringEqualToValue"):
        """Wrap ``value`` into a field-value dictionary for a search.

        ``comparison`` is the type name appended to ``as.dto.common.search.``.
        The ``field`` argument is accepted but not used.
        """
        comparison_type = "as.dto.common.search.{}".format(comparison)
        field_value = {"value": value}
        field_value["@type"] = comparison_type
        return field_value
    
    def _common_search(search_type, value, comparison="StringEqualToValue"):
        sreq = {
    
                "@type": "as.dto.common.search.{}".format(comparison)
            }
        }
    
    def _criteria_for_code(code):
        """Return a ``CodeSearchCriteria`` that matches ``code`` exactly.

        :param code: the code to search for.
        :return: dictionary with the criteria type and a
            ``StringEqualToValue`` field value.
        """
        return {
            "fieldValue": {
                # The argument was never placed into the criterion. It is
                # upper-cased like the other code criteria in this module.
                # NOTE(review): confirm upper-casing is wanted here.
                "value": code.upper(),
                "@type": "as.dto.common.search.StringEqualToValue"
            },
            "@type": "as.dto.common.search.CodeSearchCriteria"
        }
    
    
    def _subcriteria_for_userId(userId):
        """Return person search criteria matching exactly the given ``userId``."""
        user_id_criterion = {
            "fieldName": "userId",
            "fieldType": "ATTRIBUTE",
            "fieldValue": {
                "value": userId,
                "@type": "as.dto.common.search.StringEqualToValue"
            },
            "@type": "as.dto.person.search.UserIdSearchCriteria"
        }
        person_criteria = {
            "criteria": [user_id_criterion],
            "@type": "as.dto.person.search.PersonSearchCriteria",
            "operator": "AND"
        }
        return person_criteria
    
    
    def _subcriteria_for_type(code, entity):
    
            "@type": "as.dto.{}.search.{}TypeSearchCriteria".format(entity.lower(), entity),
    
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldValue": {
                        "value": code.upper(),
                        "@type": "as.dto.common.search.StringEqualToValue"
                    }
    
    def _subcriteria_for_status(status_value):
        status_value = status_value.upper()
        valid_status = "AVAILABLE LOCKED ARCHIVED UNARCHIVE_PENDING ARCHIVE_PENDING BACKUP_PENDING".split()
        if not status_value in valid_status:
            raise ValueError("status must be one of the following: " + ", ".join(valid_status))
    
        return {
            "@type": "as.dto.dataset.search.PhysicalDataSearchCriteria",
            "operator": "AND",
            "criteria": [{
                "@type":
    
                    "as.dto.dataset.search.StatusSearchCriteria",
                "fieldName": "status",
    
                "fieldType": "ATTRIBUTE",
    
    def _gen_search_criteria(req):
    
        sreq = {}
        for key, val in req.items():
            if key == "criteria":
                items = []
                for item in req['criteria']:
    
                    items.append(_gen_search_criteria(item))
    
                sreq['criteria'] = items
            elif key == "code":
    
                sreq["criteria"] = [_common_search(
                    "as.dto.common.search.CodeSearchCriteria", val.upper()
                )]
            elif key == "identifier":
    
                if is_identifier(val):
                    # if we have an identifier, we need to search in Space and Code separately
                    si = split_identifier(val)
                    sreq["criteria"] = []
                    if "space" in si:
                        sreq["criteria"].append(
                            _gen_search_criteria({"space": "Space", "code": si["space"]})
                        )
                    if "experiment" in si:
                        pass
    
                    if "code" in si:
                        sreq["criteria"].append(
                            _common_search(
                                "as.dto.common.search.CodeSearchCriteria", si["code"].upper()
                            )
    
                elif is_permid(val):
                    sreq["criteria"] = [_common_search(
                        "as.dto.common.search.PermIdSearchCriteria", val
                    )]
                else:
                    # we assume we just got a code
                    sreq["criteria"] = [_common_search(
                        "as.dto.common.search.CodeSearchCriteria", val.upper()
                    )]
    
            else:
                sreq["@type"] = "as.dto.{}.search.{}SearchCriteria".format(key, val)
        return sreq
    
    
    def _subcriteria_for_tags(tags):
        """Return tag search criteria with one code criterion per tag.

        ``tags`` may be a single tag code or a list of tag codes; the
        individual criteria are combined with the "AND" operator.
        """
        tag_list = tags if isinstance(tags, list) else [tags]

        return {
            "@type": "as.dto.tag.search.TagSearchCriteria",
            "operator": "AND",
            "criteria": [
                {
                    "fieldName": "code",
                    "fieldType": "ATTRIBUTE",
                    "fieldValue": {
                        "value": tag_code,
                        "@type": "as.dto.common.search.StringEqualToValue"
                    },
                    "@type": "as.dto.common.search.CodeSearchCriteria"
                }
                for tag_code in tag_list
            ]
        }
    
    
    def _subcriteria_for_is_finished(is_finished):
        """Return a string-property criterion on the FINISHED_FLAG property.

        ``is_finished`` is used as-is as the value the property must equal.
        """
        expected_value = {
            "value": is_finished,
            "@type": "as.dto.common.search.StringEqualToValue"
        }
        criterion = {
            "@type": "as.dto.common.search.StringPropertySearchCriteria",
            "fieldName": "FINISHED_FLAG",
            "fieldType": "PROPERTY",
            "fieldValue": expected_value
        }
        return criterion
    
    
    def _subcriteria_for_properties(prop, val):
        """Return a string-property criterion: property ``prop`` equals ``val``.

        The property name is upper-cased; the value is passed through as-is.
        """
        property_name = prop.upper()
        expected_value = {
            "value": val,
            "@type": "as.dto.common.search.StringEqualToValue"
        }
        return {
            "@type": "as.dto.common.search.StringPropertySearchCriteria",
            "fieldName": property_name,
            "fieldType": "PROPERTY",
            "fieldValue": expected_value
        }
    
    
    
    def _subcriteria_for_permid(permids, entity, parents_or_children='', operator='AND'):
    
        if not isinstance(permids, list):
            permids = [permids]
    
        criterias = []
        for permid in permids:
    
                "@type": "as.dto.common.search.PermIdSearchCriteria",
                "fieldValue": {
                    "value": permid,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "fieldType": "ATTRIBUTE",
                "fieldName": "code"
    
            "@type": "as.dto.{}.search.{}{}SearchCriteria".format(
    
                entity.lower(), entity, parents_or_children
    
            "operator": operator
    
    def _subcriteria_for_code(code, object_type):
        """Create the often used search criteria for code values. Returns a dictionary.

        If ``code`` is a permId (according to ``is_permid``), the criteria
        match on the permId, otherwise on the code. In both cases the value is
        upper-cased. If ``code`` is None, only the bare search type for
        ``object_type`` is returned, without any criteria.

        :param code: code or permId to search for, or None.
        :param object_type: entity name; it is lower-cased and looked up with
            ``get_search_type_for_entity``.

        Example::
            _subcriteria_for_code("username", "space")

            {
                "criteria": [
                    {
                        "fieldType": "ATTRIBUTE",
                        "@type": "as.dto.common.search.CodeSearchCriteria",
                        "fieldName": "code",
                        "fieldValue": {
                            "@type": "as.dto.common.search.StringEqualToValue",
                            "value": "USERNAME"
                        }
                    }
                ],
                "operator": "AND",
                "@type": "as.dto.space.search.SpaceSearchCriteria"
            }
        """
        search_criteria = get_search_type_for_entity(object_type.lower())
        if code is None:
            # The fallback return used to sit, unreachable, behind the
            # return of the other branch, so this case produced None.
            return search_criteria

        if is_permid(code):
            fieldname = "permId"
            fieldtype = "as.dto.common.search.PermIdSearchCriteria"
        else:
            fieldname = "code"
            fieldtype = "as.dto.common.search.CodeSearchCriteria"

        search_criteria['criteria'] = [{
            "fieldName": fieldname,
            "fieldType": "ATTRIBUTE",
            "fieldValue": {
                "value": code.upper(),
                "@type": "as.dto.common.search.StringEqualToValue"
            },
            "@type": fieldtype
        }]
        search_criteria["operator"] = "AND"
        return search_criteria
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
        """Interface for communicating with openBIS. 
        A recent version of openBIS is required (minimum 16.05.2).
    
        For creation of datasets, dataset-uploader-api needs to be installed.
    
        def __init__(self, url=None, verify_certificates=True, token=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Initialize a new connection to an openBIS server.
    
            :param host:
    
    
            if url is None:
                try:
                    url = os.environ["OPENBIS_URL"]
                    token = os.environ["OPENBIS_TOKEN"] if "OPENBIS_TOKEN" in os.environ else None
                except KeyError:
                    raise ValueError("please provide a URL you want to connect to.")
    
    
            url_obj = urlparse(url)
    
                raise ValueError("please provide the url in this format: https://openbis.host.ch:8443")
    
            if url_obj.hostname is None:
                raise ValueError("hostname is missing")
    
            self.url = url_obj.geturl()
            self.port = url_obj.port
    
            self.hostname = url_obj.hostname
    
            self.as_v3 = '/openbis/openbis/rmi-application-server-v3.json'
            self.as_v1 = '/openbis/openbis/rmi-general-information-v1.json'
            self.reg_v1 = '/openbis/openbis/rmi-query-v1.json'
    
            self.dataset_types = None
            self.sample_types = None
    
            self.token_path = None
    
            # use an existing token, if available
            if self.token is None:
    
                self.token = self._get_cached_token()
    
            elif self.is_token_valid(token):
                pass
            else:
                print("Session is no longer valid. Please log in again.")
    
    
        def __dir__(self):
            return [
                'url', 'port', 'hostname',
                'login()', 'logout()', 'is_session_active()', 'token', 'is_token_valid("")',
                "get_dataset('permId')",
                "get_datasets()",
                "get_dataset_type('raw_data')",
                "get_dataset_types()",
                "get_datastores()",
                "get_deletions()",
                "get_experiment('permId', withAttachments=False)",
                "get_experiments()",
                "get_experiment_type('type')",
                "get_experiment_types()",
    
                "get_external_data_management_system(permId)",
    
                "get_material_type('type')",
                "get_material_types()",
                "get_project('project')",
                "get_projects(space=None, code=None)",
                "get_sample('id')",
    
                "get_object('id')", # "get_sample('id')" alias
    
                "get_objects()", # "get_samples()" alias
    
                "get_object_type(type))", # "get_sample_type(type))" alias
    
                "get_object_types()", # "get_sample_types()" alias
    
                "get_semantic_annotations()",
                "get_semantic_annotation(permId, only_data = False)",
    
                "get_tag(tagId)",
                "new_tag(code, description)",
    
                "new_person(userId, space)",
    
                "get_person(userId)",
                "get_groups()",
                "get_group(code)",
    
                "get_role_assignments()",
                "get_role_assignment(techId)",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "get_plugins()",
                "get_plugin(name)",
    
                "new_group(code, description, userIds)",
    
                'new_project(space, code, description, attachments)',
    
                'new_experiment(type, code, project, props={})',
    
                'new_sample(type, space, project, experiment, parents)',
                'new_object(type, space, project, experiment, parents)', # 'new_sample(type, space, project, experiment)' alias
    
                'new_dataset(type, parent, experiment, sample, files=[], folder, props={})',
    
                'new_semantic_annotation(entityType, propertyType)',
    
                'update_sample(sampleId, space, project, experiment, parents, children, components, properties, tagIds, attachments)',
                'update_object(sampleId, space, project, experiment, parents, children, components, properties, tagIds, attachments)', # 'update_sample(sampleId, space, project, experiment, parents, children, components, properties, tagIds, attachments)' alias
    
        @property
        def spaces(self):
            """Shortcut property: the result of ``get_spaces()`` without arguments."""
            all_spaces = self.get_spaces()
            return all_spaces
    
        @property
        def projects(self):
            """Shortcut property: the result of ``get_projects()`` without arguments."""
            all_projects = self.get_projects()
            return all_projects
    
    
        def _get_cached_token(self):
    
            """Read the token from the cache, and set the token ivar to it, if there, otherwise None.
            If the token is not valid anymore, delete it. 
            """
            token_path = self.gen_token_path()
    
        def gen_token_path(self, parent_folder=None):
            """Generate the path to the token file.

            The token is usually saved in a file called
            ~/.pybis/hostname.token

            :param parent_folder: directory that holds the token file; when
                None, the ``.pybis`` folder in the user's home is used.
            :return: ``<parent_folder>/<self.hostname>.token``
            """
            if parent_folder is None:
                # save token under ~/.pybis folder
                parent_folder = os.path.join(
                    os.path.expanduser("~"),
                    '.pybis'
                )
            # the path must be returned: save_token() uses the result
            return os.path.join(parent_folder, self.hostname + '.token')
    
        def save_token(self, token=None, parent_folder=None):
            """Save the session token to the disk, usually here: ~/.pybis/hostname.token.

            When a new Openbis instance is created, it tries to read this
            saved token by default.

            :param token: token to store; defaults to ``self.token``.
            :param parent_folder: directory to store the token file in; when
                None, ``gen_token_path()`` chooses the location.

            On success ``self.token_path`` is set to the path written.
            """
            if token is None:
                token = self.token

            # gen_token_path() treats None as "use the default folder"
            token_path = self.gen_token_path(parent_folder)

            # create the necessary directories, if they don't exist yet
            os.makedirs(os.path.dirname(token_path), exist_ok=True)
            with open(token_path, 'w') as f:
                f.write(token)
            self.token_path = token_path
    
        def delete_token(self, token_path=None):
            """Remove a stored session token file.

            Without an explicit ``token_path`` the file at ``self.token_path``
            is removed.
            """
            target = self.token_path if token_path is None else token_path
            os.remove(target)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _post_request(self, resource, request):
            """Internal method: post ``request`` to ``resource`` on this server.

            ``resource`` is joined onto ``self.url``; sending the request and
            serializing / deserializing the data is delegated to
            ``_post_request_full_url``.
            """
            full_url = urljoin(self.url, resource)
            return self._post_request_full_url(full_url, request)
    
    
        def _post_request_full_url(self, full_url, request):
    
            """ internal method, used to handle all post requests and serializing / deserializing
            data
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "id" not in request:
    
                request["id"] = "2"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "jsonrpc" not in request:
                request["jsonrpc"] = "2.0"
    
            if request["params"][0] is None:
                raise ValueError("Your session expired, please log in again")
    
    
            if DEBUG_LEVEL >=LOG_DEBUG: print(json.dumps(request))
    
            resp = requests.post(
    
                verify=self.verify_certificates
            )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                resp = resp.json()
                if 'error' in resp:
    
                    if DEBUG_LEVEL >= LOG_ERROR: print(json.dumps(request))
    
                    raise ValueError(resp['error']['message'])
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                elif 'result' in resp:
                    return resp['result']
    
                else:
                    raise ValueError('request did not return either result nor error')
            else:
                raise ValueError('general error while performing post request')
    
    
        def logout(self):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """ Log out of openBIS. After logout, the session token is no longer valid.
    
    
            logout_request = {
    
                "method": "logout",
                "params": [self.token],
    
            resp = self._post_request(self.as_v3, logout_request)
    
            return resp
    
        def login(self, username=None, password=None, save_token=False):
    
            Expects a username and a password and updates the token (session-ID).
            The token is then used for every request.
    
            Clients may want to store the credentials object in a credentials store after successful login.
    
            Throw a ValueError with the error message if login failed.
            """
    
            if password is None:
                import getpass
                password = getpass.getpass()
    
    
            login_request = {
    
                "method": "login",
                "params": [username, password],
    
            result = self._post_request(self.as_v3, login_request)
    
            if result is None:
                raise ValueError("login to openBIS failed")
            else:
                self.token = result
    
                if save_token:
                    self.save_token()
                return self.token
    
            # Request just 1 permId
            request = {
                "method": "createPermIdStrings",
                "params": [self.token, 1],
            }
            resp = self._post_request(self.as_v3, request)
            if resp is not None:
                return resp[0]
            else:
                raise ValueError("Could not create permId")
    
        def get_datastores(self):
    
            """ Get a list of all available datastores. Usually there is only one, but in some cases
    
            there might be multiple servers. If you upload a file, you need to specifiy the datastore you want
    
            the file uploaded to.
            """
    
    
            request = {
                "method": "listDataStores",
    
            }
            resp = self._post_request(self.as_v1, request)
            if resp is not None:
    
                return DataFrame(resp)[['code', 'downloadUrl', 'hostUrl']]
    
                raise ValueError("No datastore found!")
    
    
        def new_person(self, userId, space=None):
            """Create a new openBIS person object for ``userId``.

            The userId is looked up first via ``get_person``. If that lookup
            raises any exception, a new ``Person`` is returned; if it
            succeeds, a ValueError is raised because the user already exists.
            """
            try:
                self.get_person(userId=userId)
            except Exception:
                # NOTE(review): every failure of the lookup is treated as
                # "no such user" -- confirm which exception get_person raises.
                new_user = Person(self, userId=userId, space=space)
                return new_user
            else:
                message = "There already exists a user with userId={}".format(userId)
                raise ValueError(message)
    
        def new_group(self, code, description=None, userIds=None):
            """Create a new openBIS group object.

            ``code``, ``description`` and ``userIds`` are passed on unchanged
            to the ``Group`` constructor.
            """
            group = Group(self, code=code, description=description, userIds=userIds)
            return group
    
        def get_group(self, code, only_data=False):
            """Get an openBIS AuthorizationGroup. Returns a Group object.

            :param code: identifies the group; it is sent as the ``permId`` of
                an ``AuthorizationGroupPermId``.
            :param only_data: when True, the parsed response dictionary is
                returned instead of a ``Group`` object.
            :raises ValueError: if the server returns no group.
            """
            ids = [{
                "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
                "permId": code
            }]

            fetchopts = {
                option: fetch_option[option]
                for option in ['roleAssignments', 'users', 'registrator']
            }
            # Extend a copy of the user fetch options: writing 'space' into
            # fetch_option['users'] itself would alter the shared module-level
            # definition for every later request.
            # NOTE(review): assumes fetch_option['users'] is a plain dict.
            fetchopts['users'] = dict(fetchopts['users'])
            fetchopts['users']['space'] = fetch_option['space']

            request = {
                "method": "getAuthorizationGroups",
                "params": [
                    self.token,
                    ids,
                    fetchopts
                ]
            }
            resp = self._post_request(self.as_v3, request)
            if not resp:
                raise ValueError("No group found!")

            # the response is keyed by permId; only one id was requested
            group = next(iter(resp.values()))
            parse_jackson(group)

            if only_data:
                return group
            return Group(self, data=group)
    
    
        def get_role_assignments(self, **search_args):
    
            """ Get the assigned roles for a given group, person or space
            """
            search_criteria = get_search_type_for_entity('roleAssignment', 'AND')
    
            allowed_search_attrs = ['role', 'roleLevel', 'user', 'group', 'person', 'space']
    
    
            sub_crit = []
            for attr in search_args:
                if attr in allowed_search_attrs:
                    if attr == 'space':
                        sub_crit.append(
                            _subcriteria_for_code(search_args[attr], 'space')
                        )
    
                    elif attr in ['user','person']:
    
                        userId = ''
                        if isinstance(search_args[attr], str):
                            userId = search_args[attr]
                        else:
                            userId = search_args[attr].userId
    
                        sub_crit.append(
                            _subcriteria_for_userId(userId)    
                        )
                    elif attr == 'group':
    
                        groupId = ''
                        if isinstance(search_args[attr], str):
                            groupId = search_args[attr]
                        else:
                            groupId = search_args[attr].code
    
                        sub_crit.append(
    
                            _subcriteria_for_permid(groupId, 'AuthorizationGroup')
    
                        )
                    elif attr == 'role':
                        # TODO
                        raise ValueError("not yet implemented")
                    elif attr == 'roleLevel':
                        # TODO
                        raise ValueError("not yet implemented")
                    else:
                        pass
                else:
    
                    raise ValueError("unknown search argument {}".format(attr))
    
    
            search_criteria['criteria'] = sub_crit
    
            fetchopts = {}
    
            for option in ['roleAssignments', 'space', 'project', 'user', 'authorizationGroup','registrator']:
    
                fetchopts[option] = fetch_option[option]
    
            request = {
                "method": "searchRoleAssignments",
                "params": [
                    self.token,
                    search_criteria,
                    fetchopts
                ]
            }
    
    
            attrs=['techId', 'role', 'roleLevel', 'user', 'group', 'space', 'project']
    
            resp = self._post_request(self.as_v3, request)
            if len(resp['objects']) == 0:
    
            else: 
                objects = resp['objects']
                parse_jackson(objects)
                roles = DataFrame(objects)
                roles['techId'] = roles['id'].map(extract_id)
                roles['user'] = roles['user'].map(extract_userId)
                roles['group'] = roles['authorizationGroup'].map(extract_code)
                roles['space'] = roles['space'].map(extract_code)
                roles['project'] = roles['project'].map(extract_code)
    
        def get_role_assignment(self, techId, only_data=False):
            """Fetch a single assigned role, identified by its techId.

            With ``only_data=True`` the parsed response dictionary is
            returned, otherwise a ``RoleAssignment`` object. A ValueError is
            raised when the response is empty.
            """
            wanted_options = (
                'roleAssignments', 'space', 'project', 'user',
                'authorizationGroup', 'registrator',
            )
            fetchopts = {name: fetch_option[name] for name in wanted_options}

            request = {
                "method": "getRoleAssignments",
                "params": [
                    self.token,
                    [{
                        "techId": str(techId),
                        "@type": "as.dto.roleassignment.id.RoleAssignmentTechId"
                    }],
                    fetchopts
                ]
            }

            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No assigned role found for techId={}".format(techId))

            # only the first entry of the response is used
            first_key = next(iter(resp))
            data = resp[first_key]
            parse_jackson(data)

            return data if only_data else RoleAssignment(self, data=data)
    
    
        def assign_role(self, role, **args):
            """ general method to assign a role to either
                - a person
                - a group
            The scope is either
                - the whole instance
                - a space
                - a project

            Recognised keyword arguments are person, group, space and project.
            Each value is either a permId string or an object that exposes a
            permId attribute. A value of None is treated like a keyword that
            was not given, so the corresponding id is sent as null. Any other
            keyword argument is ignored.

            The method sends a createRoleAssignments request and returns None.
            """

            # keyword argument -> (field of the creation object, id "@type")
            id_fields = {
                'person': ("userId", "as.dto.person.id.PersonPermId"),
                'group': ("authorizationGroupId", "as.dto.authorizationgroup.id.AuthorizationGroupPermId"),
                'space': ("spaceId", "as.dto.space.id.SpacePermId"),
                'project': ("projectId", "as.dto.project.id.ProjectPermId"),
            }

            # All id fields start out as None; they are filled in below only
            # for the keywords that were actually supplied.
            creation = {
                "role": role,
                "userId": None,
                "authorizationGroupId": None,
                "spaceId": None,
                "projectId": None,
                "@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
            }

            for arg, (field, id_type) in id_fields.items():
                value = args.get(arg)
                if value is None:
                    # Missing or explicitly None: leave the id unset instead
                    # of failing on None.permId.
                    continue
                permId = value if isinstance(value, str) else value.permId
                creation[field] = {
                    "permId": permId,
                    "@type": id_type,
                }

            request = {
                "method": "createRoleAssignments",
                "params": [
                    self.token,
                    [creation],
                ],
            }
            self._post_request(self.as_v3, request)
            return
    
    
    
        def get_groups(self, **search_args):
            """ Get openBIS AuthorizationGroups. Returns a «Things» object.
    
            Usage::
                groups = e.get.groups()
                groups[0]             # select first group
                groups['GROUP_NAME']  # select group with this code
                for group in groups:
                    ...               # a Group object
                groups.df             # get a DataFrame object of the group list
                print(groups)         # print a nice ASCII table (eg. in IPython)
                groups                # HTML table (in a Jupyter notebook)
    
            """
    
            criteria = []
    
            for search_arg in ['code']:
                # unfortunately, there aren't many search possibilities yet...
    
                if search_arg in search_args:
    
                    if search_arg == 'code':
                        criteria.append(_criteria_for_code(search_args[search_arg]))
    
    
            search_criteria = get_search_type_for_entity('authorizationGroup')
            search_criteria['criteria'] = criteria
    
            search_criteria['operator'] = 'AND'
    
                    
            fetchopts = fetch_option['authorizationGroup']
    
            for option in ['roleAssignments', 'registrator', 'users']:
    
                fetchopts[option] = fetch_option[option]
            request = {
                "method": "searchAuthorizationGroups",
                "params": [
                    self.token,