Newer
Older
Swen Vermeul
committed
except ValueError:
raise ValueError(
"This token is no longer valid. Please provide an valid token or use the login method."
)
Swen Vermeul
committed
# We try to set the saved token, during initialisation instead of errors, a message is printed
try:
token = self._get_saved_token()
self.token = token
except ValueError:
pass
def _get_username(self):
    """Extract the username from the current token.

    Tokens have the form ``<username>-<suffix>``; the username may itself
    contain dashes, so everything up to the last dash is returned.

    Returns None when no token is set or the token contains no dash
    (the original code raised AttributeError on a non-matching token).
    """
    if not self.token:
        return None
    match = re.search(r"(?P<username>.*)-.*", self.token)
    if match is None:
        # token does not follow the username-suffix convention
        return None
    return match.groupdict()["username"]
@property
def token(self):
    """The session token / personal access token currently in use, or None."""
    # Read straight from __dict__: the value is stored there by the setter,
    # and a plain attribute read would recurse into this property.
    try:
        return self.__dict__["token"]
    except KeyError:
        return None
@token.setter
def token(self, token: str):
    """Assign a new token; delegates to set_token() with save_token=True."""
    self.set_token(token, save_token=True)
def __dir__(self):
    """List the public attributes and methods shown by dir() / tab completion.

    Method names carry a trailing "()" so interactive users can tell
    callables from plain attributes. (Two stray scrape-artifact lines that
    were syntax errors inside this list literal have been removed.)
    """
    return [
        "url",
        "port",
        "hostname",
        "token",
        "login()",
        "logout()",
        "is_session_active()",
        "is_token_valid()",
        "mount()",
        "unmount()",
        "use_cache",
        "clear_cache()",
        "get_server_information()",
        "get_datasets()",
        "get_dataset_type()",
        "get_dataset_types()",
        "get_datastores()",
        "get_deletions()",
        "get_experiments()",
        "get_experiment_type()",
        "get_experiment_types()",
        "get_collections()",
        "get_collection_type()",
        "get_collection_types()",
        "get_external_data_management_systems()",
        "get_external_data_management_system()",
        "get_material_type()",
        "get_material_types()",
        "get_project()",
        "get_projects()",
        "get_sample()",
        "get_object()",
        "get_samples()",
        "get_objects()",
        "get_sample_type()",
        "get_object_type()",
        "get_sample_types()",
        "get_object_types()",
        "get_property_types()",
        "get_property_type()",
        "get_personal_access_tokens()",
        "new_property_type()",
        "get_semantic_annotations()",
        "get_semantic_annotation()",
        "get_space()",
        "get_spaces()",
        "get_tags()",
        "get_tag()",
        "new_tag()",
        "get_terms()",
        "get_term()",
        "get_vocabularies()",
        "get_vocabulary()",
        "get_role_assignments()",
        "get_role_assignment()",
        "get_plugin()",
        "new_plugin()",
        "new_group()",
        "new_space()",
        "new_project()",
        "new_experiment()",
        "new_collection()",
        "new_sample()",
        "new_object()",
        "new_sample_type()",
        "new_object_type()",
        "new_dataset()",
        "new_dataset_type()",
        "new_experiment_type()",
        "new_collection_type()",
        "new_material_type()",
        "new_semantic_annotation()",
        "new_transaction()",
        "get_or_create_personal_access_token()",
    ]
def _repr_html_(self):
    """Render the connection attributes as an HTML table (Jupyter display hook)."""
    shown_attrs = [
        "url",
        "port",
        "hostname",
        "verify_certificates",
        "as_v3",
        "as_v1",
        "reg_v1",
        "token",
    ]
    # one <tr> per attribute; missing attributes render as empty cells
    rows = "".join(
        f"<tr> <td>{name}</td> <td>{getattr(self, name, '')}</td> </tr>"
        for name in shown_attrs
    )
    header = """
        <table border="1" class="dataframe">
        <thead>
            <tr style="text-align: right;">
            <th>attribute</th>
            <th>value</th>
            </tr>
        </thead>
        <tbody>
        """
    footer = """
        </tbody>
        </table>
        """
    return header + rows + footer
@property
def spaces(self):
    """All spaces of this openBIS instance (shortcut for get_spaces())."""
    return self.get_spaces()
@property
def projects(self):
    """All projects of this openBIS instance (shortcut for get_projects())."""
    return self.get_projects()
def gen_token_path(self, os_home=None):
    """Generate the path to the token file.

    The token is usually saved in a file called
    ~/.pybis/hostname.token

    Args:
        os_home: overrides the user's home directory (used when saving a
            token on behalf of another user).

    Raises:
        ValueError: when no hostname is set yet.

    Note: the scraped original raised unconditionally and then overwrote
    ``home``; the missing hostname guard and ``else`` branch are restored.
    """
    if self.hostname is None:
        raise ValueError(
            "hostname needs to be set before retrieving the token path."
        )
    if os_home is None:
        home = os.path.expanduser("~")
    else:
        home = os_home
    parent_folder = os.path.join(home, ".pybis")
    return os.path.join(parent_folder, self.hostname + ".token")
def save_token_on_behalf(self, os_home):
    """Set the correct user, only the owner of the token should be able to access it,
    used by jupyterhub authenticator.

    The token has the form ``<username>-<suffix>``; the username may itself
    contain dashes, so everything up to the *last* dash is the user name.
    """
    token_path = self._save_token_to_disk(os_home)
    # strip the suffix after the last "-" to recover the user name
    # (str.rindex raises ValueError when there is no dash, same as before)
    token_user_name = self.token[: self.token.rindex("-")]
    token_user_name_uid = getpwnam(token_user_name).pw_uid
    token_user_name_gid = getpwnam(token_user_name).pw_gid
    # hand the token file and its parent folder over to that user
    os.chown(token_path, token_user_name_uid, token_user_name_gid)
    token_parent_path = Path(token_path).parent.absolute()
    os.chown(token_parent_path, token_user_name_uid, token_user_name_gid)
def _save_token_to_disk(self, os_home=None):
"""saves the session token to the disk, usually here: ~/.pybis/hostname.token. When a new Openbis instance is created, it tries to read this saved token by default."""
token_path = self.gen_token_path(os_home)
Swen Vermeul
committed
# create the necessary directories, if they don't exist yet
Chandrasekhar Ramakrishnan
committed
os.makedirs(os.path.dirname(token_path), exist_ok=True)
# prevent other users to be able to read the token
os.chmod(token_path, 0o600)
Swen Vermeul
committed
def _delete_saved_token(self, os_home=None):
token_path = self.gen_token_path(os_home)
if os.path.exists(token_path):
os.unlink(token_path)
def _get_saved_token(self):
"""Read the token from the .pybis, on the default user location"""
if not os.path.exists(token_path):
return None
try:
with open(token_path) as f:
token = f.read()
if token == "":
return None
else:
return token
except FileNotFoundError:
return None
"""internal method, used to handle all post requests and serializing / deserializing
data
"""
return self._post_request_full_url(urljoin(self.url, resource), request)
def _recover_session(self, full_url, request):
    """Current token seems to be expired,
    try to use other means to connect.
    """
    if is_session_token(self.token):
        # NOTE(review): this branch only iterates and does nothing -- looks
        # like unfinished recovery logic for saved session tokens; confirm
        # against upstream before relying on it.
        for session_token in get_saved_tokens(hostname=self.hostname):
            pass
    else:
        # try each saved personal access token until one still validates,
        # then replay the original request with it
        for token in get_saved_pats(hostname=self.hostname):
            if self.is_token_valid(token=token):
                return requests.post(
                    full_url, json.dumps(request), verify=self.verify_certificates
                )
    # implicitly returns None when no recovery path succeeded
def _post_request_full_url(self, full_url, request):
    """internal method, used to handle all post requests and serializing /
    deserializing data

    Sends ``request`` as a JSON-RPC 2.0 POST to ``full_url`` and returns the
    ``result`` part of the response.

    Raises:
        ValueError: expired session, server-side error, or malformed response.
        requests.exceptions.SSLError / requests.ConnectionError: transport errors.

    Note: the scraped original lost the response handling (resp.ok /
    resp.json() / the "error" check) -- reconstructed here.
    """
    if "jsonrpc" not in request:
        request["jsonrpc"] = "2.0"
    # params[0] is the token for all openBIS API calls
    if request["params"][0] is None:
        raise ValueError("Your session expired, please log in again")
    if DEBUG_LEVEL >= LOG_DEBUG:
        print(json.dumps(request))
    try:
        resp = requests.post(
            full_url, json.dumps(request), verify=self.verify_certificates
        )
    except requests.exceptions.SSLError as exc:
        raise requests.exceptions.SSLError(
            "Certificate validation failed. Use o=Openbis(url, verify_certificates=False) if you are using self-signed certificates."
        ) from exc
    except requests.ConnectionError as exc:
        raise requests.ConnectionError(
            "Could not connecto to the openBIS server. Please check your internet connection, the specified hostname and port."
        ) from exc
    if resp.ok:
        resp = resp.json()
        if "error" in resp:
            # print(full_url)
            print(json.dumps(request))
            raise ValueError(resp["error"]["message"])
        elif "result" in resp:
            return resp["result"]
        else:
            raise ValueError("request did not return either result nor error")
    else:
        raise ValueError("general error while performing post request")
"""Log out of openBIS. After logout, the session token is no longer valid."""
if self.token is None:
return
"method": "logout",
"params": [self.token],
resp = self._post_request(self.as_v3, logout_request)
Swen Vermeul
committed
self.token = None
def login(self, username=None, password=None, save_token=False):
    """Log into openBIS.
    Expects a username and a password and updates the token (session-ID).
    The token is then used for every request.

    Clients may want to store the credentials object in a credentials store
    after successful login. Throws a ValueError with the error message if
    login failed.

    Args:
        username: openBIS user id.
        password: prompted interactively via getpass when omitted.
        save_token: also persist the token to ~/.pybis/<hostname>.token.

    Note: the scraped original lost the getpass prompt and the request-dict
    construction -- reconstructed here.
    """
    if password is None:
        import getpass

        password = getpass.getpass()
    login_request = {
        "method": "login",
        "params": [username, password],
    }
    self.token = self._post_request(self.as_v3, login_request)
    if self.token is None:
        raise ValueError("login to openBIS failed")
    if save_token:
        self._save_token_to_disk()
    # remember the password internally (retrievable only by mount())
    self._password(password)
    self.username = username
def _password(self, password=None, pstore={}):
    """An elegant way to store passwords which are used later
    without giving the user an easy possibility to retrieve it.

    Call with a password to store it; call without arguments to read it
    back (only permitted from whitelisted internal methods).
    """
    # NOTE: the mutable default `pstore` is intentional -- it acts as a
    # hidden, function-scoped store shared across all calls.
    import inspect

    # only these methods may read the stored password back
    allowed_methods = ["mount"]
    if password is not None:
        # store mode
        pstore["password"] = password
    else:
        # retrieve mode: verify the immediate caller's function name on the
        # stack before handing the password out
        if inspect.stack()[1][3] in allowed_methods:
            return pstore.get("password")
        else:
            raise Exception(
                f"This method can only be called from these internal methods: {allowed_methods}"
            )
def unmount(self, mountpoint=None):
    """Unmount a given mountpoint or unmount the stored mountpoint.
    If the umount command does not work, try the pkill command.
    If still not successful, throw an error message.

    Note: the scraped original lost the closing of this docstring and the
    raise around the final failure message -- reconstructed here.
    """
    if mountpoint is None and not getattr(self, "mountpoint", None):
        raise ValueError("please provide a mountpoint to unmount")
    if mountpoint is None:
        mountpoint = self.mountpoint
    full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
    # nothing to unmount when the path does not exist
    if not os.path.exists(full_mountpoint_path):
        return
    # mountpoint is not a mountpoint path
    if not os.path.ismount(full_mountpoint_path):
        return
    status = subprocess.call(f"umount {full_mountpoint_path}", shell=True)
    if status == 1:
        # plain umount failed -- kill sshfs and retry
        status = subprocess.call(
            f'pkill -9 sshfs && umount "{full_mountpoint_path}"', shell=True
        )
    if status == 1:
        raise OSError(
            f"could not unmount mountpoint: {full_mountpoint_path} Please try to unmount manually"
        )
    print(f"Successfully unmounted {full_mountpoint_path}")
def is_mounted(self, mountpoint=None):
    """Return True when the given (or stored) mountpoint is currently mounted.

    Note: the scraped original had a duplicated ``if mountpoint is None:``;
    the fallback to the stored self.mountpoint has been restored.
    """
    if mountpoint is None:
        mountpoint = getattr(self, "mountpoint", None)
    if mountpoint is None:
        return False
    return os.path.ismount(mountpoint)
def get_mountpoint(self, search_mountpoint=False):
    """Returns the path to the active mountpoint.
    Returns None if no mountpoint is found or if the mountpoint is not
    mounted anymore.

    search_mountpoint=True: Tries to figure out an existing mountpoint for a
    given hostname (experimental, does not work under Windows yet)

    Note: the scraped original lost the initial getattr, the
    search_mountpoint gate and the second Popen header -- reconstructed.
    """
    mountpoint = getattr(self, "mountpoint", None)
    if mountpoint:
        if self.is_mounted(mountpoint):
            return mountpoint
        else:
            return None
    if not search_mountpoint:
        return None
    # try to find out the mountpoint by grepping the mount table
    import subprocess

    p1 = subprocess.Popen(["mount", "-d"], stdout=subprocess.PIPE)
    p2 = subprocess.Popen(
        ["grep", "--fixed-strings", self.hostname],
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )
    p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
    output = p2.communicate()[0]
    output = output.decode()
    # output will either be '' (=not mounted) or a string like this:
    # {username}@{hostname}:{path} on {mountpoint} (osxfuse, nodev, nosuid, synchronous, mounted by vermeul)
    try:
        mountpoint = output.split()[2]
        self.mountpoint = mountpoint
        return mountpoint
    except Exception:
        return None
def mount(
    self,
    username=None,
    password=None,
    hostname=None,
    mountpoint=None,
    volname=None,
    path="/",
    port=2222,
    kex_algorithms="+diffie-hellman-group1-sha1",
):
    """Mounts openBIS dataStore without being root, using sshfs and fuse. Both
    SSHFS and FUSE must be installed on the system (see below)

    Params:
    username -- default: the currently used username
    password -- default: the currently used password
    hostname -- default: the current hostname
    mountpoint -- default: ~/hostname

    FUSE / SSHFS Installation (requires root privileges):

    Mac OS X
    ========
    Follow the installation instructions on
    https://osxfuse.github.io

    Unix Cent OS 7
    ==============
    $ sudo yum install epel-release
    $ sudo yum --enablerepo=epel -y install fuse-sshfs
    $ user="$(whoami)"
    $ usermod -a -G fuse "$user"

    Note: the def header, the argument-default guards and several raise
    wrappers were lost in the scrape and have been reconstructed.
    """
    if self.is_mounted():
        print(f"openBIS dataStore is already mounted on {self.mountpoint}")
        return

    def check_sshfs_is_installed():
        # probe for the sshfs binary; ENOENT means it is not installed
        import errno
        import subprocess

        try:
            subprocess.call(
                ["sshfs", "--version"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise ValueError(
                    'Your system seems not to have SSHFS installed. For Mac OS X, see installation instructions on https://osxfuse.github.io For Unix: $ sudo yum install epel-release && sudo yum --enablerepo=epel -y install fuse-sshfs && user="$(whoami)" && usermod -a -G fuse "$user"'
                )

    check_sshfs_is_installed()

    if username is None:
        username = self._get_username()
    if not username:
        raise ValueError("no token available - please provide a username")

    if password is None:
        password = self._password()
    if not password:
        raise ValueError("please provide a password")

    if hostname is None:
        hostname = self.hostname
    if not hostname:
        raise ValueError("please provide a hostname")

    if mountpoint is None:
        mountpoint = os.path.join("~", hostname)

    # check if mountpoint exists, otherwise create it
    full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
    if not os.path.exists(full_mountpoint_path):
        os.makedirs(full_mountpoint_path)
    print("full_mountpoint_path: ", full_mountpoint_path)

    from sys import platform

    supported_platforms = ["darwin", "linux"]
    if platform not in supported_platforms:
        raise ValueError(f"This method is not yet supported on {platform} plattform")

    os_options = {
        "darwin": f"-oauto_cache,reconnect,defer_permissions,noappledouble,negative_vncache,volname={hostname} -oStrictHostKeyChecking=no ",
        "linux": "-oauto_cache,reconnect -oStrictHostKeyChecking=no",
    }

    if volname is None:
        volname = hostname

    args = {
        "username": username,
        "password": password,
        "hostname": hostname,
        "port": port,
        "path": path,
        "mountpoint": mountpoint,
        "volname": volname,
        "os_options": os_options[platform],
        "kex_algorithms": kex_algorithms,
    }
    # password is fed via stdin so it never appears in the process list
    cmd = (
        'echo "{password}" | sshfs'
        " {username}@{hostname}:{path} {mountpoint}"
        ' -o port={port} -o ssh_command="ssh -oKexAlgorithms={kex_algorithms}" -o password_stdin'
        " {os_options}".format(**args)
    )
    status = subprocess.call(cmd, shell=True)
    if status == 0:
        print(f"Mounted successfully to {full_mountpoint_path}")
        self.mountpoint = full_mountpoint_path
        return self.mountpoint
    raise OSError("mount failed, exit status: ", status)
def get_server_information(self):
    """Returns a dict containing the following server information:
    api-version, archiving-configured, authentication-service,
    enabled-technologies, project-samples-enabled
    """
    # serve the cached value when we already fetched it once
    if self.server_information is not None:
        return self.server_information
    request = {
        "method": "getServerInformation",
        "params": [self.token],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is None:
        raise ValueError("Could not get the server information")
    self.server_information = ServerInformation(resp)
    return self.server_information
def create_permId(self):
    """Have the server generate a new permId.

    Returns a single permId string.

    Raises:
        ValueError: when the server returned no result.
    """
    # Request just 1 permId
    request = {
        "method": "createPermIdStrings",
        "params": [self.token, 1],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is not None:
        return resp[0]
    else:
        raise ValueError("Could not create permId")
Chandrasekhar Ramakrishnan
committed
def get_datastores(self):
    """Get a list of all available datastores. Usually there is only one, but
    in some cases there might be multiple servers. If you upload a file, you
    need to specify the datastore the data should be uploaded to.

    Returns a DataFrame with code / downloadUrl / remoteUrl columns.

    Note: the request wrapper, the post call and the return were lost in the
    scrape -- reconstructed here.
    """
    request = {
        "method": "searchDataStores",
        "params": [
            self.token,
            {"@type": "as.dto.datastore.search.DataStoreSearchCriteria"},
            {"@type": "as.dto.datastore.fetchoptions.DataStoreFetchOptions"},
        ],
    }
    resp = self._post_request(self.as_v3, request)
    attrs = ["code", "downloadUrl", "remoteUrl"]
    if len(resp["objects"]) == 0:
        raise ValueError("No datastore found!")
    objects = resp["objects"]
    parse_jackson(objects)
    datastores = DataFrame(objects)
    # cache and return only the interesting columns
    self.datastores = datastores[attrs]
    return self.datastores
def gen_codes(self, entity: str, prefix: str = "", count: int = 1) -> List[str]:
    """Generate a list of codes for a given entity type.

    Args:
        entity: one of DATASET/OBJECT/SAMPLE/EXPERIMENT/COLLECTION/MATERIAL
            (case-insensitive).
        prefix: optional string the generated codes start with.
        count: how many codes to generate.

    Raises:
        ValueError: unknown entity or server-side failure.
    """
    entity = entity.upper()
    # map the pybis entity names onto the openBIS enum values
    entity2enum = {
        "DATASET": "DATA_SET",
        "OBJECT": "SAMPLE",
        "SAMPLE": "SAMPLE",
        "EXPERIMENT": "EXPERIMENT",
        "COLLECTION": "EXPERIMENT",
        "MATERIAL": "MATERIAL",
    }
    if entity not in entity2enum:
        # the scraped original lost the raise wrapper and never filled the
        # placeholder; both restored here
        raise ValueError(
            f"no such entity: {entity}. Allowed entities are: DATA_SET, SAMPLE, EXPERIMENT, MATERIAL"
        )
    request = {
        "method": "createCodes",
        "params": [self.token, prefix, entity2enum[entity], count],
    }
    try:
        return self._post_request(self.as_v3, request)
    except Exception as e:
        raise ValueError(f"Could not generate a code(s) for {entity}: {e}")
def gen_code(self, entity, prefix="") -> str:
    """Get the next sequence number for a Sample, Experiment, DataSet and
    Material. Other entities are currently not supported.

    Usage::
        gen_code('sample', 'SAM-')
        gen_code('collection', 'COL-')
        gen_code('dataset', '')
    """
    # delegate to gen_codes() and unwrap the single generated code
    return self.gen_codes(entity=entity, prefix=prefix)[0]
def gen_permId(self, count=1):
    """Generate a permId (or many permIds) for a dataSet.

    Returns the list of generated permId strings.

    Note: the def header and the except clause were lost in the scrape and
    have been reconstructed.
    """
    request = {"method": "createPermIdStrings", "params": [self.token, count]}
    try:
        return self._post_request(self.as_v3, request)
    except Exception as exc:
        raise ValueError(f"Could not generate a code: {exc}")
def new_person(self, userId, space=None):
    """creates an openBIS person or returns the existing person

    Note: the except branch was lost in the scrape -- reconstructed to
    mirror new_group(): fall back to creating a new Person object when the
    lookup fails.
    """
    try:
        person = self.get_person(userId=userId)
        return person
    except Exception:
        return Person(self, userId=userId, space=space)
def new_group(self, code, description=None, userIds=None):
    """creates an openBIS group or returns an existing one.

    When the group already exists, its description is updated in-memory
    (not saved) and the existing group is returned.
    """
    try:
        group = self.get_group(code=code)
        group.description = description
        return group
    except Exception:
        # group not found -- create a fresh (unsaved) Group object
        return Group(self, code=code, description=description, userIds=userIds)
def get_group(self, code, only_data=False):
    """Get an openBIS AuthorizationGroup. Returns a Group object.

    Args:
        code: the authorization-group code (permId).
        only_data: return the raw response dict instead of a Group object.

    Raises:
        ValueError: when no group with that code exists.

    Note: the request dict lost its "params" entry in the scrape --
    restored as [token, ids, fetchopts].
    """
    ids = [
        {
            "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
            "permId": code,
        }
    ]
    fetchopts = {
        "@type": "as.dto.authorizationgroup.fetchoptions.AuthorizationGroupFetchOptions"
    }
    for option in ["roleAssignments", "users", "registrator"]:
        fetchopts[option] = get_fetchoption_for_entity(option)
    # also resolve each user's home space
    fetchopts["users"]["space"] = get_fetchoption_for_entity("space")
    request = {
        "method": "getAuthorizationGroups",
        "params": [self.token, ids, fetchopts],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No group found!")
    for permid in resp:
        group = resp[permid]
        parse_jackson(group)
        if only_data:
            return group
        else:
            return Group(self, data=group)
def get_role_assignments(self, start_with=None, count=None, **search_args):
    """Get the assigned roles for a given group, person or space.

    Supported search arguments: role, roleLevel, user, person, group, space
    (role and roleLevel are not implemented yet and raise ValueError).

    Returns a Things collection of role assignments.

    Note: several structural lines (the group append, the role/roleLevel
    branch headers, the request dict and the Things(...) return) were lost
    in the scrape and have been reconstructed.
    """
    entity = "roleAssignment"
    search_criteria = get_type_for_entity(entity, "search")
    allowed_search_attrs = ["role", "roleLevel", "user", "group", "person", "space"]
    sub_crit = []
    for attr in search_args:
        if attr in allowed_search_attrs:
            if attr == "space":
                sub_crit.append(_subcriteria_for_code(search_args[attr], "space"))
            elif attr in ["user", "person"]:
                # accept either a userId string or a Person-like object
                userId = ""
                if isinstance(search_args[attr], str):
                    userId = search_args[attr]
                else:
                    userId = search_args[attr].userId
                sub_crit.append(_subcriteria_for_userId(userId))
            elif attr == "group":
                # accept either a code string or a Group-like object
                groupId = ""
                if isinstance(search_args[attr], str):
                    groupId = search_args[attr]
                else:
                    groupId = search_args[attr].code
                sub_crit.append(
                    _subcriteria_for_permid(groupId, "authorizationGroup")
                )
            elif attr == "role":
                # TODO
                raise ValueError("not yet implemented")
            elif attr == "roleLevel":
                # TODO
                raise ValueError("not yet implemented")
            else:
                pass
        else:
            raise ValueError(f"unknown search argument {attr}")
    search_criteria["criteria"] = sub_crit

    method_name = get_method_for_entity(entity, "search")
    fetchopts = get_fetchoption_for_entity(entity)
    fetchopts["from"] = start_with
    fetchopts["count"] = count
    for option in ["space", "project", "user", "authorizationGroup", "registrator"]:
        fetchopts[option] = get_fetchoption_for_entity(option)
    request = {
        "method": method_name,
        "params": [self.token, search_criteria, fetchopts],
    }
    resp = self._post_request(self.as_v3, request)

    def create_data_frame(attrs, props, response):
        # flatten the nested jackson objects into a tabular DataFrame
        attrs = ["techId", "role", "roleLevel", "user", "group", "space", "project"]
        if len(response["objects"]) == 0:
            roles = DataFrame(columns=attrs)
        else:
            objects = response["objects"]
            parse_jackson(objects)
            roles = DataFrame(objects)
            roles["techId"] = roles["id"].map(extract_id)
            roles["user"] = roles["user"].map(extract_userId)
            roles["group"] = roles["authorizationGroup"].map(extract_code)
            roles["space"] = roles["space"].map(extract_code)
            roles["project"] = roles["project"].map(extract_code)
        return roles[attrs]

    return Things(
        openbis_obj=self,
        entity="role_assignment",
        identifier_name="techId",
        start_with=start_with,
        count=count,
        response=resp,
        df_initializer=create_data_frame,
    )
def get_role_assignment(self, techId, only_data=False):
    """Fetch a single role assignment by its technical id.

    Returns the raw response dict when only_data=True, otherwise a
    RoleAssignment object. Raises ValueError when the id is unknown.
    """
    fetchopts = get_fetchoption_for_entity("roleAssignment")
    for opt in ("space", "project", "user", "authorizationGroup", "registrator"):
        fetchopts[opt] = get_fetchoption_for_entity(opt)
    assignment_id = {
        "techId": str(techId),
        "@type": "as.dto.roleassignment.id.RoleAssignmentTechId",
    }
    request = {
        "method": "getRoleAssignments",
        "params": [self.token, [assignment_id], fetchopts],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError(f"No assigned role found for techId={techId}")
    # the response maps permId -> assignment; only the first entry matters
    first_key = next(iter(resp))
    data = resp[first_key]
    parse_jackson(data)
    return data if only_data else RoleAssignment(self, data=data)
def assign_role(self, role, **args):
    """Assign a role to either
    - a person
    - a group

    The scope is either
    - the whole instance
    - a space
    - a project

    Note: the docstring opening, the "group" branch header, two @type
    entries and self.token in the params were lost in the scrape --
    reconstructed here.
    """
    role = role.upper()
    defs = get_definition_for_entity("roleAssignment")
    if role not in defs["role"]:
        raise ValueError(f"Role should be one of these: {defs['role']}")
    userId = None
    groupId = None
    spaceId = None
    projectId = None
    for arg in args:
        if arg in ["person", "group", "space", "project"]:
            # accept either a permId string or an object carrying .permId
            permId = args[arg] if isinstance(args[arg], str) else args[arg].permId
            if arg == "person":
                userId = {
                    "permId": permId,
                    "@type": "as.dto.person.id.PersonPermId",
                }
            elif arg == "group":
                groupId = {
                    "permId": permId,
                    "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
                }
            elif arg == "space":
                spaceId = {"permId": permId, "@type": "as.dto.space.id.SpacePermId"}
            elif arg == "project":
                projectId = {
                    "permId": permId,
                    "@type": "as.dto.project.id.ProjectPermId",
                }
    request = {
        "method": "createRoleAssignments",
        "params": [
            self.token,
            [
                {
                    "role": role,
                    "userId": userId,
                    "authorizationGroupId": groupId,
                    "spaceId": spaceId,
                    "projectId": projectId,
                    "@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
                }
            ],
        ],
    }
    self._post_request(self.as_v3, request)
    return
def get_groups(self, start_with=None, count=None, **search_args):
    """Get openBIS AuthorizationGroups. Returns a «Things» object.

    Usage:
        groups = e.get.groups()
        groups[0]               # select first group
        groups['GROUP_NAME']    # select group with this code
        for group in groups:
            ...                 # a Group object
        groups.df               # get a DataFrame object of the group list
        print(groups)           # print a nice ASCII table (eg. in IPython)
        groups                  # HTML table (in a Jupyter notebook)

    Note: the search-arg loop header and the Things(...) return were lost
    in the scrape (along with stray scraped line numbers) -- reconstructed.
    """
    criteria = []
    # unfortunately, there aren't many search possibilities yet...
    for search_arg in ["code"]:
        if search_arg in search_args:
            if search_arg == "code":
                criteria.append(_criteria_for_code(search_args[search_arg]))
    search_criteria = get_search_type_for_entity("authorizationGroup")
    search_criteria["criteria"] = criteria
    search_criteria["operator"] = "AND"
    fetchopts = get_fetchoption_for_entity("authorizationGroup")
    fetchopts["from"] = start_with
    fetchopts["count"] = count
    for option in ["roleAssignments", "registrator", "users"]:
        fetchopts[option] = get_fetchoption_for_entity(option)
    request = {
        "method": "searchAuthorizationGroups",
        "params": [self.token, search_criteria, fetchopts],
    }
    resp = self._post_request(self.as_v3, request)

    def create_data_frame(attrs, props, response):
        # flatten the nested jackson objects into a tabular DataFrame
        attrs = [
            "permId",
            "code",
            "description",
            "users",
            "registrator",
            "registrationDate",
            "modificationDate",
        ]
        if len(response["objects"]) == 0:
            groups = DataFrame(columns=attrs)
        else:
            objects = response["objects"]
            parse_jackson(objects)
            groups = DataFrame(objects)
            groups["permId"] = groups["permId"].map(extract_permid)
            groups["registrator"] = groups["registrator"].map(extract_person)
            groups["users"] = groups["users"].map(extract_userId)
            groups["registrationDate"] = groups["registrationDate"].map(
                format_timestamp
            )
            groups["modificationDate"] = groups["modificationDate"].map(
                format_timestamp
            )
        return groups[attrs]

    return Things(
        openbis_obj=self,
        entity="group",
        identifier_name="permId",
        start_with=start_with,
        count=count,
        response=resp,
        df_initializer=create_data_frame,
    )
def get_or_create_personal_access_token(
Adam Laskowski
committed
self,
sessionName: str,
validFrom: datetime = datetime.now(),
validTo: datetime = None,
force=False,
"""Creates a new personal access token (PAT). If a PAT with the given sessionName
Swen Vermeul
committed
already exists and its expiry date (validToDate) is not within the warning period,
the existing PAT is returned instead.
Args:
sessionName (str): a session name (mandatory)
Swen Vermeul
committed
validFrom (datetime): begin of the validity period (default: now)
validTo (datetime): end of the validity period (default: validFrom + maximum validity period, as configured in openBIS)
force (bool): if set to True, a new PAT is created, regardless of existing ones.
server_info = self.get_server_information()
session_token = self.token
if not is_session_token(session_token):
session_token = self.session_token
if not session_token:
session_token = get_token_for_hostname(
self.hostname, session_token_needed=True
)
if not self.is_token_valid(session_token):
raise ValueError(
"You you need a session token to create a new personal access token."
)
for existing_pat in self.get_personal_access_tokens(sessionName=sessionName):
# check if we already reached the warning period
validTo_date = datetime.strptime(
existing_pat.validToDate, "%Y-%m-%d %H:%M:%S"
)
if validTo_date > (
Adam Laskowski
committed
datetime.now()