Skip to content
Snippets Groups Projects
pybis.py 185 KiB
Newer Older
  • Learn to ignore specific revisions
  • Juan Fuentes's avatar
    Juan Fuentes committed
                try:
                    self.set_token(token)
    
                except ValueError:
                    raise ValueError(
                        "This token is no longer valid. Please provide an valid token or use the login method."
                    )
    
    Juan Fuentes's avatar
    Juan Fuentes committed
            else:
    
                # We try to set the saved token, during initialisation instead of errors, a message is printed
    
                try:
                    token = self._get_saved_token()
                    self.token = token
                except ValueError:
                    pass
    
        def _get_username(self):
            if self.token:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                match = re.search(r"(?P<username>.*)-.*", self.token)
                username = match.groupdict()["username"]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            return ""
    
        @property
        def token(self):
            return self.__dict__.get("token")
    
        @token.setter
        def token(self, token: str):
            self.set_token(token, save_token=True)
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "url",
                "port",
                "hostname",
                "token",
                "login()",
                "logout()",
                "is_session_active()",
                "is_token_valid()",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "download_prefix",
                "get_mountpoint()",
    
                "get_server_information()",
    
                "get_dataset_types()",
                "get_datastores()",
    
                "get_experiment_type()",
    
                "get_collection_type()",
    
                "get_external_data_management_systems()",
    
                "get_external_data_management_system()",
                "get_material_type()",
    
                "get_project()",
                "get_projects()",
                "get_sample()",
                "get_object()",
    
                "get_objects()",
                "get_sample_type()",
                "get_object_type()",
    
                "get_property_types()",
                "get_property_type()",
    
                "get_personal_access_tokens()",
    
                "get_semantic_annotation()",
                "get_space()",
    
                "get_term()",
                "get_vocabularies()",
                "get_vocabulary()",
    
                "get_role_assignment()",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "get_plugins()",
    
                "get_plugin()",
                "new_plugin()",
                "new_group()",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "new_space()",
                "new_project()",
                "new_experiment()",
                "new_collection()",
                "new_sample()",
                "new_object()",
                "new_sample_type()",
                "new_object_type()",
                "new_dataset()",
                "new_dataset_type()",
                "new_experiment_type()",
                "new_collection_type()",
                "new_material_type()",
                "new_semantic_annotation()",
                "new_transaction()",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "set_token()",
    
        def _repr_html_(self):
            html = """
                <table border="1" class="dataframe">
                <thead>
                    <tr style="text-align: right;">
                    <th>attribute</th>
                    <th>value</th>
                    </tr>
                </thead>
                <tbody>
            """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            attrs = [
                "url",
                "port",
                "hostname",
                "verify_certificates",
                "as_v3",
                "as_v1",
                "reg_v1",
                "token",
            ]
    
                html += f"<tr> <td>{attr}</td> <td>{getattr(self, attr, '')}</td> </tr>"
    
        @property
        def spaces(self):
            return self.get_spaces()
    
        @property
        def projects(self):
            return self.get_projects()
    
    
        def gen_token_path(self, os_home=None):
    
            """generates a path to the token file.
            The token is usually saved in a file called
            ~/.pybis/hostname.token
            """
    
            if self.hostname is None:
    
                raise ValueError(
                    "hostname needs to be set before retrieving the token path."
                )
    
            if os_home is None:
                home = os.path.expanduser("~")
    
                home = os_home
            parent_folder = os.path.join(home, ".pybis")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            path = os.path.join(parent_folder, self.hostname + ".token")
    
        def save_token_on_behalf(self, os_home):
    
            """Set the correct user, only the owner of the token should be able to access it,
            used by jupyterhub authenticator
            """
    
            token_path = self._save_token_to_disk(os_home)
    
            lastIndexOfMinus = len(self.token) - "".join(reversed(self.token)).index("-") - 1
    
            token_user_name = self.token[0:lastIndexOfMinus]
    
            if token_user_name.startswith("$pat-"):
    
                token_user_name = token_user_name[5:]
    
            from pwd import getpwnam
    
            token_user_name_uid = getpwnam(token_user_name).pw_uid
    
            token_user_name_gid = getpwnam(token_user_name).pw_gid
    
            os.chown(token_path, token_user_name_uid, token_user_name_gid)
    
    
            path = Path(token_path)
            token_parent_path = path.parent.absolute()
    
    vkovtun's avatar
    vkovtun committed
            os.chown(token_parent_path, token_user_name_uid, token_user_name_gid)
    
        def _save_token_to_disk(self, os_home=None):
    
            """saves the session token to the disk, usually here: ~/.pybis/hostname.token. When a new Openbis instance is created, it tries to read this saved token by default."""
            token_path = self.gen_token_path(os_home)
    
            # create the necessary directories, if they don't exist yet
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            with open(token_path, "w") as f:
    
                f.write(self.token)
    
            # prevent other users to be able to read the token
            os.chmod(token_path, 0o600)
    
            return token_path
    
        def _delete_saved_token(self, os_home=None):
            token_path = self.gen_token_path(os_home)
            if os.path.exists(token_path):
                os.unlink(token_path)
    
    
            """Read the token from the .pybis, on the default user location"""
    
            token_path = self.gen_token_path()
    
            if not os.path.exists(token_path):
                return None
            try:
                with open(token_path) as f:
                    token = f.read()
                    if token == "":
                        return None
                    else:
                        return token
            except FileNotFoundError:
                return None
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _post_request(self, resource, request):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """internal method, used to handle all post requests and serializing / deserializing
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            return self._post_request_full_url(urljoin(self.url, resource), request)
    
        def _recover_session(self, full_url, request):
            """Current token seems to be expired,
            try to use other means to connect.
            """
    
            if is_session_token(self.token):
                for session_token in get_saved_tokens(hostname=self.hostname):
                    pass
    
            else:
                for token in get_saved_pats(hostname=self.hostname):
                    if self.is_token_valid(token=token):
                        return requests.post(
                            full_url, json.dumps(request), verify=self.verify_certificates
                        )
    
        def _post_request_full_url(self, full_url, request):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """internal method, used to handle all post requests and serializing / deserializing
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "id" not in request:
    
                request["id"] = "2"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "jsonrpc" not in request:
                request["jsonrpc"] = "2.0"
    
            if request["params"][0] is None:
                raise ValueError("Your session expired, please log in again")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if DEBUG_LEVEL >= LOG_DEBUG:
                print(json.dumps(request))
    
            try:
                resp = requests.post(
                    full_url, json.dumps(request), verify=self.verify_certificates
                )
            except requests.exceptions.SSLError as exc:
                raise requests.exceptions.SSLError(
                    "Certificate validation failed. Use o=Openbis(url, verify_certificates=False) if you are using self-signed certificates."
                ) from exc
            except requests.ConnectionError as exc:
                raise requests.ConnectionError(
                    "Could not connecto to the openBIS server. Please check your internet connection, the specified hostname and port."
                ) from exc
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                resp = resp.json()
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if "error" in resp:
    
                    # print(full_url)
                    print(json.dumps(request))
                    raise ValueError(resp["error"]["message"])
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                elif "result" in resp:
                    return resp["result"]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    raise ValueError("request did not return either result nor error")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError("general error while performing post request")
    
        def logout(self):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Log out of openBIS. After logout, the session token is no longer valid."""
    
    
            logout_request = {
    
                "method": "logout",
                "params": [self.token],
    
            resp = self._post_request(self.as_v3, logout_request)
    
            return resp
    
        def login(self, username=None, password=None, save_token=False):
    
            Expects a username and a password and updates the token (session-ID).
            The token is then used for every request.
    
            Clients may want to store the credentials object in a credentials store after successful login.
    
            Throw a ValueError with the error message if login failed.
            """
    
            if password is None:
                import getpass
    
                password = getpass.getpass()
    
    
            login_request = {
    
                "method": "login",
                "params": [username, password],
    
            self.token = self._post_request(self.as_v3, login_request)
            if self.token is None:
    
                raise ValueError("login to openBIS failed")
    
            if save_token:
                self._save_token_to_disk()
    
                self.username = username
    
            return self.token
    
        def _password(self, password=None, pstore={}):
            """An elegant way to store passwords which are used later
            without giving the user an easy possibility to retrieve it.
            """
            import inspect
    
            allowed_methods = ["mount"]
    
            if password is not None:
                pstore["password"] = password
            else:
                if inspect.stack()[1][3] in allowed_methods:
                    return pstore.get("password")
                else:
                    raise Exception(
    
                        f"This method can only be called from these internal methods: {allowed_methods}"
    
        def unmount(self, mountpoint=None):
            """Unmount a given mountpoint or unmount the stored mountpoint.
            If the umount command does not work, try the pkill command.
            If still not successful, throw an error message.
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if mountpoint is None and not getattr(self, "mountpoint", None):
    
                raise ValueError("please provide a mountpoint to unmount")
    
            if mountpoint is None:
                mountpoint = self.mountpoint
    
            full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
    
            if not os.path.exists(full_mountpoint_path):
                return
    
            # mountpoint is not a mountpoint path
            if not os.path.ismount(full_mountpoint_path):
                return
    
    
            status = subprocess.call(f"umount {full_mountpoint_path}", shell=True)
    
            if status == 1:
                status = subprocess.call(
    
                    f'pkill -9 sshfs && umount "{full_mountpoint_path}"', shell=True
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise OSError(
    
                    f"could not unmount mountpoint: {full_mountpoint_path} Please try to unmount manually"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if VERBOSE:
    
                    print(f"Successfully unmounted {full_mountpoint_path}")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                self.mountpoint = None
    
        def is_mounted(self, mountpoint=None):
    
            if mountpoint is None:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                mountpoint = getattr(self, "mountpoint", None)
    
    
            if mountpoint is None:
                return False
    
            return os.path.ismount(mountpoint)
    
    
        def get_mountpoint(self, search_mountpoint=False):
    
            """Returns the path to the active mountpoint.
            Returns None if no mountpoint is found or if the mountpoint is not mounted anymore.
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_mountpoint=True:  Tries to figure out an existing mountpoint for a given hostname
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                                     (experimental, does not work under Windows yet)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            mountpoint = getattr(self, "mountpoint", None)
    
            if mountpoint:
                if self.is_mounted(mountpoint):
                    return mountpoint
                else:
                    return None
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if not search_mountpoint:
                    return None
    
    
            # try to find out the mountpoint
            import subprocess
    
            p1 = subprocess.Popen(["mount", "-d"], stdout=subprocess.PIPE)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            p2 = subprocess.Popen(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                ["grep", "--fixed-strings", self.hostname],
                stdin=p1.stdout,
                stdout=subprocess.PIPE,
            )
    
            p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
            output = p2.communicate()[0]
            output = output.decode()
            # output will either be '' (=not mounted) or a string like this:
            # {username}@{hostname}:{path} on {mountpoint} (osxfuse, nodev, nosuid, synchronous, mounted by vermeul)
            try:
                mountpoint = output.split()[2]
                self.mountpoint = mountpoint
                return mountpoint
            except Exception:
                return None
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def mount(
    
                self,
                username=None,
                password=None,
                hostname=None,
                mountpoint=None,
                volname=None,
                path="/",
                port=2222,
                kex_algorithms="+diffie-hellman-group1-sha1",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        ):
    
            """Mounts openBIS dataStore without being root, using sshfs and fuse. Both
            SSHFS and FUSE must be installed on the system (see below)
    
            username -- default: the currently used username
            password -- default: the currently used password
    
            hostname -- default: the current hostname
            mountpoint -- default: ~/hostname
    
    
            FUSE / SSHFS Installation (requires root privileges):
    
    
            Mac OS X
            ========
            Follow the installation instructions on
            https://osxfuse.github.io
    
            Unix Cent OS 7
            ==============
            $ sudo yum install epel-release
            $ sudo yum --enablerepo=epel -y install fuse-sshfs
            $ user="$(whoami)"
            $ usermod -a -G fuse "$user"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if VERBOSE:
    
                    print(f"openBIS dataStore is already mounted on {self.mountpoint}")
    
    
            def check_sshfs_is_installed():
                import errno
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    subprocess.call("sshfs --help", shell=True)
    
                except OSError as e:
                    if e.errno == errno.ENOENT:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        raise ValueError(
                            'Your system seems not to have SSHFS installed. For Mac OS X, see installation instructions on https://osxfuse.github.io For Unix: $ sudo yum install epel-release && sudo yum --enablerepo=epel -y install fuse-sshfs && user="$(whoami)" && usermod -a -G fuse "$user"'
                        )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if username is None:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if not username:
                raise ValueError("no token available - please provide a username")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if password is None:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if not password:
                raise ValueError("please provide a password")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if hostname is None:
                hostname = self.hostname
            if not hostname:
                raise ValueError("please provide a hostname")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if mountpoint is None:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                mountpoint = os.path.join("~", self.hostname)
    
            # check if mountpoint exists, otherwise create it
            full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
            if not os.path.exists(full_mountpoint_path):
                os.makedirs(full_mountpoint_path)
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            print("full_mountpoint_path: ", full_mountpoint_path)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
            supported_platforms = ["darwin", "linux"]
    
            if platform not in supported_platforms:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError(
    
                    f"This method is not yet supported on {platform} plattform"
    
                "darwin": f"-oauto_cache,reconnect,defer_permissions,noappledouble,negative_vncache,volname={hostname} -oStrictHostKeyChecking=no ",
    
                "linux": "-oauto_cache,reconnect -oStrictHostKeyChecking=no",
    
            import subprocess
    
            args = {
                "username": username,
                "password": password,
    
                "port": port,
                "path": path,
                "mountpoint": mountpoint,
                "volname": volname,
    
                "os_options": os_options[platform],
                "kex_algorithms": kex_algorithms,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            cmd = (
                'echo "{password}" | sshfs'
                " {username}@{hostname}:{path} {mountpoint}"
                ' -o port={port} -o ssh_command="ssh -oKexAlgorithms={kex_algorithms}" -o password_stdin'
                " {os_options}".format(**args)
            )
    
    
            status = subprocess.call(cmd, shell=True)
    
            if status == 0:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if VERBOSE:
    
                    print(f"Mounted successfully to {full_mountpoint_path}")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                self.mountpoint = full_mountpoint_path
                return self.mountpoint
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise OSError("mount failed, exit status: ", status)
    
        def get_server_information(self):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Returns a dict containing the following server information:
            api-version, archiving-configured, authentication-service, enabled-technologies, project-samples-enabled
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """
            if self.server_information is not None:
                return self.server_information
    
    
            request = {
                "method": "getServerInformation",
                "params": [self.token],
            }
            resp = self._post_request(self.as_v3, request)
            if resp is not None:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                self.server_information = ServerInformation(resp)
                return self.server_information
    
            else:
                raise ValueError("Could not get the server information")
    
    
            # Request just 1 permId
            request = {
                "method": "createPermIdStrings",
                "params": [self.token, 1],
            }
            resp = self._post_request(self.as_v3, request)
            if resp is not None:
                return resp[0]
            else:
                raise ValueError("Could not create permId")
    
        def get_datastores(self):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Get a list of all available datastores. Usually there is only one, but in some cases
    
            there might be multiple servers. If you upload a file, you need to specifiy the datastore you want
    
            the file uploaded to.
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if hasattr(self, "datastores"):
    
                return self.datastores
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "method": "searchDataStores",
                "params": [
                    self.token,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    {"@type": "as.dto.datastore.search.DataStoreSearchCriteria"},
                    {"@type": "as.dto.datastore.fetchoptions.DataStoreFetchOptions"},
                ],
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            resp = self._post_request(self.as_v3, request)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            attrs = ["code", "downloadUrl", "remoteUrl"]
            if len(resp["objects"]) == 0:
    
                raise ValueError("No datastore found!")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            else:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                objects = resp["objects"]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                parse_jackson(objects)
                datastores = DataFrame(objects)
    
                self.datastores = datastores[attrs]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                return datastores[attrs]
    
        def gen_codes(self, entity: str, prefix: str = "", count: int = 1) -> List[str]:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "DATASET": "DATA_SET",
                "OBJECT": "SAMPLE",
                "SAMPLE": "SAMPLE",
                "EXPERIMENT": "EXPERIMENT",
                "COLLECTION": "EXPERIMENT",
                "MATERIAL": "MATERIAL",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    "no such entity: {}. Allowed entities are: DATA_SET, SAMPLE, EXPERIMENT, MATERIAL"
                )
    
                "method": "createCodes",
                "params": [self.token, prefix, entity2enum[entity], count],
    
                return self._post_request(self.as_v3, request)
    
                raise ValueError(f"Could not generate a code(s) for {entity}: {e}")
    
        def gen_code(self, entity, prefix="") -> str:
            """Get the next sequence number for a Sample, Experiment, DataSet and Material. Other entities are currently not supported.
            Usage::
                gen_code('sample', 'SAM-')
                gen_code('collection', 'COL-')
                gen_code('dataset', '')
            """
            return self.gen_codes(entity=entity, prefix=prefix)[0]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def gen_permId(self, count=1):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Generate a permId (or many permIds) for a dataSet"""
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            request = {"method": "createPermIdStrings", "params": [self.token, count]}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            try:
                return self._post_request(self.as_v3, request)
    
            except Exception as exc:
    
                raise ValueError(f"Could not generate a code: {exc}")
    
    
        def new_person(self, userId, space=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """creates an openBIS person or returns the existing person"""
    
            try:
                person = self.get_person(userId=userId)
    
                return person
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                return Person(self, userId=userId, space=space)
    
        def new_group(self, code, description=None, userIds=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """creates an openBIS group or returns an existing one."""
    
            try:
                group = self.get_group(code=code)
                group.description = description
                return group
            except Exception:
                return Group(self, code=code, description=description, userIds=userIds)
    
        def get_group(self, code, only_data=False):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Get an openBIS AuthorizationGroup. Returns a Group object."""
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            ids = [
                {
                    "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
                    "permId": code,
                }
            ]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts = {
                "@type": "as.dto.authorizationgroup.fetchoptions.AuthorizationGroupFetchOptions"
            }
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for option in ["roleAssignments", "users", "registrator"]:
    
                fetchopts[option] = get_fetchoption_for_entity(option)
    
            fetchopts["users"]["space"] = get_fetchoption_for_entity("space")
    
            request = {
                "method": "getAuthorizationGroups",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "params": [self.token, ids, fetchopts],
    
            }
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
                raise ValueError("No group found!")
    
            for permid in resp:
                group = resp[permid]
                parse_jackson(group)
    
                if only_data:
                    return group
                else:
                    return Group(self, data=group)
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_role_assignments(self, start_with=None, count=None, **search_args):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Get the assigned roles for a given group, person or space"""
            entity = "roleAssignment"
            search_criteria = get_type_for_entity(entity, "search")
            allowed_search_attrs = ["role", "roleLevel", "user", "group", "person", "space"]
    
    
            sub_crit = []
            for attr in search_args:
                if attr in allowed_search_attrs:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    if attr == "space":
                        sub_crit.append(_subcriteria_for_code(search_args[attr], "space"))
                    elif attr in ["user", "person"]:
                        userId = ""
    
                        if isinstance(search_args[attr], str):
                            userId = search_args[attr]
                        else:
                            userId = search_args[attr].userId
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        sub_crit.append(_subcriteria_for_userId(userId))
                    elif attr == "group":
                        groupId = ""
    
                        if isinstance(search_args[attr], str):
                            groupId = search_args[attr]
                        else:
                            groupId = search_args[attr].code
    
                        sub_crit.append(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            _subcriteria_for_permid(groupId, "authorizationGroup")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif attr == "role":
    
                        # TODO
                        raise ValueError("not yet implemented")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif attr == "roleLevel":
    
                        # TODO
                        raise ValueError("not yet implemented")
                    else:
                        pass
                else:
    
                    raise ValueError(f"unknown search argument {attr}")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_criteria["criteria"] = sub_crit
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            method_name = get_method_for_entity(entity, "search")
    
            fetchopts = get_fetchoption_for_entity(entity)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts["from"] = start_with
            fetchopts["count"] = count
            for option in ["space", "project", "user", "authorizationGroup", "registrator"]:
    
                fetchopts[option] = get_fetchoption_for_entity(option)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "params": [self.token, search_criteria, fetchopts],
    
            }
    
            resp = self._post_request(self.as_v3, request)
    
    
            def create_data_frame(attrs, props, response):
                attrs = ["techId", "role", "roleLevel", "user", "group", "space", "project"]
                if len(response["objects"]) == 0:
                    roles = DataFrame(columns=attrs)
                else:
                    objects = response["objects"]
                    parse_jackson(objects)
                    roles = DataFrame(objects)
                    roles["techId"] = roles["id"].map(extract_id)
                    roles["user"] = roles["user"].map(extract_userId)
                    roles["group"] = roles["authorizationGroup"].map(extract_code)
                    roles["space"] = roles["space"].map(extract_code)
                    roles["project"] = roles["project"].map(extract_code)
                return roles[attrs]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                openbis_obj=self,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                entity="role_assignment",
                identifier_name="techId",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                start_with=start_with,
                count=count,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                totalCount=resp.get("totalCount"),
    
                df_initializer=create_data_frame,
    
        def get_role_assignment(self, techId, only_data=False):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Fetches one assigned role by its techId."""
    
            fetchopts = get_fetchoption_for_entity("roleAssignment")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for option in ["space", "project", "user", "authorizationGroup", "registrator"]:
    
                fetchopts[option] = get_fetchoption_for_entity(option)
    
    
            request = {
                "method": "getRoleAssignments",
                "params": [
                    self.token,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    [
                        {
                            "techId": str(techId),
                            "@type": "as.dto.roleassignment.id.RoleAssignmentTechId",
                        }
                    ],
                    fetchopts,
                ],
    
            }
    
            resp = self._post_request(self.as_v3, request)
            if len(resp) == 0:
    
                raise ValueError(f"No assigned role found for techId={techId}")
    
            for permid in resp:
                data = resp[permid]
    
                parse_jackson(data)
    
                if only_data:
                    return data
                else:
                    return RoleAssignment(self, data=data)
    
        def assign_role(self, role, **args):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """general method to assign a role to either
    
                - a person
                - a group
            The scope is either
                - the whole instance
                - a space
                - a project
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            defs = get_definition_for_entity("roleAssignment")
            if role not in defs["role"]:
    
                raise ValueError(f"Role should be one of these: {defs['role']}")
    
            userId = None
            groupId = None
            spaceId = None
            projectId = None
    
            for arg in args:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if arg in ["person", "group", "space", "project"]:
                    permId = args[arg] if isinstance(args[arg], str) else args[arg].permId
                    if arg == "person":
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            "@type": "as.dto.person.id.PersonPermId",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif arg == "group":
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif arg == "space":
                        spaceId = {"permId": permId, "@type": "as.dto.space.id.SpacePermId"}
                    elif arg == "project":
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            "@type": "as.dto.project.id.ProjectPermId",
    
                        }
    
            request = {
                "method": "createRoleAssignments",
                "params": [
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    [
                        {
                            "role": role,
                            "userId": userId,
                            "authorizationGroupId": groupId,
                            "spaceId": spaceId,
                            "projectId": projectId,
                            "@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
                        }
                    ],
                ],
    
            self._post_request(self.as_v3, request)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def get_groups(self, start_with=None, count=None, **search_args):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Get openBIS AuthorizationGroups. Returns a «Things» object.
    
                groups = e.get.groups()
                groups[0]             # select first group
                groups['GROUP_NAME']  # select group with this code
                for group in groups:
                    ...               # a Group object
                groups.df             # get a DataFrame object of the group list
                print(groups)         # print a nice ASCII table (eg. in IPython)
                groups                # HTML table (in a Jupyter notebook)
    
            """
    
            criteria = []
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            for search_arg in ["code"]:
    
                # unfortunately, there aren't many search possibilities yet...
    
                if search_arg in search_args:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    if search_arg == "code":
                        criteria.append(_criteria_for_code(search_args[search_arg]))
    
            search_criteria = get_search_type_for_entity("authorizationGroup")
            search_criteria["criteria"] = criteria
            search_criteria["operator"] = "AND"
    
    
            fetchopts = get_fetchoption_for_entity("authorizationGroup")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            fetchopts["from"] = start_with
            fetchopts["count"] = count
            for option in ["roleAssignments", "registrator", "users"]:
    
                fetchopts[option] = get_fetchoption_for_entity(option)
    
            request = {
                "method": "searchAuthorizationGroups",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "params": [self.token, search_criteria, fetchopts],
    
            }
            resp = self._post_request(self.as_v3, request)
    
            def create_data_frame(attrs, props, response):
                attrs = [
                    "permId",
                    "code",
                    "description",
                    "users",
                    "registrator",
                    "registrationDate",
                    "modificationDate",
                ]
                if len(response["objects"]) == 0:
                    groups = DataFrame(columns=attrs)
                else:
                    objects = response["objects"]
                    parse_jackson(objects)
                    groups = DataFrame(objects)
    
                    groups["permId"] = groups["permId"].map(extract_permid)
                    groups["registrator"] = groups["registrator"].map(extract_person)
                    groups["users"] = groups["users"].map(extract_userId)
                    groups["registrationDate"] = groups["registrationDate"].map(
                        format_timestamp
                    )
                    groups["modificationDate"] = groups["modificationDate"].map(
                        format_timestamp
                    )
                return groups[attrs]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                openbis_obj=self,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                entity="group",
                identifier_name="permId",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                start_with=start_with,
                count=count,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                totalCount=resp.get("totalCount"),
    
                df_initializer=create_data_frame,
    
                self,
                sessionName: str,
                validFrom: datetime = datetime.now(),
                validTo: datetime = None,
                force=False,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Creates a new personal access token (PAT).  If a PAT with the given sessionName
    
            already exists and its expiry date (validToDate) is not within the warning period,
            the existing PAT is returned instead.
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
            Args:
    
                sessionName (str):    a session name (mandatory)
    
                validFrom (datetime): begin of the validity period (default: now)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                validTo (datetime):   end of the validity period (default: validFrom + maximum validity period, as configured in openBIS)
                force (bool):         if set to True, a new PAT is created, regardless of existing ones.
    
            server_info = self.get_server_information()
    
            session_token = self.token
            if not is_session_token(session_token):
                session_token = self.session_token
            if not session_token:
                session_token = get_token_for_hostname(
                    self.hostname, session_token_needed=True
                )
    
            if not self.is_token_valid(session_token):
    
                raise ValueError(
                    "You you need a session token to create a new personal access token."
                )
    
    
            for existing_pat in self.get_personal_access_tokens(sessionName=sessionName):
                # check if we already reached the warning period
                validTo_date = datetime.strptime(
                    existing_pat.validToDate, "%Y-%m-%d %H:%M:%S"
                )
                if validTo_date > (