Skip to content
Snippets Groups Projects
pybis.py 83.7 KiB
Newer Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
pybis.py

Swen Vermeul's avatar
Swen Vermeul committed
Work with openBIS from Python.
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

import json
import re
from urllib.parse import urlparse, urljoin, quote
import zlib
from texttable import Texttable
from tabulate import tabulate
from .utils import parse_jackson, check_datatype, split_identifier, format_timestamp, is_identifier, is_permid, nvl, VERBOSE
from .utils import extract_permid, extract_code,extract_deletion,extract_identifier,extract_nested_identifier,extract_nested_permid,extract_property_assignments,extract_role_assignments,extract_person, extract_person_details,extract_id,extract_userId
from .property import PropertyHolder, PropertyAssignments
from .masterdata import Vocabulary
from .openbis_object import OpenBisObject 
from .definitions import fetch_option

# import the various openBIS entities
from .things import Things
from .space import Space
from .project import Project
from .experiment import Experiment
from .sample import Sample
from .dataset import DataSet
from .person import Person
from .group import Group
from .role_assignment import RoleAssignment
from .tag import Tag
from .semantic_annotation import SemanticAnnotation
from pandas import DataFrame, Series
LOG_NONE    = 0
LOG_SEVERE  = 1
LOG_ERROR   = 2
LOG_WARNING = 3
LOG_INFO    = 4
LOG_ENTRY   = 5
LOG_PARM    = 6
LOG_DEBUG   = 7

DEBUG_LEVEL = LOG_NONE


def get_search_type_for_entity(entity, operator=None):
    """ Returns a dictionary containing the correct search criteria type
    for a given entity.

        get_search_type_for_entity('space')
        # returns:
        {'@type': 'as.dto.space.search.SpaceSearchCriteria'}
    """
    search_criteria = {
        "space": "as.dto.space.search.SpaceSearchCriteria",
        "userId": "as.dto.person.search.UserIdSearchCriteria",
        "email": "as.dto.person.search.EmailSearchCriteria",
        "firstName": "as.dto.person.search.FirstNameSearchCriteria",
        "lastName": "as.dto.person.search.LastNameSearchCriteria",
        "project": "as.dto.project.search.ProjectSearchCriteria",
        "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
        "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
        "sample": "as.dto.sample.search.SampleSearchCriteria",
        "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
        "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
        "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
        "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
        "material": "as.dto.material.search.MaterialSearchCriteria",
        "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
        "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
        "tag": "as.dto.tag.search.TagSearchCriteria",
        "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
        "roleAssignment": "as.dto.roleassignment.search.RoleAssignmentSearchCriteria",
        "person": "as.dto.person.search.PersonSearchCriteria",
        "code": "as.dto.common.search.CodeSearchCriteria",
        "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
        "global": "as.dto.global.GlobalSearchObject",
    }

    sc = { "@type": search_criteria[entity] }
    if operator is not None:
        sc["operator"] = operator

    return sc

def get_attrs_for_entity(entity):
    """ For a given entity this method returns an iterator for all searchable
    attributes.
    """
    search_args = {
        "person": ['firstName','lastName','email','userId']
    }
    for search_arg in search_args[entity]:
        yield search_arg

def search_request_for_identifier(ident, entity):
        search_request = {
            "identifier": ident.upper(),
            "@type": "as.dto.{}.id.{}Identifier".format(entity.lower(), entity.capitalize())
            "@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity.capitalize())
def get_search_criteria(entity, **search_args):
    search_criteria = get_search_type_for_entity(entity)

    criteria = []
    for attr in get_attrs_for_entity(entity):
        if attr in search_args:
            sub_crit = get_search_type_for_entity(attr)
            sub_crit['fieldValue'] = get_field_value_search(attr, search_args[attr])
            criteria.append(sub_crit)

    search_criteria['criteria'] = criteria
    search_criteria['operator'] = "AND"

    return search_criteria

def crc32(fileName):
    """since Python3 the zlib module returns unsigned integers (2.7: signed int)
    """
    prev = 0
    for eachLine in open(fileName, "rb"):
        prev = zlib.crc32(eachLine, prev)
    # return as hex

def _create_tagIds(tags=None):
    if tags is None:
        return None
    if not isinstance(tags, list):
        tags = [tags]
    tagIds = []
    for tag in tags:
        tagIds.append({
            "code": tag, 
            "@type": "as.dto.tag.id.TagCode"
        })
def _tagIds_for_tags(tags=None, action='Add'):
    """creates an action item to add or remove tags. 
    Action is either 'Add', 'Remove' or 'Set'
    """
    if tags is None:
        return
    if not isinstance(tags, list):
        tags = [tags]

    items = []
    for tag in tags:
        items.append({
            "code": tag,
            "@type": "as.dto.tag.id.TagCode"
        })

    tagIds = {
        "actions": [
            {
                "items": items,
                "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
            }
        ],
        "@type": "as.dto.common.update.IdListUpdateValue"
    }
    return tagIds

Loading
Loading full blame...