Newer
Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
pybis.py
"""
Swen Vermeul
committed
from __future__ import print_function
from .utils import (
extract_attr,
extract_permid,
extract_code,
extract_deletion,
extract_identifier,
extract_nested_identifier,
extract_nested_permid,
extract_nested_permids,
extract_identifiers,
extract_property_assignments,
extract_role_assignments,
extract_person,
extract_person_details,
extract_id,
extract_userId,
is_number,
)
from datetime import datetime
import pandas as pd
from pandas import DataFrame, Series
from .semantic_annotation import SemanticAnnotation
from .tag import Tag
from .role_assignment import RoleAssignment
from .group import Group
from .person import Person
from .dataset import DataSet
from .sample import Sample
from .experiment import Experiment
from .project import Project
from .space import Space
from .things import Things
from .definitions import (
openbis_definitions,
get_definition_for_entity,
fetch_option,
get_fetchoption_for_entity,
get_type_for_entity,
get_method_for_entity,
get_fetchoptions,
)
from .openbis_object import OpenBisObject, Transaction
from .vocabulary import Vocabulary, VocabularyTerm
from .entity_type import (
EntityType,
SampleType,
DataSetType,
MaterialType,
ExperimentType,
)
from .utils import (
parse_jackson,
check_datatype,
split_identifier,
format_timestamp,
is_identifier,
is_permid,
nvl,
VERBOSE,
)
from . import data_set as pbds
from tabulate import tabulate
from texttable import Texttable
from collections import namedtuple, defaultdict
import zlib
from urllib.parse import urlparse, urljoin, quote
import re
import json
import time
import os
Chandrasekhar Ramakrishnan
committed
import random
import subprocess
import errno
Chandrasekhar Ramakrishnan
committed
vkovtun
committed
import logging
import sys
Swen Vermeul
committed
# import the various openBIS entities
LOG_WARNING = 3
LOG_INFO = 4
LOG_ENTRY = 5
LOG_PARM = 6
LOG_DEBUG = 7
DEBUG_LEVEL = LOG_NONE
vkovtun
committed
def now():
vkovtun
committed
return time.time()
def get_search_type_for_entity(entity, operator=None):
"""Returns a dictionary containing the correct search criteria type
get_search_type_for_entity('space')
# returns:
{'@type': 'as.dto.space.search.SpaceSearchCriteria'}
"""
search_criteria = {
"space": "as.dto.space.search.SpaceSearchCriteria",
"userId": "as.dto.person.search.UserIdSearchCriteria",
"email": "as.dto.person.search.EmailSearchCriteria",
"firstName": "as.dto.person.search.FirstNameSearchCriteria",
"lastName": "as.dto.person.search.LastNameSearchCriteria",
"project": "as.dto.project.search.ProjectSearchCriteria",
"experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
"experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
"sample": "as.dto.sample.search.SampleSearchCriteria",
"sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
"dataset": "as.dto.dataset.search.DataSetSearchCriteria",
"dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
"external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
"material": "as.dto.material.search.MaterialSearchCriteria",
"material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
"vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
"tag": "as.dto.tag.search.TagSearchCriteria",
"authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
"person": "as.dto.person.search.PersonSearchCriteria",
"code": "as.dto.common.search.CodeSearchCriteria",
"global": "as.dto.global.GlobalSearchObject",
"propertyType": "as.dto.property.search.PropertyTypeSearchCriteria",
if operator is not None:
sc["operator"] = operator
return sc
Swen Vermeul
committed
def _type_for_id(ident, entity):
Swen Vermeul
committed
"""Returns the data type for a given identifier/permId for use with the API call, e.g.
{
"identifier": "/DEFAULT/SAMPLE_NAME",
"@type": "as.dto.sample.id.SampleIdentifier"
}
or
{
"permId": "20160817175233002-331",
"@type": "as.dto.sample.id.SamplePermId"
}
"""
if entity.lower() == "tag":
if "/" in ident:
if not ident.startswith("/"):
ident = "/" + ident
return {"permId": ident, "@type": "as.dto.tag.id.TagPermId"}
return {"code": ident, "@type": "as.dto.tag.id.TagCode"}
Swen Vermeul
committed
entities = {
"sample": "Sample",
"dataset": "DataSet",
"experiment": "Experiment",
"plugin": "Plugin",
"space": "Space",
"project": "Project",
Swen Vermeul
committed
"semanticannotation": "SemanticAnnotation",
}
search_request = {}
Swen Vermeul
committed
if entity.lower() in entities:
entity_capitalize = entities[entity.lower()]
else:
entity_capitalize = entity.capitalize()
Swen Vermeul
committed
if is_identifier(ident):
# people tend to omit the / prefix of an identifier...
if not ident.startswith("/"):
ident = "/" + ident
Swen Vermeul
committed
# ELN-LIMS style contains also experiment in sample identifer, i.e. /space/project/experiment/sample_code
# we have to remove the experiment-code
if ident.count("/") == 4:
codes = ident.split("/")
ident = "/".join([codes[0], codes[1], codes[2], codes[4]])
Swen Vermeul
committed
search_request = {
"identifier": ident.upper(),
"@type": "as.dto.{}.id.{}Identifier".format(
entity.lower(), entity_capitalize
),
}
else:
search_request = {
"permId": ident,
"@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity_capitalize),
}
return search_request
def get_search_criteria(entity, **search_args):
search_criteria = get_search_type_for_entity(entity)
criteria = []
for attr in attrs:
if attr in search_args:
sub_crit = get_search_type_for_entity(attr)
sub_crit["fieldValue"] = get_field_value_search(attr, search_args[attr])
criteria.append(sub_crit)
search_criteria["criteria"] = criteria
search_criteria["operator"] = "AND"
return search_criteria
"""since Python3 the zlib module returns unsigned integers (2.7: signed int)"""
for eachLine in open(fileName, "rb"):
prev = zlib.crc32(eachLine, prev)
# return as hex
return "%x" % (prev & 0xFFFFFFFF)
def _tagIds_for_tags(tags=None, action="Add"):
"""creates an action item to add or remove tags.
Swen Vermeul
committed
Action is either 'Add', 'Remove' or 'Set'
"""
if tags is None:
return
if not isinstance(tags, list):
tags = [tags]
items = list(map(lambda tag: {"code": tag, "@type": "as.dto.tag.id.TagCode"}, tags))
tagIds = {
"actions": [
{
"items": items,
"@type": "as.dto.common.update.ListUpdateAction{}".format(
action.capitalize()
),
}
],
"@type": "as.dto.common.update.IdListUpdateValue",
}
def _list_update(ids=None, entity=None, action="Add"):
"""creates an action item to add, set or remove ids."""
if ids is None:
return
if not isinstance(ids, list):
ids = [ids]
items = list(
map(
lambda id: {
"code": id,
"@type": "as.dto.{}.id.{}Code".format(entity.lower(), entity),
},
ids,
)
)
list_update = {
"actions": [
{
"items": items,
"@type": "as.dto.common.update.ListUpdateAction{}".format(
action.capitalize()
),
}
],
"@type": "as.dto.common.update.IdListUpdateValue",
}
return list_update
def get_field_value_search(field, value, comparison="StringEqualToValue"):
return {"value": value, "@type": "as.dto.common.search.{}".format(comparison)}
def _common_search(search_type, value, comparison="StringEqualToValue"):
sreq = {
"@type": search_type,
"fieldValue": {
"value": value,
"@type": "as.dto.common.search.{}".format(comparison),
},
return sreq
def _criteria_for_code(code):
return {
"fieldValue": {
"value": code.upper(),
"@type": "as.dto.common.search.StringEqualToValue",
"@type": "as.dto.common.search.CodeSearchCriteria",
def _criteria_for_permId(permId):
return {
"fieldName": "perm_id",
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": permId,
"@type": "as.dto.common.search.StringEqualToValue",
"@type": "as.dto.common.search.PermIdSearchCriteria",
def _subcriteria_for_userId(userId):
return {
"fieldName": "userId",
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": userId,
"@type": "as.dto.common.search.StringEqualToValue",
"@type": "as.dto.person.search.UserIdSearchCriteria",
],
"@type": "as.dto.person.search.PersonSearchCriteria",
def _subcriteria_for_type(code, entity):
return {
"@type": "as.dto.{}.search.{}TypeSearchCriteria".format(entity.lower(), entity),
"criteria": [
{
"@type": "as.dto.common.search.CodeSearchCriteria",
"fieldValue": {
"value": code.upper(),
"@type": "as.dto.common.search.StringEqualToValue",
},
}
}
def _subcriteria_for_status(status_value):
status_value = status_value.upper()
valid_status = "AVAILABLE LOCKED ARCHIVED UNARCHIVE_PENDING ARCHIVE_PENDING BACKUP_PENDING".split()
if not status_value in valid_status:
"status must be one of the following: " + ", ".join(valid_status)
)
return {
"@type": "as.dto.dataset.search.PhysicalDataSearchCriteria",
"operator": "AND",
"criteria": [
{
"@type": "as.dto.dataset.search.StatusSearchCriteria",
"fieldName": "status",
"fieldType": "ATTRIBUTE",
"fieldValue": status_value,
}
],
sreq = {}
for key, val in req.items():
if key == "criteria":
sreq["criteria"] = list(
map(lambda item: _gen_search_criteria(item), req["criteria"])
)
elif key == "code":
sreq["criteria"] = [
_common_search("as.dto.common.search.CodeSearchCriteria", val.upper())
]
elif key == "identifier":
if is_identifier(val):
# if we have an identifier, we need to search in Space and Code separately
si = split_identifier(val)
sreq["criteria"] = []
if "space" in si:
sreq["criteria"].append(
_gen_search_criteria({"space": "Space", "code": si["space"]})
)
if "experiment" in si:
pass
if "code" in si:
sreq["criteria"].append(
_common_search(
"as.dto.common.search.CodeSearchCriteria",
si["code"].upper(),
sreq["criteria"] = [
_common_search("as.dto.common.search.PermIdSearchCriteria", val)
]
else:
# we assume we just got a code
sreq["criteria"] = [
_common_search(
"as.dto.common.search.CodeSearchCriteria", val.upper()
)
]
elif key == "operator":
sreq["operator"] = val.upper()
else:
sreq["@type"] = "as.dto.{}.search.{}SearchCriteria".format(key, val)
return sreq
def _subcriteria_for_tags(tags):
if not isinstance(tags, list):
tags = [tags]
criteria = list(
map(
lambda tag: {
"fieldName": "code",
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": tag,
"@type": "as.dto.common.search.StringEqualToValue",
},
"@type": "as.dto.common.search.CodeSearchCriteria",
},
tags,
)
)
return {
"@type": "as.dto.tag.search.TagSearchCriteria",
"operator": "AND",
}
def _subcriteria_for_is_finished(is_finished):
return {
"@type": "as.dto.common.search.StringPropertySearchCriteria",
"fieldName": "FINISHED_FLAG",
"fieldType": "PROPERTY",
"fieldValue": {
"value": is_finished,
"@type": "as.dto.common.search.StringEqualToValue",
},
}
Swen Vermeul
committed
def _subcriteria_for_properties(prop, value, entity):
"""This internal method creates the JSON RPC criterias for searching
in properties. It distinguishes between numbers, dates and strings
and uses the comparative operator (< > >= <=), if available.
creationDate and modificationDate attributes can be searched as well.
To search in the properties of parents, children, etc. the user has to
prefix the propery accordingly:
- parent_propertyName
- child_propertyName
- container_propertyName
"""
if "*" in str(value):
additional_attr["useWildcards"] = True
Swen Vermeul
committed
search_types = {
"sample": {
"parent": "as.dto.sample.search.SampleParentsSearchCriteria",
"parents": "as.dto.sample.search.SampleParentsSearchCriteria",
"child": "as.dto.sample.search.SampleChildrenSearchCriteria",
"children": "as.dto.sample.search.SampleChildrenSearchCriteria",
"container": "as.dto.sample.search.SampleContainerSearchCriteria",
},
"dataset": {
"parent": "as.dto.dataset.search.DataSetParentsSearchCriteria",
"parents": "as.dto.dataset.search.DataSetParentsSearchCriteria",
"child": "as.dto.dataset.search.DataSetChildrenSearchCriteria",
"children": "as.dto.dataset.search.DataSetChildrenSearchCriteria",
"container": "as.dto.dataset.search.DataSetContainerSearchCriteria",
Swen Vermeul
committed
},
}
# default values of fieldType, str_type and eq_type
fieldType = "PROPERTY"
eq_type = "as.dto.common.search.StringEqualToValue"
str_type = "as.dto.common.search.StringPropertySearchCriteria"
if "date" in prop.lower() and re.search(r"\d{4}\-\d{2}\-\d{2}", value):
is_date = True
eq_type = "as.dto.common.search.DateEqualToValue"
str_type = "as.dto.common.search.RegistrationDateSearchCriteria"
fieldType = "ATTRIBUTE"
str_type = "as.dto.common.search.ModificationDateSearchCriteria"
fieldType = "ATTRIBUTE"
else:
str_type = "as.dto.common.search.DatePropertySearchCriteria"
if any(str(value).startswith(operator) for operator in [">", "<", "="]):
match = re.search(
r"""
^
(?P<comp_operator>\>\=|\>|\<\=|\<|\=\=|\=) # extract the comparative operator
\s*
(?P<value>.*) # extract the value
""",
comp_operator = match.groupdict()["comp_operator"]
value = match.groupdict()["value"]
eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
else:
eq_type = "as.dto.common.search.DateEqualToValue"
# numeric comparison
elif is_number(value):
str_type = "as.dto.common.search.NumberPropertySearchCriteria"
eq_type = "as.dto.common.search.NumberGreaterThanValue"
eq_type = "as.dto.common.search.NumberGreaterThanOrEqualToValue"
eq_type = "as.dto.common.search.NumberLessThanValue"
eq_type = "as.dto.common.search.NumberLessThanOrEqualToValue"
else:
eq_type = "as.dto.common.search.NumberEqualToValue"
# string comparison
else:
eq_type = "as.dto.common.search.StringGreaterThanValue"
eq_type = "as.dto.common.search.StringGreaterThanOrEqualToValue"
eq_type = "as.dto.common.search.StringLessThanValue"
eq_type = "as.dto.common.search.StringLessThanOrEqualToValue"
eq_type = "as.dto.common.search.StringEqualToValue"
else:
eq_type = "as.dto.common.search.StringEqualToValue"
Swen Vermeul
committed
# searching for parent/child/container identifier
if any(
relation == prop.lower()
for relation in [
"parent",
"child",
"container",
"parents",
"children",
"containers",
]
):
Swen Vermeul
committed
if is_identifier(value):
identifier_search_type = "as.dto.common.search.IdentifierSearchCriteria"
# find any parent, child, container
Swen Vermeul
committed
return {
"@type": search_types[entity][relation],
"criteria": [
{
"@type": "as.dto.common.search.AnyFieldSearchCriteria",
"fieldValue": {
"@type": "as.dto.common.search.AnyStringValue",
Swen Vermeul
committed
}
Swen Vermeul
committed
}
elif is_permid(value):
identifier_search_type = "as.dto.common.search.PermIdSearchCriteria"
Swen Vermeul
committed
identifier_search_type = "as.dto.common.search.CodeSearchCriteria"
return {
"@type": search_types[entity][relation],
"criteria": [
{
"@type": identifier_search_type,
"fieldType": "ATTRIBUTE",
"fieldValue": {
"@type": "as.dto.common.search.StringEqualToValue",
"value": value,
Swen Vermeul
committed
}
Swen Vermeul
committed
}
elif any(
prop.lower().startswith(relation)
for relation in ["parent_", "child_", "container_"]
):
match = re.search(r"^(\w+?)_(.*)", prop.lower())
Swen Vermeul
committed
if match:
relation, property_name = match.groups()
return {
"@type": search_types[entity][relation],
"criteria": [
{
"@type": str_type,
"fieldName": property_name.upper(),
Swen Vermeul
committed
"fieldValue": {
"@type": eq_type,
"value": value,
Swen Vermeul
committed
}
Swen Vermeul
committed
}
Swen Vermeul
committed
# searching for properties
if prop.startswith("_"):
fieldName = "$" + prop[1:]
else:
fieldName = prop
"fieldName": fieldName.upper(),
"fieldValue": {"value": value, "@type": eq_type},
**additional_attr,
def _subcriteria_for(thing, entity, parents_or_children="", operator="AND"):
"""Returns the sub-search criteria for «thing», which can be either:
- a python object (sample, dataSet, experiment)
- a permId
- an identifier
- a code
"""
subcrit = _subcriteria_for(thing, new_entity)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": subcrit, **search_type, "operator": operator}
if isinstance(thing, str):
if is_permid(thing):
return _subcriteria_for_permid(
entity=entity,
parents_or_children=parents_or_children,
)
elif is_identifier(thing):
return _subcriteria_for_identifier(
entity=entity,
parents_or_children=parents_or_children,
)
else:
# look for code
return _subcriteria_for_code_new(
thing,
entity=entity,
parents_or_children=parents_or_children,
)
elif isinstance(thing, list):
criteria = []
for element in thing:
crit = _subcriteria_for(element, entity, parents_or_children, operator)
criteria += crit["criteria"]
return {"criteria": criteria, "@type": crit["@type"], "operator": "OR"}
elif thing is None:
# we just need the type
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": [], **search_type, "operator": operator}
else:
# we passed an object
return _subcriteria_for_permid(
entity=entity,
parents_or_children=parents_or_children,
def _subcriteria_for_identifier(ids, entity, parents_or_children="", operator="AND"):
if not isinstance(ids, list):
ids = [ids]
criteria = list(
map(
lambda id: {
"@type": "as.dto.common.search.IdentifierSearchCriteria",
"fieldValue": {
"value": id,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "identifier",
},
ids,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_permid(permids, entity, parents_or_children="", operator="AND"):
if not isinstance(permids, list):
permids = [permids]
criteria = list(
map(
lambda permid: {
"@type": "as.dto.common.search.PermIdSearchCriteria",
"fieldValue": {
"value": permid,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "perm_id",
},
permids,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_permid_new(codes, entity, parents_or_children="", operator="AND"):
if not isinstance(codes, list):
codes = [codes]
criteria = list(
map(
lambda code: {
"@type": "as.dto.common.search.PermIdSearchCriteria",
"fieldValue": {
"value": code,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "perm_id",
},
codes,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_code_new(codes, entity, parents_or_children="", operator="AND"):
if not isinstance(codes, list):
codes = [codes]
criteria = list(
map(
lambda code: {
"@type": "as.dto.common.search.CodeSearchCriteria",
"fieldValue": {
"value": code,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "code",
},
codes,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_code(code, entity):
"""Creates the often used search criteria for code values. Returns a dictionary.
Example::
_subcriteria_for_code("username", "space")
{
"criteria": [
{
"fieldType": "ATTRIBUTE",
"@type": "as.dto.common.search.CodeSearchCriteria",
"fieldName": "code",
"fieldValue": {
"@type": "as.dto.common.search.StringEqualToValue",
"value": "USERNAME"
}
}
],
"operator": "AND",
"@type": "as.dto.space.search.SpaceSearchCriteria"
}
if code is not None:
if is_permid(code):
fieldname = "permId"
fieldtype = "as.dto.common.search.PermIdSearchCriteria"
else:
fieldname = "code"
fieldtype = "as.dto.common.search.CodeSearchCriteria"
# search_criteria = get_search_type_for_entity(entity.lower())
search_criteria = get_type_for_entity(entity, "search")
search_criteria["criteria"] = [
{
"fieldName": fieldname,
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": code.upper(),
"@type": "as.dto.common.search.StringEqualToValue",
},
"@type": fieldtype,
}
]
search_criteria["operator"] = "AND"
return search_criteria
class Openbis:
Swen Vermeul
committed
Note:
* A recent version of openBIS is required (minimum 16.05.2).
* For creation of datasets, the dataset-uploader-api ingestion plugin must be present.
def __init__(
self,
url=None,
verify_certificates=True,
token=None,
use_cache=True,
allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks=False,
token_path=None,
):
"""Initialize a new connection to an openBIS server.
Swen Vermeul
committed
Examples:
o = Openbis('https://openbis.example.com')
o_test = Openbis('https://test_openbis.example.com:8443', verify_certificates=False)
Args:
url (str): https://openbis.example.com
verify_certificates (bool): set to False when you use self-signed certificates
token (str): a valid openBIS token. If not set, pybis will try to read a valid token from ~/.pybis
token_path: a path to a file which contains an openBIS token
use_cache: make openBIS to store spaces, projects, sample types, vocabulary terms and oder more-or-less static objects to optimise speed
Swen Vermeul
committed
allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks (bool): False
"""
self.as_v3 = "/openbis/openbis/rmi-application-server-v3.json"
self.as_v1 = "/openbis/openbis/rmi-general-information-v1.json"
self.reg_v1 = "/openbis/openbis/rmi-query-v1.json"
self.verify_certificates = verify_certificates
if not verify_certificates:
urllib3.disable_warnings()
if url is None:
url = os.environ.get("OPENBIS_URL") or os.environ.get("OPENBIS_HOST")
raise ValueError("please provide a URL you want to connect to.")
if not url.startswith("http"):
url = "https://" + url
Swen Vermeul
committed
if url_obj.netloc is None or url_obj.netloc == "":
"please provide the url in this format: https://openbis.host.ch:8443"
)
Swen Vermeul
committed
if url_obj.hostname is None:
raise ValueError("hostname is missing")
if (
url_obj.scheme == "http"
and not allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks
):
Swen Vermeul
committed
raise ValueError("always use https!")
self.url = url_obj.geturl()
self.port = url_obj.port
self.download_prefix = os.path.join("data", self.hostname)
self.use_cache = use_cache
self.cache = {}
Swen Vermeul
committed
self.token_path = token_path or self.gen_token_path()
self.token = token or os.environ.get("OPENBIS_TOKEN") or self._get_saved_token()
if self.is_token_valid(self.token):
if token is not None:
self._save_token_to_disk(token=token)
else:
token_path = self._delete_saved_token()
if token_path and VERBOSE:
print("Session is no longer valid. Please log in again.")
def _get_username(self):
if self.token:
match = re.search(r"(?P<username>.*)-.*", self.token)
username = match.groupdict()["username"]
return username
def __dir__(self):
return [
"url",
"port",
"hostname",
"token",
"login()",
"logout()",
"is_session_active()",
"is_token_valid()",
"mount()",
"unmount()",
"use_cache",
"clear_cache()",
"get_server_information()",
"get_datasets()",
"get_dataset_type()",
"get_dataset_types()",
"get_datastores()",
"get_deletions()",
"get_experiments()",
"get_experiment_type()",
"get_experiment_types()",
"get_collections()",
"get_collection_type()",
"get_collection_types()",
"get_external_data_management_systems()",
"get_external_data_management_system()",
"get_material_type()",
"get_material_types()",
"get_project()",
"get_projects()",
"get_sample()",
"get_object()",