"get_tags()",
"get_tag()",
"new_tag()",
"get_terms()",
"get_term()",
"get_vocabularies()",
"get_vocabulary()",
"get_role_assignments()",
"get_role_assignment()",
"get_plugin()",
"new_plugin()",
"new_group()",
"new_space()",
"new_project()",
"new_experiment()",
"new_collection()",
"new_sample()",
"new_object()",
"new_sample_type()",
"new_object_type()",
"new_dataset()",
"new_dataset_type()",
"new_experiment_type()",
"new_collection_type()",
"new_material_type()",
"new_semantic_annotation()",
"new_transaction()",
"set_token()",
]
def _repr_html_(self):
html = """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
"""
attrs = [
"url",
"port",
"hostname",
"verify_certificates",
"as_v3",
"as_v1",
"reg_v1",
"token",
]
for attr in attrs:
html += "<tr> <td>{}</td> <td>{}</td> </tr>".format(
)
html += """
</tbody>
</table>
"""
return html
@property
def spaces(self):
return self.get_spaces()
@property
def projects(self):
return self.get_projects()
def gen_token_path(self, parent_folder=None):
"""generates a path to the token file.
The token is usually saved in a file called
~/.pybis/hostname.token
"""
if parent_folder is None:
# save token under ~/.pybis folder
parent_folder = os.path.join(os.path.expanduser("~"), ".pybis")
path = os.path.join(parent_folder, self.hostname + ".token")
return path
def _save_token(self, token=None, parent_folder=None):
"""saves the session token to the disk, usually here: ~/.pybis/hostname.token. When a new Openbis instance is created, it tries to read this saved token by default."""
if token is None:
token = self.token
if parent_folder is None:
token_path = self.gen_token_path()
else:
token_path = self.gen_token_path(parent_folder)
# create the necessary directories, if they don't exist yet
os.makedirs(os.path.dirname(token_path), exist_ok=True)
with open(token_path, "w") as f:
f.write(token)
self.token_path = token_path
# prevent other users from reading the token
os.chmod(token_path, 0o600)
def _get_saved_token(self):
"""Read the token from the .pybis
If the token is not valid anymore, delete it.
token_path = self.token_path or self.gen_token_path()
if not os.path.exists(token_path):
return None
try:
with open(token_path) as f:
token = f.read()
if token == "":
return None
else:
return token
except FileNotFoundError:
return None
def _delete_saved_token(self):
if self.token_path:
try:
os.remove(self.token_path)
except FileNotFoundError:
return None
"""internal method, used to handle all post requests and serializing / deserializing
data
"""
return self._post_request_full_url(urljoin(self.url, resource), request)
def _post_request_full_url(self, full_url, request):
"""internal method, used to handle all post requests and serializing / deserializing
if "jsonrpc" not in request:
request["jsonrpc"] = "2.0"
if request["params"][0] is None:
raise ValueError("Your session expired, please log in again")
if DEBUG_LEVEL >= LOG_DEBUG:
print(json.dumps(request))
resp = requests.post(
full_url, json.dumps(request), verify=self.verify_certificates
)
if resp.ok:
resp = resp.json()
if "error" in resp:
raise ValueError(resp["error"]["message"])
elif "result" in resp:
return resp["result"]
else:
raise ValueError("request did not return either result nor error")
else:
raise ValueError("general error while performing post request")
"""Log out of openBIS. After logout, the session token is no longer valid."""
if self.token is None:
return
"method": "logout",
"params": [self.token],
resp = self._post_request(self.as_v3, logout_request)
self.token = None
self.token_path = None
def login(self, username=None, password=None, save_token=False):
"""Log into openBIS.
Expects a username and a password and updates the token (session-ID).
The token is then used for every request.
Clients may want to store the credentials object in a credentials store after successful login.
Throw a ValueError with the error message if login failed.
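
Usage (a minimal sketch; the URL and the credentials are placeholders)::

o = Openbis('https://openbis.example.org')
token = o.login('some_username', 'some_password', save_token=True)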
"""
if password is None:
import getpass
password = getpass.getpass()
login_request = {
"method": "login",
"params": [username, password],
}
result = self._post_request(self.as_v3, login_request)
if result is None:
raise ValueError("login to openBIS failed")
else:
self.token = result
if save_token:
self._save_token()
self._password(password)
# update the OPENBIS_TOKEN environment variable, if OPENBIS_URL is identical to self.url
if os.environ.get("OPENBIS_URL") == self.url:
os.environ["OPENBIS_TOKEN"] = self.token
return self.token
"""An elegant way to store passwords which are used later
without giving the user an easy possibility to retrieve it.
"""
import inspect
if password is not None:
else:
if inspect.stack()[1][3] in allowed_methods:
else:
"This method can only be called from these internal methods: {}".format(
allowed_methods
)
)
def unmount(self, mountpoint=None):
"""Unmount a given mountpoint or unmount the stored mountpoint.
If the umount command does not work, try the pkill command.
If still not successful, throw an error message.
"""
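# Usage sketch: o.unmount() unmounts the previously stored mountpoint,
# o.unmount('~/openbis_mount') a specific path (the path is a placeholder).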
if mountpoint is None and not getattr(self, "mountpoint", None):
raise ValueError("please provide a mountpoint to unmount")
if mountpoint is None:
mountpoint = self.mountpoint
full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
if not os.path.exists(full_mountpoint_path):
return
# the path exists but is not a mountpoint
if not os.path.ismount(full_mountpoint_path):
return
status = subprocess.call("umount {}".format(full_mountpoint_path), shell=True)
if status == 1:
status = subprocess.call(
'pkill -9 sshfs && umount "{}"'.format(full_mountpoint_path), shell=True
)
if status == 1:
raise OSError(
"could not unmount mountpoint: {} Please try to unmount manually".format(
full_mountpoint_path
)
)
if VERBOSE:
print("Successfully unmounted {}".format(full_mountpoint_path))
self.mountpoint = None
def is_mounted(self, mountpoint=None):
if mountpoint is None:
mountpoint = getattr(self, "mountpoint", None)
if mountpoint is None:
return False
return os.path.ismount(mountpoint)
def get_mountpoint(self, search_mountpoint=False):
"""Returns the path to the active mountpoint.
Returns None if no mountpoint is found or if the mountpoint is not mounted anymore.
search_mountpoint=True: Tries to figure out an existing mountpoint for a given hostname
(experimental, does not work under Windows yet)
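
Usage (sketch)::

mountpoint = o.get_mountpoint(search_mountpoint=True)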
"""
mountpoint = getattr(self, "mountpoint", None)
if mountpoint:
if self.is_mounted(mountpoint):
return mountpoint
else:
return None
if not search_mountpoint:
return None
# try to find out the mountpoint
import subprocess
p1 = subprocess.Popen(["mount", "-d"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(
["grep", "--fixed-strings", self.hostname],
stdin=p1.stdout,
stdout=subprocess.PIPE,
)
p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
output = p2.communicate()[0]
output = output.decode()
# output will either be '' (=not mounted) or a string like this:
# {username}@{hostname}:{path} on {mountpoint} (osxfuse, nodev, nosuid, synchronous, mounted by vermeul)
try:
mountpoint = output.split()[2]
self.mountpoint = mountpoint
return mountpoint
except Exception:
return None
def mount(
self,
username=None,
password=None,
hostname=None,
mountpoint=None,
volname=None,
path="/",
port=2222,
kex_algorithms="+diffie-hellman-group1-sha1",
):
"""Mounts openBIS dataStore without being root, using sshfs and fuse. Both
SSHFS and FUSE must be installed on the system (see below)
Params:
username -- default: the currently used username
password -- default: the currently used password
hostname -- default: the current hostname
mountpoint -- default: ~/hostname
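
Usage (a minimal sketch; all values shown are placeholders)::

o.mount(username='some_username', password='some_password',
mountpoint='~/openbis_mount')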
FUSE / SSHFS Installation (requires root privileges):
Mac OS X
========
Follow the installation instructions on
https://osxfuse.github.io
Unix Cent OS 7
==============
$ sudo yum install epel-release
$ sudo yum --enablerepo=epel -y install fuse-sshfs
$ user="$(whoami)"
$ usermod -a -G fuse "$user"
"""
if self.is_mounted():
print(
"openBIS dataStore is already mounted on {}".format(self.mountpoint)
)
return
def check_sshfs_is_installed():
import subprocess
import errno
# probe for the sshfs binary; ENOENT means it is not installed
try:
subprocess.call("sshfs --help", shell=True)
except OSError as e:
if e.errno == errno.ENOENT:
raise ValueError(
'Your system seems not to have SSHFS installed. For Mac OS X, see installation instructions on https://osxfuse.github.io For Unix: $ sudo yum install epel-release && sudo yum --enablerepo=epel -y install fuse-sshfs && user="$(whoami)" && usermod -a -G fuse "$user"'
)
check_sshfs_is_installed()
if username is None:
username = self._get_username()
if not username:
raise ValueError("no token available - please provide a username")
if password is None:
password = self._password()
if not password:
raise ValueError("please provide a password")
if hostname is None:
hostname = self.hostname
if not hostname:
raise ValueError("please provide a hostname")
if mountpoint is None:
mountpoint = os.path.join("~", self.hostname)
# check if mountpoint exists, otherwise create it
full_mountpoint_path = os.path.abspath(os.path.expanduser(mountpoint))
if not os.path.exists(full_mountpoint_path):
os.makedirs(full_mountpoint_path)
print("full_mountpoint_path: ", full_mountpoint_path)
from sys import platform
supported_platforms = ["darwin", "linux"]
if platform not in supported_platforms:
raise ValueError(
"This method is not yet supported on the {} platform".format(platform)
)
os_options = {
"darwin": "-oauto_cache,reconnect,defer_permissions,noappledouble,negative_vncache,volname={} -oStrictHostKeyChecking=no ".format(
hostname
),
"linux": "-oauto_cache,reconnect -oStrictHostKeyChecking=no",
}
if volname is None:
volname = hostname
args = {
"username": username,
"password": password,
"hostname": hostname,
"port": port,
"path": path,
"mountpoint": mountpoint,
"volname": volname,
"os_options": os_options[platform],
"kex_algorithms": kex_algorithms,
}
cmd = (
'echo "{password}" | sshfs'
" {username}@{hostname}:{path} {mountpoint}"
' -o port={port} -o ssh_command="ssh -oKexAlgorithms={kex_algorithms}" -o password_stdin'
" {os_options}".format(**args)
)
status = subprocess.call(cmd, shell=True)
if status == 0:
if VERBOSE:
print("Mounted successfully to {}".format(full_mountpoint_path))
self.mountpoint = full_mountpoint_path
return self.mountpoint
raise OSError("mount failed, exit status: ", status)
def get_server_information(self):
"""Returns a dict containing the following server information:
api-version, archiving-configured, authentication-service, enabled-technologies, project-samples-enabled
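
Usage (sketch)::

info = o.get_server_information()   # the result is cached after the first call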
"""
if self.server_information is not None:
return self.server_information
request = {
"method": "getServerInformation",
"params": [self.token],
}
resp = self._post_request(self.as_v3, request)
if resp is not None:
# result is a dict of strings - use more useful types
keys_boolean = ["archiving-configured", "project-samples-enabled"]
keys_csv = ["enabled-technologies"]
for key in keys_boolean:
if key in resp:
resp[key] = resp[key] == "true"
for key in keys_csv:
if key in resp:
resp[key] = list(
map(lambda item: item.strip(), resp[key].split(","))
)
self.server_information = ServerInformation(resp)
return self.server_information
else:
raise ValueError("Could not get the server information")
def create_permId(self):
"""Have the server generate a new permId"""
# Request just 1 permId
request = {
"method": "createPermIdStrings",
"params": [self.token, 1],
}
resp = self._post_request(self.as_v3, request)
if resp is not None:
return resp[0]
else:
raise ValueError("Could not create permId")
def get_datastores(self):
"""Get a list of all available datastores. Usually there is only one, but in some cases
there might be multiple servers. If you upload a file, you need to specify the datastore you want
the file to be uploaded to.
"""
request = {
"method": "searchDataStores",
"params": [
self.token,
{"@type": "as.dto.datastore.search.DataStoreSearchCriteria"},
{"@type": "as.dto.datastore.fetchoptions.DataStoreFetchOptions"},
],
}
resp = self._post_request(self.as_v3, request)
attrs = ["code", "downloadUrl", "remoteUrl"]
if len(resp["objects"]) == 0:
raise ValueError("No datastore found!")
objects = resp["objects"]
parse_jackson(objects)
datastores = DataFrame(objects)
self.datastores = datastores[attrs]
return datastores[attrs]
def gen_code(self, entity, prefix=""):
"""Get the next sequence number for a Sample, Experiment, DataSet and Material. Other entities are currently not supported.
Usage::
gen_code('SAMPLE', 'SAM-')
gen_code('EXPERIMENT', 'EXP-')
gen_code('DATASET', '')
gen_code('MATERIAL', 'MAT-')
"""
entity = entity.upper()
entity2enum = {
"DATASET": "DATA_SET",
"OBJECT": "SAMPLE",
"SAMPLE": "SAMPLE",
"EXPERIMENT": "EXPERIMENT",
"COLLECTION": "EXPERIMENT",
"MATERIAL": "MATERIAL",
}
if entity not in entity2enum:
raise ValueError(
"no such entity: {}. Allowed entities are: DATA_SET, SAMPLE, EXPERIMENT, MATERIAL".format(entity)
)
request = {
"method": "generateCode",
"params": [self.token, prefix, entity2enum[entity]],
}
try:
return self._post_request(self.as_v1, request)
except Exception as e:
raise ValueError("Could not generate a code for {}: {}".format(entity, e))
def gen_permId(self, count=1):
"""Generate a permId (or many permIds) for a dataSet"""
request = {"method": "createPermIdStrings", "params": [self.token, count]}
try:
return self._post_request(self.as_v3, request)
except Exception as exc:
raise ValueError("Could not generate a code: {}".format(exc))
def new_person(self, userId, space=None):
"""creates an openBIS person or returns the existing person"""
try:
person = self.get_person(userId=userId)
return person
except Exception:
return Person(self, userId=userId, space=space)
def new_group(self, code, description=None, userIds=None):
"""creates an openBIS group or returns an existing one."""
try:
group = self.get_group(code=code)
group.description = description
return group
except Exception:
return Group(self, code=code, description=description, userIds=userIds)
def get_group(self, code, only_data=False):
"""Get an openBIS AuthorizationGroup. Returns a Group object."""
ids = [
{
"@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
"permId": code,
}
]
fetchopts = {
"@type": "as.dto.authorizationgroup.fetchoptions.AuthorizationGroupFetchOptions"
}
for option in ["roleAssignments", "users", "registrator"]:
fetchopts[option] = fetch_option[option]
fetchopts["users"]["space"] = fetch_option["space"]
request = {
"method": "getAuthorizationGroups",
"params": [self.token, ids, fetchopts],
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No group found!")
for permid in resp:
group = resp[permid]
parse_jackson(group)
if only_data:
return group
else:
return Group(self, data=group)
def get_role_assignments(self, start_with=None, count=None, **search_args):
"""Get the assigned roles for a given group, person or space"""
entity = "roleAssignment"
search_criteria = get_type_for_entity(entity, "search")
allowed_search_attrs = ["role", "roleLevel", "user", "group", "person", "space"]
sub_crit = []
for attr in search_args:
if attr in allowed_search_attrs:
if attr == "space":
sub_crit.append(_subcriteria_for_code(search_args[attr], "space"))
elif attr in ["user", "person"]:
userId = ""
if isinstance(search_args[attr], str):
userId = search_args[attr]
else:
userId = search_args[attr].userId
sub_crit.append(_subcriteria_for_userId(userId))
elif attr == "group":
groupId = ""
if isinstance(search_args[attr], str):
groupId = search_args[attr]
else:
groupId = search_args[attr].code
sub_crit.append(
_subcriteria_for_permid(groupId, "authorizationGroup")
)
elif attr == "role":
# TODO
raise ValueError("not yet implemented")
elif attr == "roleLevel":
# TODO
raise ValueError("not yet implemented")
else:
pass
else:
raise ValueError("unknown search argument {}".format(attr))
method_name = get_method_for_entity(entity, "search")
fetchopts = fetch_option[entity]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["space", "project", "user", "authorizationGroup", "registrator"]:
fetchopts[option] = fetch_option[option]
request = {
"method": method_name,
"params": [self.token, search_criteria, fetchopts],
}
attrs = ["techId", "role", "roleLevel", "user", "group", "space", "project"]
resp = self._post_request(self.as_v3, request)
if len(resp["objects"]) == 0:
roles = DataFrame(columns=attrs)
else:
objects = resp["objects"]
parse_jackson(objects)
roles = DataFrame(objects)
roles["techId"] = roles["id"].map(extract_id)
roles["user"] = roles["user"].map(extract_userId)
roles["group"] = roles["authorizationGroup"].map(extract_code)
roles["space"] = roles["space"].map(extract_code)
roles["project"] = roles["project"].map(extract_code)
return Things(
openbis_obj=self,
entity="role_assignment",
df=roles[attrs],
identifier_name="techId",
start_with=start_with,
count=count,
)
def get_role_assignment(self, techId, only_data=False):
fetchopts = fetch_option["roleAssignment"]
for option in ["space", "project", "user", "authorizationGroup", "registrator"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "getRoleAssignments",
"params": [
self.token,
[
{
"techId": str(techId),
"@type": "as.dto.roleassignment.id.RoleAssignmentTechId",
}
],
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No assigned role found for techId={}".format(techId))
for permid in resp:
data = resp[permid]
parse_jackson(data)
if only_data:
return data
else:
return RoleAssignment(self, data=data)
def assign_role(self, role, **args):
"""Assign a role to either
- a person
- a group
The scope is either
- the whole instance
- a space
- a project
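
Usage (a sketch; the role, user and space are placeholders)::

o.assign_role(role='OBSERVER', person='some_user', space='SOME_SPACE')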
"""
role = role.upper()
defs = get_definition_for_entity("roleAssignment")
if role not in defs["role"]:
raise ValueError("Role should be one of these: {}".format(defs["role"]))
userId = None
groupId = None
spaceId = None
projectId = None
for arg in args:
if arg in ["person", "group", "space", "project"]:
permId = args[arg] if isinstance(args[arg], str) else args[arg].permId
if arg == "person":
userId = {
"permId": permId,
"@type": "as.dto.person.id.PersonPermId",
}
elif arg == "group":
groupId = {
"permId": permId,
"@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
}
elif arg == "space":
spaceId = {"permId": permId, "@type": "as.dto.space.id.SpacePermId"}
elif arg == "project":
projectId = {
"permId": permId,
}
request = {
"method": "createRoleAssignments",
"params": [
[
{
"role": role,
"userId": userId,
"authorizationGroupId": groupId,
"spaceId": spaceId,
"projectId": projectId,
"@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
}
],
],
}
self._post_request(self.as_v3, request)
return
def get_groups(self, start_with=None, count=None, **search_args):
"""Get openBIS AuthorizationGroups. Returns a «Things» object.
Usage::
groups = o.get_groups()
groups[0] # select first group
groups['GROUP_NAME'] # select group with this code
for group in groups:
... # a Group object
groups.df # get a DataFrame object of the group list
print(groups) # print a nice ASCII table (e.g. in IPython)
groups # HTML table (in a Jupyter notebook)
"""
criteria = []
# unfortunately, there aren't many search possibilities yet...
for search_arg in ["code"]:
if search_arg in search_args:
if search_arg == "code":
criteria.append(_criteria_for_code(search_args[search_arg]))
search_criteria = get_search_type_for_entity("authorizationGroup")
search_criteria["criteria"] = criteria
search_criteria["operator"] = "AND"
fetchopts = fetch_option["authorizationGroup"]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["roleAssignments", "registrator", "users"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchAuthorizationGroups",
"params": [self.token, search_criteria, fetchopts],
}
resp = self._post_request(self.as_v3, request)
attrs = [
"permId",
"code",
"description",
"users",
"registrator",
"registrationDate",
"modificationDate",
]
if len(resp["objects"]) == 0:
groups = DataFrame(columns=attrs)
else:
objects = resp["objects"]
parse_jackson(objects)
groups = DataFrame(objects)
groups["permId"] = groups["permId"].map(extract_permid)
groups["registrator"] = groups["registrator"].map(extract_person)
groups["users"] = groups["users"].map(extract_userId)
groups["registrationDate"] = groups["registrationDate"].map(
format_timestamp
)
groups["modificationDate"] = groups["modificationDate"].map(
format_timestamp
)
return Things(
openbis_obj=self,
entity="group",
df=groups[attrs],
identifier_name="permId",
start_with=start_with,
count=count,
)
def get_persons(self, start_with=None, count=None, **search_args):
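"""Get openBIS users (Person objects). Returns a Things object."""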
search_criteria = get_search_criteria("person", **search_args)
fetchopts = fetch_option["person"]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["space"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchPersons",
"params": [self.token, search_criteria, fetchopts],
}
resp = self._post_request(self.as_v3, request)
attrs = [
"permId",
"userId",
"firstName",
"lastName",
"email",
"space",
"registrationDate",
"active",
]
if len(resp["objects"]) == 0:
persons = DataFrame(columns=attrs)
else:
objects = resp["objects"]
parse_jackson(objects)
persons = DataFrame(resp["objects"])
persons["permId"] = persons["permId"].map(extract_permid)
persons["registrationDate"] = persons["registrationDate"].map(
format_timestamp
)
persons["space"] = persons["space"].map(extract_nested_permid)
return Things(
openbis_obj=self,
entity="person",
df=persons[attrs],
identifier_name="permId",
start_with=start_with,
count=count,
)
def get_person(self, userId, only_data=False):
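"""Get a person (user) by userId. Returns a Person object."""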
ids = [{"@type": "as.dto.person.id.PersonPermId", "permId": userId}]
fetchopts = {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"}
for option in ["roleAssignments", "space"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "getPersons",
"params": [
self.token,
ids,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No person found!")
for permid in resp:
person = resp[permid]
parse_jackson(person)
if only_data:
return person
else:
return Person(self, data=person)
def get_spaces(self, code=None, start_with=None, count=None, use_cache=True):
"""Get a list of all available spaces (DataFrame object). To create a sample or a
dataset, you need to specify in which space it should live.
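
Usage (sketch)::

spaces = o.get_spaces()
spaces.df   # pandas DataFrame of the spaces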
"""
method = get_method_for_entity("space", "search")
search_criteria = _subcriteria_for_code(code, "space")
fetchopts = fetch_option["space"]
fetchopts["from"] = start_with
fetchopts["count"] = count
"method": method,
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
attrs = ["code", "description", "registrationDate", "modificationDate"]
if len(resp["objects"]) == 0:
spaces = DataFrame(columns=attrs)
else:
spaces = DataFrame(resp["objects"])
spaces["registrationDate"] = spaces["registrationDate"].map(
format_timestamp
)
spaces["modificationDate"] = spaces["modificationDate"].map(
format_timestamp
)
return Things(
openbis_obj=self,
entity="space",
df=spaces[attrs],
identifier_name="code",
start_with=start_with,
count=count,
)
def get_space(self, code, only_data=False, use_cache=True):
"""Returns a Space object for a given identifier."""
code = str(code).upper()
space = (
not only_data
and use_cache
and self._object_cache(entity="space", code=code)
)
if space:
return space
fetchopts = {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"}
for option in ["registrator"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "getSpaces",
"params": [