diff --git a/pybis/src/python/pybis/pybis.py b/pybis/src/python/pybis/pybis.py index f913fcf186ad484296b1c059b76d1e83bf7bbf1c..82aad44abd64a4396cee42c2e7472e091bb2aca0 100644 --- a/pybis/src/python/pybis/pybis.py +++ b/pybis/src/python/pybis/pybis.py @@ -59,7 +59,6 @@ from .tag import Tag from .things import Things from .utils import ( VERBOSE, - check_datatype, extract_attr, extract_code, extract_deletion, @@ -71,15 +70,12 @@ from .utils import ( extract_nested_permids, extract_permid, extract_person, - extract_person_details, - extract_property_assignments, - extract_role_assignments, extract_userId, + extract_username_from_token, format_timestamp, is_identifier, is_number, is_permid, - nvl, parse_jackson, split_identifier, ) @@ -232,7 +228,7 @@ def get_token_for_hostname(hostname, session_token_needed=True): return -def save_pats_to_disk(hostname: str, resp: dict) -> None: +def save_pats_to_disk(hostname: str, url: str, resp: dict) -> None: pats = resp["objects"] parse_jackson(pats) path = PYBIS_FOLDER / hostname @@ -242,6 +238,7 @@ def save_pats_to_disk(hostname: str, resp: dict) -> None: for token in pats: data = { + "url": url, "hostname": hostname, "owner": token["owner"]["userId"], "registrationDate": format_timestamp(token["owner"]["registrationDate"]), @@ -1079,21 +1076,22 @@ class Openbis: else: while True: try: - self.token = self._get_saved_token() + self.token = config_local.get("token") break except ValueError: - self._delete_saved_token() pass try: - self.token = config_local.get("token") + self.token = config_global.get("token") break except ValueError: pass try: - self.token = config_global.get("token") + self.token = self._get_saved_token() break except ValueError: + self._delete_saved_token() pass + break def _get_username(self): if self.token: @@ -1397,6 +1395,7 @@ class Openbis: if save_token: self._save_token_to_disk() self._password(password) + self.username = username return self.token def _password(self, password=None, pstore={}): @@ -1641,17 +1640,6 @@ class Openbis: } resp = self._post_request(self.as_v3, request) if resp is not None: - # result is a dict of strings - use more useful types - keys_boolean = ["archiving-configured", "project-samples-enabled"] - keys_csv = ["enabled-technologies"] - for key in keys_boolean: - if key in resp: - resp[key] = resp[key] == "true" - for key in keys_csv: - if key in resp: - resp[key] = list( - map(lambda item: item.strip(), resp[key].split(",")) - ) self.server_information = ServerInformation(resp) return self.server_information else: @@ -2045,15 +2033,22 @@ class Openbis: ) -> str: """Creates a new personal access token (PAT)""" - if validFrom is None: - validFrom = datetime.now() - if validTo is None: - validTo = datetime.now() + relativedelta(years=1) + session_token = self.token + if not is_session_token(session_token): + session_token = self.session_token + if not session_token: + session_token = get_token_for_hostname( + self.hostname, session_token_needed=True + ) - if is_personal_access_token(self.token): + if not self.is_token_valid(session_token): raise ValueError( "You you need a session token to create a new personal access token." ) + if validFrom is None: + validFrom = datetime.now() + if validTo is None: + validTo = datetime.now() + relativedelta(years=1) entity = "personalAccessToken" request = { @@ -2182,7 +2177,7 @@ class Openbis: return pats[attrs] if save_to_disk: - save_pats_to_disk(hostname=self.hostname, resp=resp) + save_pats_to_disk(hostname=self.hostname, url=self.url, resp=resp) return Things( openbis_obj=self, @@ -4441,6 +4436,7 @@ class Openbis: # just in case we need it later self.session_token = self.token self.__dict__["token"] = token + self.username = extract_username_from_token(token) if save_token: self._save_token_to_disk() @@ -5385,15 +5381,44 @@ class ExternalDMS: class ServerInformation: def __init__(self, info): - self._info = info + self._info = self._reformat_info(info) self.attrs = [ "api_version", "archiving_configured", "authentication_service", + "authentication-service.switch-aai.label", + "authentication-service.switch-aai.link", + "create-continuous-sample-codes", "enabled_technologies", - "project_samples_enabled", + "openbis-version", + "openbis.support.email", + "personal-access-tokens-enabled", + "personal-access-tokens-max-validity-period", + "personal-access-tokens-validity-warning-period", + "project-samples-enabled", ] + def _reformat_info(self, info): + for bool_field in [ + "archiving-configured", + "project-samples-enabled", + "personal-access-tokens-enabled", + ]: + if bool_field in info: + info[bool_field] = info[bool_field] == "true" + for csv_field in ["enabled-technologies"]: + if csv_field in info: + info[csv_field] = list( + map(lambda item: item.strip(), info[csv_field].split(",")) + ) + for int_field in [ + "personal-access-tokens-max-validity-period", + "personal-access-tokens-validity-warning-period", + ]: + if int_field in info: + info[int_field] = int(info[int_field]) + return info + def __dir__(self): return self.attrs @@ -5449,7 +5474,22 @@ class PersonalAccessToken( entity="personalAccessToken", single_item_method_name="get_personal_access_token", ): - pass + def renew(self, validFrom: datetime = None, validTo: datetime = None): + if not validFrom: + validFrom = datetime.now() + + if not validTo: + validFrom_orig = datetime.strptime(self.validFromDate, "%Y-%m-%d %H:%M:%S") + validTo_orig = datetime.strptime(self.validToDate, "%Y-%m-%d %H:%M:%S") + days_delta = abs(validFrom_orig - validTo_orig).days + validTo = validFrom + relativedelta(days=days_delta) + + new_pat = self.openbis.new_personal_access_token( + sessionName=self.sessionName, validFrom=validFrom, validTo=validTo + ) + self.openbis.set_token(new_pat) + if VERBOSE: + print(self.openbis.token) class SessionInformation(