Skip to content
Snippets Groups Projects
pybis.py 186 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    #   Copyright ETH 2018 - 2023 Zürich, Scientific IT Services
    # 
    #   Licensed under the Apache License, Version 2.0 (the "License");
    #   you may not use this file except in compliance with the License.
    #   You may obtain a copy of the License at
    # 
    #        http://www.apache.org/licenses/LICENSE-2.0
    #   
    #   Unless required by applicable law or agreed to in writing, software
    #   distributed under the License is distributed on an "AS IS" BASIS,
    #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #   See the License for the specific language governing permissions and
    #   limitations under the License.
    #
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    Work with openBIS using Python.
    
    import json
    import os
    import re
    import subprocess
    import time
    import zlib
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from datetime import datetime
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from pathlib import Path
    from typing import List
    
    from urllib.parse import quote, urljoin, urlparse
    
    import requests
    import urllib3
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from dateutil.relativedelta import relativedelta
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from .dataset import DataSet
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from .definitions import (
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        get_fetchoption_for_entity,
        get_fetchoptions,
    
        get_method_for_entity,
        get_type_for_entity,
        openbis_definitions,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    )
    from .entity_type import (
        DataSetType,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        ExperimentType,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    )
    
    from .group import Group
    from .openbis_object import OpenBisObject, Transaction
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from .experiment import Experiment
    
    from .person import Person
    from .project import Project
    from .role_assignment import RoleAssignment
    from .sample import Sample
    from .semantic_annotation import SemanticAnnotation
    from .space import Space
    from .tag import Tag
    from .things import Things
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    from .utils import (
    
        VERBOSE,
        extract_attr,
        extract_code,
        extract_deletion,
        extract_id,
        extract_identifier,
        extract_identifiers,
        extract_nested_identifier,
        extract_nested_permid,
        extract_nested_permids,
        extract_permid,
        extract_person,
        extract_userId,
    
        extract_username_from_token,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        format_timestamp,
        is_identifier,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        is_permid,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    )
    
    from .vocabulary import Vocabulary, VocabularyTerm
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    LOG_NONE = 0
    LOG_SEVERE = 1
    LOG_ERROR = 2
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    LOG_INFO = 4
    LOG_ENTRY = 5
    LOG_PARM = 6
    LOG_DEBUG = 7
    
    PYBIS_FOLDER = Path.home() / ".pybis"
    
    CONFIG_FILENAME = ".pybis.json"
    
    def get_search_type_for_entity(entity, operator=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        """Returns a dictionary containing the correct search criteria type
    
            get_search_type_for_entity('space')
            # returns:
    
            {'@type': 'as.dto.space.search.SpaceSearchCriteria'}
        """
        search_criteria = {
    
            "personalAccessToken": "as.dto.pat.search.PersonalAccessTokenSearchCriteria",
    
            "space": "as.dto.space.search.SpaceSearchCriteria",
            "userId": "as.dto.person.search.UserIdSearchCriteria",
            "email": "as.dto.person.search.EmailSearchCriteria",
            "firstName": "as.dto.person.search.FirstNameSearchCriteria",
            "lastName": "as.dto.person.search.LastNameSearchCriteria",
            "project": "as.dto.project.search.ProjectSearchCriteria",
            "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
            "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
            "sample": "as.dto.sample.search.SampleSearchCriteria",
            "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
            "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
            "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
            "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
            "material": "as.dto.material.search.MaterialSearchCriteria",
            "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
            "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
            "tag": "as.dto.tag.search.TagSearchCriteria",
    
            "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
    
            "person": "as.dto.person.search.PersonSearchCriteria",
            "code": "as.dto.common.search.CodeSearchCriteria",
            "global": "as.dto.global.GlobalSearchObject",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "plugin": "as.dto.plugin.search.PluginSearchCriteria",
    
            "propertyType": "as.dto.property.search.PropertyTypeSearchCriteria",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        sc = {"@type": search_criteria[entity]}
    
        if operator is not None:
            sc["operator"] = operator
    
        return sc
    
    def is_session_token(token: str):
        return not token.startswith("$pat")
    
    
    def is_personal_access_token(token: str):
        return token.startswith("$pat")
    
    
    def get_saved_tokens():
    
        for filepath in PYBIS_FOLDER.glob("*.token"):
    
            with open(filepath) as fh:
                if filepath.is_file:
    
                    token = fh.read()
                    tokens[filepath.stem] = token
    
    def get_token_for_hostname(hostname, session_token_needed=True):
        """Searches for a stored token for a given host in this order:
        ~/.pybis/hostname.token
        """
        tokens = get_saved_tokens()
        if hostname in tokens:
            if session_token_needed:
                if is_session_token(tokens[hostname]):
                    return tokens[hostname]
            else:
                return tokens[hostname]
        return
    
    
    
    def save_pats_to_disk(hostname: str, url: str, resp: dict) -> None:
    
        pats = resp["objects"]
        parse_jackson(pats)
        path = PYBIS_FOLDER / hostname
        path.mkdir(exist_ok=True)
        for existing_file in path.glob("*.pat"):
            existing_file.unlink()
    
        for token in pats:
            data = {
    
                "hostname": hostname,
                "owner": token["owner"]["userId"],
                "registrationDate": format_timestamp(token["owner"]["registrationDate"]),
                "validFromDate": format_timestamp(token["validFromDate"]),
                "validToDate": format_timestamp(token["validToDate"]),
                "sessionName": token["sessionName"],
                "permId": token["permId"]["permId"],
            }
            with open(path / (token["hash"] + ".pat"), "w", encoding="utf-8") as fh:
                fh.write(json.dumps(data, indent=4))
    
    
    def get_saved_pats(hostname=None, sessionName=None):
        """return all personal access tokens stored on disk."""
        if hostname is None:
            hostname = ""
        path = PYBIS_FOLDER / hostname
        tokens = []
        for filepath in path.rglob("*.pat"):
            with open(filepath) as fh:
                if filepath.is_file:
                    pat = json.load(fh)
                    if sessionName:
                        if pat["sessionName"] != sessionName:
                            continue
                    tokens.append(pat)
        return tokens
    
    
    
        ident = ident.strip()
    
        """Returns the data type for a given identifier/permId for use with the API call, e.g.
        {
            "identifier": "/DEFAULT/SAMPLE_NAME",
            "@type": "as.dto.sample.id.SampleIdentifier"
        }
        or
        {
            "permId": "20160817175233002-331",
            "@type": "as.dto.sample.id.SamplePermId"
        }
        """
    
        # Tags have strange permIds...
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if entity.lower() == "tag":
            if "/" in ident:
                if not ident.startswith("/"):
                    ident = "/" + ident
                return {"permId": ident, "@type": "as.dto.tag.id.TagPermId"}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                return {"code": ident, "@type": "as.dto.tag.id.TagCode"}
    
        if entity == "personalAccessToken":
            return {"permId": ident, "@type": "as.dto.pat.id.PersonalAccessTokenPermId"}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "sample": "Sample",
            "dataset": "DataSet",
            "experiment": "Experiment",
            "plugin": "Plugin",
            "space": "Space",
            "project": "Project",
    
        if entity.lower() in entities:
            entity_capitalize = entities[entity.lower()]
        else:
            entity_capitalize = entity.capitalize()
    
            # people tend to omit the / prefix of an identifier...
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if not ident.startswith("/"):
                ident = "/" + ident
    
            # ELN-LIMS style contains also experiment in sample identifer, i.e. /space/project/experiment/sample_code
            # we have to remove the experiment-code
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if ident.count("/") == 4:
                codes = ident.split("/")
                ident = "/".join([codes[0], codes[1], codes[2], codes[4]])
    
            search_request = {
                "identifier": ident.upper(),
    
                "@type": f"as.dto.{entity.lower()}.id.{entity_capitalize}Identifier",
    
                "@type": f"as.dto.{entity.lower()}.id.{entity_capitalize}PermId",
    
    def get_search_criteria(entity, **search_args):
        search_criteria = get_search_type_for_entity(entity)
    
        criteria = []
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        attrs = openbis_definitions(entity)["attrs"]
    
            if attr in search_args:
                sub_crit = get_search_type_for_entity(attr)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                sub_crit["fieldValue"] = get_field_value_search(attr, search_args[attr])
    
                criteria.append(sub_crit)
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        search_criteria["criteria"] = criteria
        search_criteria["operator"] = "AND"
    
    def crc32(fileName):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        """since Python3 the zlib module returns unsigned integers (2.7: signed int)"""
    
        prev = 0
    
        for eachLine in open(fileName, "rb"):
    
            prev = zlib.crc32(eachLine, prev)
        # return as hex
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _tagIds_for_tags(tags=None, action="Add"):
        """creates an action item to add or remove tags.
    
        Action is either 'Add', 'Remove' or 'Set'
    
        """
        if tags is None:
            return
        if not isinstance(tags, list):
            tags = [tags]
    
    
        items = list(map(lambda tag: {"code": tag, "@type": "as.dto.tag.id.TagCode"}, tags))
    
                    "@type": f"as.dto.common.update.ListUpdateAction{action.capitalize()}",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "@type": "as.dto.common.update.IdListUpdateValue",
    
        return tagIds
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _list_update(ids=None, entity=None, action="Add"):
        """creates an action item to add, set or remove ids."""
    
        if ids is None:
            return
        if not isinstance(ids, list):
            ids = [ids]
    
    
        items = list(
            map(
                lambda id: {
                    "code": id,
    
                    "@type": f"as.dto.{entity.lower()}.id.{entity}Code",
    
                    "@type": f"as.dto.common.update.ListUpdateAction{action.capitalize()}",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "@type": "as.dto.common.update.IdListUpdateValue",
    
    def get_field_value_search(field, value, comparison="StringEqualToValue"):
    
        return {"value": value, "@type": f"as.dto.common.search.{comparison}"}
    
    def _common_search(search_type, value, comparison="StringEqualToValue"):
        sreq = {
    
                "@type": f"as.dto.common.search.{comparison}",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            },
    
    def _criteria_for_code(code):
        return {
            "fieldValue": {
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "@type": "as.dto.common.search.StringEqualToValue",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "@type": "as.dto.common.search.CodeSearchCriteria",
    
    def _criteria_for_permId(permId):
        return {
            "fieldName": "perm_id",
            "fieldType": "ATTRIBUTE",
            "fieldValue": {
                "value": permId,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "@type": "as.dto.common.search.StringEqualToValue",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "@type": "as.dto.common.search.PermIdSearchCriteria",
    
    def _subcriteria_for_userId(userId):
        return {
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "criteria": [
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    "fieldName": "userId",
                    "fieldType": "ATTRIBUTE",
                    "fieldValue": {
                        "value": userId,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        "@type": "as.dto.common.search.StringEqualToValue",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    },
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    "@type": "as.dto.person.search.UserIdSearchCriteria",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            ],
            "@type": "as.dto.person.search.PersonSearchCriteria",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "operator": "AND",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        }
    
    def _subcriteria_for_type(code, entity):
    
            "@type": f"as.dto.{entity.lower()}.search.{entity}TypeSearchCriteria",
    
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldValue": {
                        "value": code.upper(),
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            ],
    
    def _subcriteria_for_status(status_value):
        status_value = status_value.upper()
        valid_status = "AVAILABLE LOCKED ARCHIVED UNARCHIVE_PENDING ARCHIVE_PENDING BACKUP_PENDING".split()
        if not status_value in valid_status:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            raise ValueError(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "status must be one of the following: " + ", ".join(valid_status)
            )
    
    
        return {
            "@type": "as.dto.dataset.search.PhysicalDataSearchCriteria",
            "operator": "AND",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "criteria": [
                {
                    "@type": "as.dto.dataset.search.StatusSearchCriteria",
                    "fieldName": "status",
                    "fieldType": "ATTRIBUTE",
                    "fieldValue": status_value,
                }
            ],
    
    def _gen_search_criteria(req):
    
        sreq = {}
        for key, val in req.items():
            if key == "criteria":
    
                sreq["criteria"] = list(
                    map(lambda item: _gen_search_criteria(item), req["criteria"])
                )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                sreq["criteria"] = [
                    _common_search("as.dto.common.search.CodeSearchCriteria", val.upper())
                ]
    
                if is_identifier(val):
                    # if we have an identifier, we need to search in Space and Code separately
                    si = split_identifier(val)
                    sreq["criteria"] = []
                    if "space" in si:
                        sreq["criteria"].append(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            _gen_search_criteria({"space": "Space", "code": si["space"]})
    
                        )
                    if "experiment" in si:
                        pass
    
                    if "code" in si:
                        sreq["criteria"].append(
                            _common_search(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                                "as.dto.common.search.CodeSearchCriteria",
                                si["code"].upper(),
    
                elif is_permid(val):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    sreq["criteria"] = [
                        _common_search("as.dto.common.search.PermIdSearchCriteria", val)
                    ]
    
                else:
                    # we assume we just got a code
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    sreq["criteria"] = [
                        _common_search(
                            "as.dto.common.search.CodeSearchCriteria", val.upper()
                        )
                    ]
    
                sreq["@type"] = f"as.dto.{key}.search.{val}SearchCriteria"
    
    def _subcriteria_for_tags(tags):
        if not isinstance(tags, list):
            tags = [tags]
    
    
        criteria = list(
            map(
                lambda tag: {
                    "fieldName": "code",
                    "fieldType": "ATTRIBUTE",
                    "fieldValue": {
                        "value": tag,
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                },
                tags,
            )
        )
    
    
        return {
            "@type": "as.dto.tag.search.TagSearchCriteria",
            "operator": "AND",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "criteria": criteria,
    
    def _subcriteria_for_is_finished(is_finished):
        return {
            "@type": "as.dto.common.search.StringPropertySearchCriteria",
            "fieldName": "FINISHED_FLAG",
            "fieldType": "PROPERTY",
            "fieldValue": {
                "value": is_finished,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "@type": "as.dto.common.search.StringEqualToValue",
            },
    
    def _subcriteria_for_properties(prop, value, entity):
    
        """This internal method creates the JSON RPC criterias for searching
        in properties. It distinguishes between numbers, dates and strings
        and uses the comparative operator (< > >= <=), if available.
        creationDate and modificationDate attributes can be searched as well.
        To search in the properties of parents, children, etc. the user has to
        prefix the propery accordingly:
    
        - parent_propertyName
        - child_propertyName
        - container_propertyName
        """
    
        additional_attr = {}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if "*" in str(value):
            additional_attr["useWildcards"] = True
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            additional_attr["useWildcards"] = False
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "sample": {
                "parent": "as.dto.sample.search.SampleParentsSearchCriteria",
                "parents": "as.dto.sample.search.SampleParentsSearchCriteria",
                "child": "as.dto.sample.search.SampleChildrenSearchCriteria",
                "children": "as.dto.sample.search.SampleChildrenSearchCriteria",
                "container": "as.dto.sample.search.SampleContainerSearchCriteria",
            },
            "dataset": {
                "parent": "as.dto.dataset.search.DataSetParentsSearchCriteria",
                "parents": "as.dto.dataset.search.DataSetParentsSearchCriteria",
                "child": "as.dto.dataset.search.DataSetChildrenSearchCriteria",
                "children": "as.dto.dataset.search.DataSetChildrenSearchCriteria",
                "container": "as.dto.dataset.search.DataSetContainerSearchCriteria",
    
        # default values of fieldType, str_type and eq_type
        fieldType = "PROPERTY"
        eq_type = "as.dto.common.search.StringEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        str_type = "as.dto.common.search.StringPropertySearchCriteria"
    
        is_date = False
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if "date" in prop.lower() and re.search(r"\d{4}\-\d{2}\-\d{2}", value):
    
            is_date = True
            eq_type = "as.dto.common.search.DateEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if prop.lower().endswith("registrationdate"):
    
                str_type = "as.dto.common.search.RegistrationDateSearchCriteria"
                fieldType = "ATTRIBUTE"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            elif prop.lower().endswith("modificationdate"):
    
                str_type = "as.dto.common.search.ModificationDateSearchCriteria"
                fieldType = "ATTRIBUTE"
            else:
                str_type = "as.dto.common.search.DatePropertySearchCriteria"
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if any(str(value).startswith(operator) for operator in [">", "<", "="]):
            match = re.search(
                r"""
    
                ^
                (?P<comp_operator>\>\=|\>|\<\=|\<|\=\=|\=)  # extract the comparative operator
                \s*
                (?P<value>.*)                           # extract the value
                """,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                value,
                flags=re.X,
            )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                comp_operator = match.groupdict()["comp_operator"]
                value = match.groupdict()["value"]
    
    
                # date comparison
                if is_date:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    if comp_operator == ">":
    
                        eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == ">=":
    
                        eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "<":
    
                        eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "<=":
    
                        eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
                    else:
                        eq_type = "as.dto.common.search.DateEqualToValue"
    
                # numeric comparison
                elif is_number(value):
                    str_type = "as.dto.common.search.NumberPropertySearchCriteria"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    if comp_operator == ">":
    
                        eq_type = "as.dto.common.search.NumberGreaterThanValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == ">=":
    
                        eq_type = "as.dto.common.search.NumberGreaterThanOrEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "<":
    
                        eq_type = "as.dto.common.search.NumberLessThanValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "<=":
    
                        eq_type = "as.dto.common.search.NumberLessThanOrEqualToValue"
                    else:
                        eq_type = "as.dto.common.search.NumberEqualToValue"
    
                # string comparison
                else:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    if comp_operator == ">":
    
                        eq_type = "as.dto.common.search.StringGreaterThanValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == ">=":
    
                        eq_type = "as.dto.common.search.StringGreaterThanOrEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "<":
    
                        eq_type = "as.dto.common.search.StringLessThanValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "<=":
    
                        eq_type = "as.dto.common.search.StringLessThanOrEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    elif comp_operator == "=":
    
                        eq_type = "as.dto.common.search.StringEqualToValue"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        additional_attr["useWildcards"] = False
    
                    else:
                        eq_type = "as.dto.common.search.StringEqualToValue"
    
    
        # searching for parent/child/container identifier
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if any(
    
                relation == prop.lower()
                for relation in [
                    "parent",
                    "child",
                    "container",
                    "parents",
                    "children",
                    "containers",
                ]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        ):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            relation = prop.lower()
    
            if is_identifier(value):
                identifier_search_type = "as.dto.common.search.IdentifierSearchCriteria"
    
            # find any parent, child, container
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            elif value == "*":
    
                return {
                    "@type": search_types[entity][relation],
                    "criteria": [
                        {
                            "@type": "as.dto.common.search.AnyFieldSearchCriteria",
                            "fieldValue": {
                                "@type": "as.dto.common.search.AnyStringValue",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            },
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    ],
    
                }
            elif is_permid(value):
                identifier_search_type = "as.dto.common.search.PermIdSearchCriteria"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            else:
    
                identifier_search_type = "as.dto.common.search.CodeSearchCriteria"
            return {
                "@type": search_types[entity][relation],
                "criteria": [
                    {
                        "@type": identifier_search_type,
                        "fieldType": "ATTRIBUTE",
                        "fieldValue": {
                            "@type": "as.dto.common.search.StringEqualToValue",
                            "value": value,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                        **additional_attr,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                ],
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        # searching for parent/child/container property:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        elif any(
    
                prop.lower().startswith(relation)
                for relation in ["parent_", "child_", "container_"]
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        ):
            match = re.search(r"^(\w+?)_(.*)", prop.lower())
    
            if match:
                relation, property_name = match.groups()
                return {
                    "@type": search_types[entity][relation],
                    "criteria": [
                        {
                            "@type": str_type,
                            "fieldName": property_name.upper(),
    
                            "fieldType": fieldType,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                            **additional_attr,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    ],
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if prop.startswith("_"):
            fieldName = "$" + prop[1:]
    
        return {
            "@type": str_type,
    
            "fieldType": fieldType,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            "fieldValue": {"value": value, "@type": eq_type},
            **additional_attr,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _subcriteria_for(thing, entity, parents_or_children="", operator="AND"):
    
        """Returns the sub-search criteria for «thing», which can be either:
        - a python object (sample, dataSet, experiment)
        - a permId
        - an identifier
        - a code
        """
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        entity, *_ = entity.split(".")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            new_entity = ".".join(_)
    
            subcrit = _subcriteria_for(thing, new_entity)
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_type = get_type_for_entity(entity, "search", parents_or_children)
            return {"criteria": subcrit, **search_type, "operator": operator}
    
        if isinstance(thing, str):
            if is_permid(thing):
                return _subcriteria_for_permid(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    thing,
    
                    entity=entity,
                    parents_or_children=parents_or_children,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    operator=operator,
    
                )
            elif is_identifier(thing):
                return _subcriteria_for_identifier(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    thing,
    
                    entity=entity,
                    parents_or_children=parents_or_children,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    operator=operator,
    
                )
            else:
                # look for code
                return _subcriteria_for_code_new(
                    thing,
                    entity=entity,
                    parents_or_children=parents_or_children,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    operator=operator,
    
                )
    
        elif isinstance(thing, list):
            criteria = []
            for element in thing:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                crit = _subcriteria_for(element, entity, parents_or_children, operator)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            return {"criteria": criteria, "@type": crit["@type"], "operator": "OR"}
    
        elif thing is None:
            # we just need the type
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            search_type = get_type_for_entity(entity, "search", parents_or_children)
            return {"criteria": [], **search_type, "operator": operator}
    
        else:
            # we passed an object
            return _subcriteria_for_permid(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                thing.permId,
    
                entity=entity,
                parents_or_children=parents_or_children,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                operator=operator,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _subcriteria_for_identifier(ids, entity, parents_or_children="", operator="AND"):
    
        if not isinstance(ids, list):
            ids = [ids]
    
    
        criteria = list(
            map(
                lambda id: {
                    "@type": "as.dto.common.search.IdentifierSearchCriteria",
                    "fieldValue": {
                        "value": id,
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
                    "fieldType": "ATTRIBUTE",
                    "fieldName": "identifier",
                },
                ids,
            )
        )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        search_type = get_type_for_entity(entity, "search", parents_or_children)
        return {"criteria": criteria, **search_type, "operator": operator}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _subcriteria_for_permid(permids, entity, parents_or_children="", operator="AND"):
    
        if not isinstance(permids, list):
            permids = [permids]
    
    
        criteria = list(
            map(
                lambda permid: {
                    "@type": "as.dto.common.search.PermIdSearchCriteria",
                    "fieldValue": {
                        "value": permid,
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
                    "fieldType": "ATTRIBUTE",
                    "fieldName": "perm_id",
                },
                permids,
            )
        )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        search_type = get_type_for_entity(entity, "search", parents_or_children)
        return {"criteria": criteria, **search_type, "operator": operator}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _subcriteria_for_permid_new(codes, entity, parents_or_children="", operator="AND"):
    
        if not isinstance(codes, list):
            codes = [codes]
    
    
        criteria = list(
            map(
                lambda code: {
                    "@type": "as.dto.common.search.PermIdSearchCriteria",
                    "fieldValue": {
                        "value": code,
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
                    "fieldType": "ATTRIBUTE",
                    "fieldName": "perm_id",
                },
                codes,
            )
        )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        search_type = get_type_for_entity(entity, "search", parents_or_children)
        return {"criteria": criteria, **search_type, "operator": operator}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def _subcriteria_for_code_new(codes, entity, parents_or_children="", operator="AND"):
    
        if not isinstance(codes, list):
            codes = [codes]
    
    
        criteria = list(
            map(
                lambda code: {
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldValue": {
                        "value": code,
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
                    "fieldType": "ATTRIBUTE",
                    "fieldName": "code",
                },
                codes,
            )
        )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        search_type = get_type_for_entity(entity, "search", parents_or_children)
        return {"criteria": criteria, **search_type, "operator": operator}
    
    def _subcriteria_for_code(code, entity):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        """Creates the often used search criteria for code values. Returns a dictionary.
    
    
        Example::
            _subcriteria_for_code("username", "space")
    
    
        {
            "criteria": [
                {
                    "fieldType": "ATTRIBUTE",
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldName": "code",
                    "fieldValue": {
                        "@type": "as.dto.common.search.StringEqualToValue",
                        "value": "USERNAME"
                    }
                }
            ],
            "operator": "AND",
            "@type": "as.dto.space.search.SpaceSearchCriteria"
        }
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if code is not None:
            if is_permid(code):
                fieldname = "permId"
                fieldtype = "as.dto.common.search.PermIdSearchCriteria"
            else:
                fieldname = "code"
                fieldtype = "as.dto.common.search.CodeSearchCriteria"
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            # search_criteria = get_search_type_for_entity(entity.lower())
            search_criteria = get_type_for_entity(entity, "search")
            search_criteria["criteria"] = [
                {
                    "fieldName": fieldname,
                    "fieldType": "ATTRIBUTE",
                    "fieldValue": {
                        "value": code.upper(),
                        "@type": "as.dto.common.search.StringEqualToValue",
                    },
                    "@type": fieldtype,
                }
            ]
    
            search_criteria["operator"] = "AND"
            return search_criteria
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            return get_type_for_entity(entity, "search")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            # return get_search_type_for_entity(entity.lower())
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        """Interface for communicating with openBIS.
    
    
        Note:
            * A recent version of openBIS is required (minimum 16.05.2).
            * For creation of datasets, the dataset-uploader-api ingestion plugin must be present.
    
    
        token: str
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def __init__(
    
                self,
                url=None,
                verify_certificates=True,
                token=None,
                use_cache=True,
                allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks=False,
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        ):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Initialize a new connection to an openBIS server.
    
    
            Examples:
                o = Openbis('https://openbis.example.com')
                o_test = Openbis('https://test_openbis.example.com:8443', verify_certificates=False)
    
            Args:
                url (str): https://openbis.example.com
                verify_certificates (bool): set to False when you use self-signed certificates
                token (str): a valid openBIS token. If not set, pybis will try to read a valid token from ~/.pybis
    
                use_cache: make openBIS to store spaces, projects, sample types, vocabulary terms and oder more-or-less static objects to optimise speed
    
                allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks (bool): False
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            self.as_v3 = "/openbis/openbis/rmi-application-server-v3.json"
            self.as_v1 = "/openbis/openbis/rmi-general-information-v1.json"
            self.reg_v1 = "/openbis/openbis/rmi-query-v1.json"
    
            self.verify_certificates = verify_certificates
    
            if not verify_certificates:
                urllib3.disable_warnings()
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                url = os.environ.get("OPENBIS_URL") or os.environ.get("OPENBIS_HOST")
    
                if url is None:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    raise ValueError("please provide a URL you want to connect to.")
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if not url.startswith("http"):
                url = "https://" + url
    
            url_obj = urlparse(url)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if url_obj.netloc is None or url_obj.netloc == "":
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                raise ValueError(
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                    "please provide the url in this format: https://openbis.host.ch:8443"
                )