Newer
Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-
Adam Laskowski
committed
# Copyright ETH 2018 - 2023 Zürich, Scientific IT Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
pybis.py
"""
Swen Vermeul
committed
import json
import os
import re
import subprocess
import time
import zlib
Swen Vermeul
committed
from urllib.parse import quote, urljoin, urlparse
import requests
import urllib3
from pandas import DataFrame
Swen Vermeul
committed
from . import data_set as pbds
Swen Vermeul
committed
get_definition_for_entity,
Swen Vermeul
committed
get_method_for_entity,
get_type_for_entity,
openbis_definitions,
Swen Vermeul
committed
EntityType,
Swen Vermeul
committed
MaterialType,
SampleType,
Swen Vermeul
committed
from .group import Group
from .openbis_object import OpenBisObject, Transaction
Swen Vermeul
committed
from .person import Person
from .project import Project
from .role_assignment import RoleAssignment
from .sample import Sample
from .semantic_annotation import SemanticAnnotation
from .space import Space
from .tag import Tag
from .things import Things
Swen Vermeul
committed
VERBOSE,
extract_attr,
extract_code,
extract_deletion,
extract_id,
extract_identifier,
extract_identifiers,
extract_nested_identifier,
extract_nested_permid,
extract_nested_permids,
extract_permid,
extract_person,
extract_userId,
extract_username_from_token,
Swen Vermeul
committed
is_number,
Swen Vermeul
committed
parse_jackson,
split_identifier,
Swen Vermeul
committed
from .vocabulary import Vocabulary, VocabularyTerm
Swen Vermeul
committed
# import the various openBIS entities
# Numeric log-level constants (presumably ordered by increasing verbosity —
# TODO confirm against the code that compares DEBUG_LEVEL with them).
LOG_WARNING = 3
LOG_INFO = 4
LOG_ENTRY = 5
LOG_PARM = 6
LOG_DEBUG = 7
# Per-user folder ~/.pybis; get_saved_tokens() globs it for "*.token" files.
# NOTE(review): `Path` is not imported in the visible part of this file —
# confirm that `from pathlib import Path` exists.
PYBIS_FOLDER = Path.home() / ".pybis"
# File name of the pybis configuration file (its use is not visible in this excerpt).
CONFIG_FILENAME = ".pybis.json"
# Active log level of this module.
# NOTE(review): LOG_NONE is not defined in the visible part of this file —
# confirm it is declared alongside the other LOG_* constants.
DEBUG_LEVEL = LOG_NONE
vkovtun
committed
def now():
    """Return the current time as a float, as reported by ``time.time()``."""
    return time.time()
def get_search_type_for_entity(entity, operator=None):
    """Return a dictionary containing the search criteria type for an entity.

    Example::

        get_search_type_for_entity('space')
        # returns:
        {'@type': 'as.dto.space.search.SpaceSearchCriteria'}

    Args:
        entity: key naming the entity (e.g. ``"space"``, ``"sample"``,
            ``"dataset_type"``); must be one of the keys listed below.
        operator: optional value stored under the ``"operator"`` key of the
            result. The key is omitted when ``operator`` is ``None``.

    Returns:
        dict: ``{"@type": <criteria class name>}``, plus ``"operator"`` when
        one was given.

    Raises:
        KeyError: if ``entity`` is not a known key.
    """
    search_criteria = {
        "personalAccessToken": "as.dto.pat.search.PersonalAccessTokenSearchCriteria",
        "space": "as.dto.space.search.SpaceSearchCriteria",
        "userId": "as.dto.person.search.UserIdSearchCriteria",
        "email": "as.dto.person.search.EmailSearchCriteria",
        "firstName": "as.dto.person.search.FirstNameSearchCriteria",
        "lastName": "as.dto.person.search.LastNameSearchCriteria",
        "project": "as.dto.project.search.ProjectSearchCriteria",
        "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
        "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
        "sample": "as.dto.sample.search.SampleSearchCriteria",
        "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
        "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
        "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
        "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
        "material": "as.dto.material.search.MaterialSearchCriteria",
        "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
        "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
        "tag": "as.dto.tag.search.TagSearchCriteria",
        "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
        "person": "as.dto.person.search.PersonSearchCriteria",
        "code": "as.dto.common.search.CodeSearchCriteria",
        "global": "as.dto.global.GlobalSearchObject",
        "propertyType": "as.dto.property.search.PropertyTypeSearchCriteria",
    }

    # Build the criteria object in the shape shown in the docstring example.
    sc = {"@type": search_criteria[entity]}
    if operator is not None:
        sc["operator"] = operator
    return sc
def is_session_token(token: str) -> bool:
    """Return True when ``token`` does not start with the ``$pat`` prefix.

    Every token lacking that prefix is treated as a session token.
    """
    return not token.startswith("$pat")
def is_personal_access_token(token: str) -> bool:
    """Return True when ``token`` starts with the ``$pat`` prefix."""
    return token.startswith("$pat")
def get_saved_tokens():
    """Return all tokens saved as ``*.token`` files in ``PYBIS_FOLDER``.

    Only regular files directly inside ``PYBIS_FOLDER`` are read; other
    entries matching the pattern (e.g. directories) are skipped.

    Returns:
        dict: maps each file's stem (file name without the ``.token``
        suffix) to the file's full, unmodified content.
    """
    tokens = {}
    for filepath in PYBIS_FOLDER.glob("*.token"):
        # is_file must be *called*: the bare bound method is always truthy.
        # Checking before opening avoids an error on non-file entries.
        if not filepath.is_file():
            continue
        with open(filepath) as fh:
            tokens[filepath.stem] = fh.read()
    return tokens
def get_token_for_hostname(hostname, session_token_needed=True):
    """Look up a stored token for the given host.

    Tokens are searched for in this order:
        ~/.pybis/hostname.token

    Args:
        hostname: key to look up among the saved tokens (the stem of the
            ``*.token`` file).
        session_token_needed: when True (default), the stored token is only
            returned if ``is_session_token`` accepts it; when False, any
            stored token is returned.

    Returns:
        The stored token, or ``None`` when no token is saved for
        ``hostname`` or the saved one is not a session token although one
        was required.
    """
    tokens = get_saved_tokens()
    if hostname not in tokens:
        return None

    token = tokens[hostname]
    if not session_token_needed:
        return token
    # A session token was requested: hand out the stored token only if it is one.
    if is_session_token(token):
        return token
    return None
def save_pats_to_disk(hostname: str, url: str, resp: dict) -> None:
pats = resp["objects"]
parse_jackson(pats)
path = PYBIS_FOLDER / hostname
path.mkdir(exist_ok=True)
for existing_file in path.glob("*.pat"):
existing_file.unlink()
for token in pats:
data = {
Loading
Loading full blame...