#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
pybis.py

Work with openBIS from Python.
"""

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

import json
import re
from urllib.parse import urlparse, urljoin, quote
import zlib
from texttable import Texttable
from tabulate import tabulate
from .utils import parse_jackson, check_datatype, split_identifier, format_timestamp, is_identifier, is_permid, nvl, VERBOSE
from .property import PropertyHolder, PropertyAssignments
from .masterdata import Vocabulary
from .openbis_object import OpenBisObject 
from .definitions import fetch_option

# import the various openBIS entities
from .space import Space
from .project import Project
from .experiment import Experiment
from .sample import Sample
from .dataset import DataSet
from .person import Person
from .group import Group
from .role_assignment import RoleAssignment
from .tag import Tag
from .semantic_annotation import SemanticAnnotation
from pandas import DataFrame, Series
# Numeric log levels, numbered consecutively from LOG_NONE (0) to LOG_DEBUG (7).
(
    LOG_NONE,
    LOG_SEVERE,
    LOG_ERROR,
    LOG_WARNING,
    LOG_INFO,
    LOG_ENTRY,
    LOG_PARM,
    LOG_DEBUG,
) = range(8)

# Module-wide level; initialised to LOG_NONE.
DEBUG_LEVEL = LOG_NONE


# Lookup table: entity / attribute name -> value of the "@type" field of the
# matching search criteria object. Kept at module level so it is built once
# rather than on every call; each key appears exactly once.
_SEARCH_CRITERIA_TYPES = {
    "space": "as.dto.space.search.SpaceSearchCriteria",
    "userId": "as.dto.person.search.UserIdSearchCriteria",
    "email": "as.dto.person.search.EmailSearchCriteria",
    "firstName": "as.dto.person.search.FirstNameSearchCriteria",
    "lastName": "as.dto.person.search.LastNameSearchCriteria",
    "project": "as.dto.project.search.ProjectSearchCriteria",
    "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
    "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
    "sample": "as.dto.sample.search.SampleSearchCriteria",
    "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
    "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
    "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
    "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
    "material": "as.dto.material.search.MaterialSearchCriteria",
    "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
    "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
    "tag": "as.dto.tag.search.TagSearchCriteria",
    "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
    "roleAssignment": "as.dto.roleassignment.search.RoleAssignmentSearchCriteria",
    "person": "as.dto.person.search.PersonSearchCriteria",
    "code": "as.dto.common.search.CodeSearchCriteria",
    "global": "as.dto.global.GlobalSearchObject",
}


def get_search_type_for_entity(entity, operator=None):
    """ Returns a dictionary containing the correct search criteria type
    for a given entity.

        get_search_type_for_entity('space')
        # returns:
        {'@type': 'as.dto.space.search.SpaceSearchCriteria'}

    :param entity: key naming the entity or attribute (e.g. 'space',
        'sample_type', 'userId').
    :param operator: optional value stored under the 'operator' key; the key
        is omitted when operator is None.
    :return: a new dict on every call, so callers may add keys to it freely.
    :raises KeyError: if entity is not a known key.
    """
    sc = {"@type": _SEARCH_CRITERIA_TYPES[entity]}
    if operator is not None:
        sc["operator"] = operator

    return sc

def get_attrs_for_entity(entity):
    """ Generator over the names of all searchable attributes of an entity.

    Only 'person' is known; because this is a generator, an unknown entity
    raises KeyError when iteration starts, not when the function is called.
    """
    attrs_by_entity = {
        "person": ['firstName','lastName','email','userId']
    }
    yield from attrs_by_entity[entity]

def search_request_for_identifier(ident, entity):
        search_request = {
            "identifier": ident.upper(),
            "@type": "as.dto.{}.id.{}Identifier".format(entity.lower(), entity.capitalize())
            "@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity.capitalize())
def get_search_criteria(entity, **search_args):
    """ Builds the search criteria dictionary for an entity.

    Every searchable attribute of the entity (see get_attrs_for_entity) that
    was passed as a keyword argument becomes one sub-criterion carrying a
    'fieldValue'. The sub-criteria are stored under 'criteria' and the
    'operator' is always "AND".
    """
    result = get_search_type_for_entity(entity)

    result['criteria'] = [
        dict(
            get_search_type_for_entity(name),
            fieldValue=get_field_value_search(name, search_args[name]),
        )
        for name in get_attrs_for_entity(entity)
        if name in search_args
    ]
    result['operator'] = "AND"

    return result

def extract_code(obj):
    """ Returns the 'code' entry of a dict, or a string for anything else.

    None (either obj itself or the stored code) is mapped to an empty string;
    any other non-dict value is converted with str().
    Raises KeyError if obj is a dict without a 'code' key.
    """
    if not isinstance(obj, dict):
        return '' if obj is None else str(obj)
    return '' if obj['code'] is None else obj['code']
def extract_deletion(obj):
    """ Flattens a deletion record into one dict per deleted object.

    Each resulting dict carries the deletion's 'reason' together with the
    'permId' and '@type' (stored as 'type') taken from the object's 'id'.
    """
    return [
        {
            "reason": obj['reason'],
            "permId": entry["id"]["permId"],
            "type": entry["id"]["@type"]
        }
        for entry in obj['deletedObjects']
    ]

def extract_identifier(ident):
    """ Returns ident['identifier'] for a dict, str(ident) for anything else. """
    return ident['identifier'] if isinstance(ident, dict) else str(ident)

def extract_nested_identifier(ident):
    """ Returns ident['identifier']['identifier'] for a dict, str(ident) otherwise. """
    if isinstance(ident, dict):
        outer = ident['identifier']
        return outer['identifier']
    return str(ident)

def extract_permid(permid):
    """ Returns permid['permId'] for a dict, str(permid) for anything else. """
    return permid['permId'] if isinstance(permid, dict) else str(permid)

def extract_nested_permid(permid):
    """ Returns permid['permId']['permId'] for a dict, a string otherwise.

    None (the argument itself or the nested value) becomes an empty string;
    any other non-dict value is converted with str().
    """
    if isinstance(permid, dict):
        inner = permid['permId']['permId']
        return inner if inner is not None else ''
    if permid is None:
        return ''
    return str(permid)

def extract_property_assignments(pas):
    """ Returns one entry per property assignment.

    When an assignment's 'propertyType' is a dict, its 'label' is used;
    otherwise the 'propertyType' value itself is passed through unchanged.
    """
    return [
        pa['propertyType']['label']
        if isinstance(pa['propertyType'], dict)
        else pa['propertyType']
        for pa in pas
    ]

def extract_role_assignments(ras):
    ra_strings = []
Loading
Loading full blame...