Newer
Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
pybis.py
"""
Swen Vermeul
committed
from __future__ import print_function
import os
Chandrasekhar Ramakrishnan
committed
import random
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
Swen Vermeul
committed
import time
from urllib.parse import urlparse, urljoin, quote
from collections import namedtuple
from texttable import Texttable
from tabulate import tabulate
Chandrasekhar Ramakrishnan
committed
from . import data_set as pbds
Swen Vermeul
committed
from .utils import parse_jackson, check_datatype, split_identifier, format_timestamp, is_identifier, is_permid, nvl, VERBOSE
from .property import PropertyHolder, PropertyAssignments
from .masterdata import Vocabulary
from .openbis_object import OpenBisObject
from .definitions import fetch_option
# import the various openBIS entities
from .space import Space
from .project import Project
from .experiment import Experiment
from .sample import Sample
from .dataset import DataSet
from .person import Person
from .group import Group
from .role_assignment import RoleAssignment
from .tag import Tag
from .semantic_annotation import SemanticAnnotation
Swen Vermeul
committed
import pandas as pd
Chandrasekhar Ramakrishnan
committed
from datetime import datetime
LOG_NONE = 0
LOG_SEVERE = 1
LOG_ERROR = 2
LOG_WARNING = 3
LOG_INFO = 4
LOG_ENTRY = 5
LOG_PARM = 6
LOG_DEBUG = 7
DEBUG_LEVEL = LOG_NONE
def get_search_type_for_entity(entity, operator=None):
""" Returns a dictionary containing the correct search criteria type
for a given entity.
get_search_type_for_entity('space')
# returns:
{'@type': 'as.dto.space.search.SpaceSearchCriteria'}
"""
search_criteria = {
"space": "as.dto.space.search.SpaceSearchCriteria",
"userId": "as.dto.person.search.UserIdSearchCriteria",
"email": "as.dto.person.search.EmailSearchCriteria",
"firstName": "as.dto.person.search.FirstNameSearchCriteria",
"lastName": "as.dto.person.search.LastNameSearchCriteria",
"project": "as.dto.project.search.ProjectSearchCriteria",
"experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
"experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
"sample": "as.dto.sample.search.SampleSearchCriteria",
"sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
"dataset": "as.dto.dataset.search.DataSetSearchCriteria",
"dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
"external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
"material": "as.dto.material.search.MaterialSearchCriteria",
"material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
"vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
"tag": "as.dto.tag.search.TagSearchCriteria",
"authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
"roleAssignment": "as.dto.roleassignment.search.RoleAssignmentSearchCriteria",
"person": "as.dto.person.search.PersonSearchCriteria",
"code": "as.dto.common.search.CodeSearchCriteria",
"sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
"global": "as.dto.global.GlobalSearchObject",
}
sc = { "@type": search_criteria[entity] }
if operator is not None:
sc["operator"] = operator
return sc
def get_attrs_for_entity(entity):
""" For a given entity this method returns an iterator for all searchable
attributes.
"""
search_args = {
"person": ['firstName','lastName','email','userId']
}
for search_arg in search_args[entity]:
yield search_arg
def search_request_for_identifier(ident, entity):
search_request = {}
Swen Vermeul
committed
if is_identifier(ident):
search_request = {
"identifier": ident.upper(),
"@type": "as.dto.{}.id.{}Identifier".format(entity.lower(), entity.capitalize())
}
else:
search_request = {
"permId": ident,
"@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity.capitalize())
}
return search_request
def get_search_criteria(entity, **search_args):
search_criteria = get_search_type_for_entity(entity)
criteria = []
for attr in get_attrs_for_entity(entity):
if attr in search_args:
sub_crit = get_search_type_for_entity(attr)
sub_crit['fieldValue'] = get_field_value_search(attr, search_args[attr])
criteria.append(sub_crit)
search_criteria['criteria'] = criteria
search_criteria['operator'] = "AND"
return search_criteria
return '' if obj is None else str(obj)
return '' if obj['code'] is None else obj['code']
def extract_deletion(obj):
del_objs = []
for deleted_object in obj['deletedObjects']:
del_objs.append({
"reason": obj['reason'],
"permId": deleted_object["id"]["permId"],
"type": deleted_object["id"]["@type"]
})
return del_objs
def extract_identifier(ident):
if not isinstance(ident, dict):
return str(ident)
return ident['identifier']
def extract_nested_identifier(ident):
if not isinstance(ident, dict):
return str(ident)
return ident['identifier']['identifier']
def extract_permid(permid):
if not isinstance(permid, dict):
return str(permid)
return permid['permId']
def extract_nested_permid(permid):
if not isinstance(permid, dict):
return '' if permid is None else str(permid)
return '' if permid['permId']['permId'] is None else permid['permId']['permId']
def extract_property_assignments(pas):
pa_strings = []
for pa in pas:
if not isinstance(pa['propertyType'], dict):
pa_strings.append(pa['propertyType'])
else:
pa_strings.append(pa['propertyType']['label'])
return pa_strings
def extract_role_assignments(ras):
ra_strings = []
Loading
Loading full blame...