Skip to content
Snippets Groups Projects
pybis.py 180 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    """
    pybis.py
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    Work with openBIS from Python.
    
    import requests
    
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    
    import json
    import re
    
    from urllib.parse import urlparse, urljoin, quote
    
    import zlib
    
    from texttable import Texttable
    
    from tabulate import tabulate
    
    
    from pybis.utils import parse_jackson, check_datatype, split_identifier, format_timestamp, is_identifier, is_permid, nvl
    
    from pybis.property import PropertyHolder, PropertyAssignments
    from pybis.masterdata import Vocabulary
    
    import pandas as pd
    from pandas import DataFrame, Series
    
    
    import threading
    from threading import Thread
    from queue import Queue
    
    PYBIS_PLUGIN = "dataset-uploader-api"
    
    # display messages when in a interactive context (IPython or Jupyter)
    try:
        get_ipython()
    except Exception:
        VERBOSE = False
    else:
        VERBOSE = True
    
    
    LOG_NONE    = 0
    LOG_SEVERE  = 1
    LOG_ERROR   = 2
    LOG_WARNING = 3
    LOG_INFO    = 4
    LOG_ENTRY   = 5
    LOG_PARM    = 6
    LOG_DEBUG   = 7
    
    DEBUG_LEVEL = LOG_NONE
    
    
    def _definitions(entity):
    
            "Space": {
                "attrs_new": "code description".split(),
                "attrs_up": "description".split(),
    
                "attrs": "code permId description registrator registrationDate modificationDate".split(),
    
            },
            "Project": {
                "attrs_new": "code description space attachments".split(),
                "attrs_up": "description space attachments".split(),
                "attrs": "code description permId identifier space leader registrator registrationDate modifier modificationDate attachments".split(),
                "multi": "".split(),
                "identifier": "projectId",
            },
            "Experiment": {
    
                "attrs_new": "code type project tags attachments".split(),
                "attrs_up": "project tags attachments".split(),
                "attrs": "code permId identifier type project tags attachments".split(),
    
                "multi": "tags attachments".split(),
                "identifier": "experimentId",
            },
    
                "attrs_new": "code type parents children space experiment tags attachments".split(),
                "attrs_up": "parents children space experiment tags attachments".split(),
    
                "attrs": "code permId identifier type parents children components space experiment tags attachments".split(),
    
                    'parentIds': {'permId': {'@type': 'as.dto.sample.id.SamplePermId'}},
                    'childIds': {'permId': {'@type': 'as.dto.sample.id.SamplePermId'}},
                    'componentIds': {'permId': {'@type': 'as.dto.sample.id.SamplePermId'}},
    
                },
                "identifier": "sampleId",
                "cre_type": "as.dto.sample.create.SampleCreation",
    
                "multi": "parents children components tags attachments".split(),
    
            "SemanticAnnotation": {
                "attrs_new": "permId entityType propertyType predicateOntologyId predicateOntologyVersion predicateAccessionId descriptorOntologyId descriptorOntologyVersion descriptorAccessionId".split(),
                "attrs_up": "entityType propertyType predicateOntologyId predicateOntologyVersion predicateAccessionId descriptorOntologyId descriptorOntologyVersion descriptorAccessionId ".split(),
                "attrs": "permId entityType propertyType predicateOntologyId predicateOntologyVersion predicateAccessionId descriptorOntologyId descriptorOntologyVersion descriptorAccessionId creationDate".split(),
                "ids2type": {
                    "propertyTypeId": { 
                        "permId": "as.dto.property.id.PropertyTypePermId"
                    },
                    "entityTypeId": { 
                        "permId": "as.dto.entity.id.EntityTypePermId"
                    },
                },
                "identifier": "permId",
                "cre_type": "as.dto.sample.create.SampleCreation",
                "multi": "parents children components tags attachments".split(),
            },
    
                "attrs_new": "type experiment sample parents children components tags".split(),
                "attrs_up": "parents children experiment sample components tags".split(),
                "attrs": "code permId type experiment sample parents children components tags accessDate dataProducer dataProductionDate registrator registrationDate modifier modificationDate dataStore measured".split(),
    
                    'parentIds': {'permId': {'@type': 'as.dto.dataset.id.DataSetPermId'}},
                    'childIds': {'permId': {'@type': 'as.dto.dataset.id.DataSetPermId'}},
                    'componentIds': {'permId': {'@type': 'as.dto.dataset.id.DataSetPermId'}},
                    'containerIds': {'permId': {'@type': 'as.dto.dataset.id.DataSetPermId'}},
    
                "multi": "parents children container".split(),
    
            "Material": {
                "attrs_new": "code description type creation tags".split(),
    
                "attrs": "code description type creation registrator tags".split()
    
            },
            "Tag": {
                "attrs_new": "code description experiments samples dataSets materials".split(),
                "attrs": "code description experiments samples dataSets materials registrationDate".split(),
            },
    
            "Person": {
                "attrs_new": "userId space".split(),
                "attrs_up": "space".split(),
    
                "attrs": "permId userId firstName lastName email space registrationDate ".split(),
    
                "multi": "".split(),
                "identifier": "userId",
            },
            "AuthorizationGroup" : {
                "attrs_new": "code description userIds".split(),
    
                "attrs_up": "code description userIds".split(),
                "attrs": "permId code description registrator registrationDate modificationDate users".split(),
                "multi": "users".split(),
                "identifier": "groupId",
    
            "RoleAssignment" : {
                "attrs": "id user authorizationGroup role roleLevel space project registrator registrationDate".split(),
                "attrs_new": "role roleLevel user authorizationGroup role space project".split(),
            },
    
                "space": "spaceId",
                "project": "projectId",
                "sample": "sampleId",
                "samples": "sampleIds",
                "dataSet": "dataSetId",
                "dataSets": "dataSetIds",
                "experiment": "experimentId",
    
                "experiments": "experimentIds",
    
                "material": "materialId",
                "materials": "materialIds",
                "container": "containerId",
                "component": "componentId",
                "components": "componentIds",
                "parents": "parentIds",
                "children": "childIds",
                "tags": "tagIds",
    
                "userId": "userId",
                "users": "userIds",
    
                "description": "description",
    
                'spaceId': {'permId': {'@type': 'as.dto.space.id.SpacePermId'}},
                'projectId': {'permId': {'@type': 'as.dto.project.id.ProjectPermId'}},
                'experimentId': {'permId': {'@type': 'as.dto.experiment.id.ExperimentPermId'}},
                'tagIds': {'code': {'@type': 'as.dto.tag.id.TagCode'}},
    
        return entities[entity]
    
    def get_search_type_for_entity(entity, operator=None):
    
        """ Returns a dictionary containing the correct search criteria type
        for a given entity.
    
    
            get_search_type_for_entity('space')
            # returns:
    
            {'@type': 'as.dto.space.search.SpaceSearchCriteria'}
        """
        search_criteria = {
            "space": "as.dto.space.search.SpaceSearchCriteria",
            "userId": "as.dto.person.search.UserIdSearchCriteria",
            "email": "as.dto.person.search.EmailSearchCriteria",
            "firstName": "as.dto.person.search.FirstNameSearchCriteria",
            "lastName": "as.dto.person.search.LastNameSearchCriteria",
            "project": "as.dto.project.search.ProjectSearchCriteria",
            "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
            "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
            "sample": "as.dto.sample.search.SampleSearchCriteria",
            "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
            "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
            "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
            "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
            "material": "as.dto.material.search.MaterialSearchCriteria",
            "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
            "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
            "tag": "as.dto.tag.search.TagSearchCriteria",
    
            "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
    
            "roleAssignment": "as.dto.roleassignment.search.RoleAssignmentSearchCriteria",
    
            "person": "as.dto.person.search.PersonSearchCriteria",
            "code": "as.dto.common.search.CodeSearchCriteria",
            "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
            "global": "as.dto.global.GlobalSearchObject",
        }
    
    
        sc = { "@type": search_criteria[entity] }
        if operator is not None:
            sc["operator"] = operator
    
        return sc
    
    
    def get_attrs_for_entity(entity):
        """ For a given entity this method returns an iterator for all searchable
        attributes.
        """
        search_args = {
            "person": ['firstName','lastName','email','userId']
        }
        for search_arg in search_args[entity]:
            yield search_arg
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    fetch_option = {
    
        "space": {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"},
        "project": {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"},
    
        "person": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"},
    
        "users": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions" },
    
        "user": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions" },
        "authorizationGroup": {"@type": "as.dto.authorizationgroup.fetchoptions.AuthorizationGroupFetchOptions"},
    
            "@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
    
            "type": {"@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions"}
    
            "@type": "as.dto.sample.fetchoptions.SampleFetchOptions",
    
            "type": {"@type": "as.dto.sample.fetchoptions.SampleTypeFetchOptions"}
    
        "samples": {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"},
        "dataSets": {
    
            "@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions",
    
            "properties": {"@type": "as.dto.property.fetchoptions.PropertyFetchOptions"},
            "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"},
    
        "physicalData": {"@type": "as.dto.dataset.fetchoptions.PhysicalDataFetchOptions"},
    
        "linkedData": {
            "externalDms": {"@type": "as.dto.externaldms.fetchoptions.ExternalDmsFetchOptions"},
            "@type": "as.dto.dataset.fetchoptions.LinkedDataFetchOptions"
        },
    
        "roleAssignments": {
            "@type": "as.dto.roleassignment.fetchoptions.RoleAssignmentFetchOptions",
            "space": {
                "@type": "as.dto.space.fetchoptions.SpaceFetchOptions"
            }
        },
    
        "properties": {"@type": "as.dto.property.fetchoptions.PropertyFetchOptions"},
        "propertyAssignments": {
            "@type": "as.dto.property.fetchoptions.PropertyAssignmentFetchOptions",
    
                "@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions",
                "vocabulary": {
                    "@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions",
                }
    
        "tags": {"@type": "as.dto.tag.fetchoptions.TagFetchOptions"},
    
        "registrator": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"},
        "modifier": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"},
        "leader": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"},
    
        "attachments": {"@type": "as.dto.attachment.fetchoptions.AttachmentFetchOptions"},
    
        "attachmentsWithContent": {
            "@type": "as.dto.attachment.fetchoptions.AttachmentFetchOptions",
            "content": {
                "@type": "as.dto.common.fetchoptions.EmptyFetchOptions"
            },
        },
    
        "history": {"@type": "as.dto.history.fetchoptions.HistoryEntryFetchOptions"},
        "dataStore": {"@type": "as.dto.datastore.fetchoptions.DataStoreFetchOptions"},
    
    def search_request_for_identifier(ident, entity):
    
            search_request = {
                "identifier": ident.upper(),
    
                "@type": "as.dto.{}.id.{}Identifier".format(entity.lower(), entity.capitalize())
    
                "@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity.capitalize())
    
    def get_search_criteria(entity, **search_args):
        search_criteria = get_search_type_for_entity(entity)
    
        criteria = []
        for attr in get_attrs_for_entity(entity):
            if attr in search_args:
                sub_crit = get_search_type_for_entity(attr)
                sub_crit['fieldValue'] = get_field_value_search(attr, search_args[attr])
                criteria.append(sub_crit)
    
        search_criteria['criteria'] = criteria
        search_criteria['operator'] = "AND"
    
        return search_criteria
    
    
    def extract_code(obj):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if not isinstance(obj, dict):
    
            return '' if obj is None else str(obj)
        return '' if obj['code'] is None else obj['code']
    
    def extract_deletion(obj):
        del_objs = []
        for deleted_object in obj['deletedObjects']:
            del_objs.append({
                "reason": obj['reason'],
                "permId": deleted_object["id"]["permId"],
                "type": deleted_object["id"]["@type"]
            })
        return del_objs
    
    
    def extract_identifier(ident):
    
        if not isinstance(ident, dict):
    
            return str(ident)
        return ident['identifier']
    
    
    def extract_nested_identifier(ident):
    
        if not isinstance(ident, dict):
    
            return str(ident)
        return ident['identifier']['identifier']
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def extract_permid(permid):
        if not isinstance(permid, dict):
            return str(permid)
        return permid['permId']
    
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    def extract_nested_permid(permid):
        if not isinstance(permid, dict):
    
            return '' if permid is None else str(permid)
        return '' if permid['permId']['permId'] is None else permid['permId']['permId'] 
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
    def extract_property_assignments(pas):
        pa_strings = []
        for pa in pas:
            if not isinstance(pa['propertyType'], dict):
                pa_strings.append(pa['propertyType'])
            else:
                pa_strings.append(pa['propertyType']['label'])
        return pa_strings
    
    
    def extract_role_assignments(ras):
        ra_strings = []
        for ra in ras:
            ra_strings.append({
                "role": ra['role'],
                "roleLevel": ra['roleLevel'],
                "space": ra['space']['code'] if ra['space'] else None
            })
        return ra_strings
    
    
    
    def extract_person(person):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if not isinstance(person, dict):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        return person['userId']
    
    def extract_person_details(person):
        if not isinstance(person, dict):
            return str(person)
        return "{} {} <{}>".format(
            person['firstName'],
            person['lastName'],
            person['email']
        )
    
    def extract_id(id):
        if not isinstance(id, dict):
            return str(id)
        else:
            return id['techId']
    
    def extract_userId(user):
        if isinstance(user, list):
            return ", ".join([
                u['userId'] for u in user
            ])
        elif isinstance(user, dict):
            return user['userId']
        else:
            return str(user)
    
    def crc32(fileName):
    
        """since Python3 the zlib module returns unsigned integers (2.7: signed int)
        """
    
        prev = 0
    
        for eachLine in open(fileName, "rb"):
    
            prev = zlib.crc32(eachLine, prev)
        # return as hex
    
    
    def _create_tagIds(tags=None):
        if tags is None:
            return None
    
        if not isinstance(tags, list):
            tags = [tags]
    
        tagIds = []
        for tag in tags:
    
            tagIds.append({
                "code": tag, 
                "@type": "as.dto.tag.id.TagCode"
            })
    
    def _tagIds_for_tags(tags=None, action='Add'):
    
        """creates an action item to add or remove tags. 
        Action is either 'Add', 'Remove' or 'Set'
    
        """
        if tags is None:
            return
        if not isinstance(tags, list):
            tags = [tags]
    
        items = []
        for tag in tags:
            items.append({
                "code": tag,
                "@type": "as.dto.tag.id.TagCode"
            })
    
        tagIds = {
            "actions": [
                {
                    "items": items,
                    "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
                }
            ],
            "@type": "as.dto.common.update.IdListUpdateValue"
        }
    
        return tagIds
    
    
    def _list_update(ids=None, entity=None, action='Add'):
        """creates an action item to add, set or remove ids. 
        """
        if ids is None:
            return
        if not isinstance(ids, list):
            ids = [ids]
    
        items = []
        for ids in ids:
            items.append({
                "code": ids,
                "@type": "as.dto.{}.id.{}Code".format(entity.lower(), entity)
            })
    
        list_update = {
            "actions": [
                {
                    "items": items,
                    "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
                }
            ],
            "@type": "as.dto.common.update.IdListUpdateValue"
        }
    
    
    def _create_typeId(type):
        return {
            "permId": type.upper(),
            "@type": "as.dto.entitytype.id.EntityTypePermId"
        }
    
    
    def _create_projectId(ident):
        match = re.match('/', ident)
        if match:
            return {
                "identifier": ident,
                "@type": "as.dto.project.id.ProjectIdentifier"
            }
        else:
    
                "permId": ident,
                "@type": "as.dto.project.id.ProjectPermId"
            }
    
    
    def _create_experimentId(ident):
        return {
            "identifier": ident,
            "@type": "as.dto.experiment.id.ExperimentIdentifier"
        }
    
    
    def get_field_value_search(field, value, comparison="StringEqualToValue"):
        return {
            "value": value,
            "@type": "as.dto.common.search.{}".format(comparison)
        }
    
    def _common_search(search_type, value, comparison="StringEqualToValue"):
        sreq = {
    
                "@type": "as.dto.common.search.{}".format(comparison)
            }
        }
    
    def _criteria_for_code(code):
        return {
            "fieldValue": {
    
                "@type": "as.dto.common.search.StringEqualToValue"
            },
            "@type": "as.dto.common.search.CodeSearchCriteria"
        }
    
    
    def _subcriteria_for_userId(userId):
        return {
              "criteria": [
                {
                  "fieldName": "userId",
                  "fieldType": "ATTRIBUTE",
                  "fieldValue": {
                    "value": userId,
                    "@type": "as.dto.common.search.StringEqualToValue"
                  },
                  "@type": "as.dto.person.search.UserIdSearchCriteria"
                }
              ],
              "@type": "as.dto.person.search.PersonSearchCriteria",
              "operator": "AND"
            }
    
    
    def _subcriteria_for_type(code, entity):
    
            "@type": "as.dto.{}.search.{}TypeSearchCriteria".format(entity.lower(), entity),
    
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldValue": {
                        "value": code.upper(),
                        "@type": "as.dto.common.search.StringEqualToValue"
                    }
    
    def _subcriteria_for_status(status_value):
        status_value = status_value.upper()
        valid_status = "AVAILABLE LOCKED ARCHIVED UNARCHIVE_PENDING ARCHIVE_PENDING BACKUP_PENDING".split()
        if not status_value in valid_status:
            raise ValueError("status must be one of the following: " + ", ".join(valid_status))
    
        return {
            "@type": "as.dto.dataset.search.PhysicalDataSearchCriteria",
            "operator": "AND",
            "criteria": [{
                "@type":
    
                    "as.dto.dataset.search.StatusSearchCriteria",
                "fieldName": "status",
    
                "fieldType": "ATTRIBUTE",
    
    def _gen_search_criteria(req):
    
        sreq = {}
        for key, val in req.items():
            if key == "criteria":
                items = []
                for item in req['criteria']:
    
                    items.append(_gen_search_criteria(item))
    
                sreq['criteria'] = items
            elif key == "code":
    
                sreq["criteria"] = [_common_search(
                    "as.dto.common.search.CodeSearchCriteria", val.upper()
                )]
            elif key == "identifier":
    
                if is_identifier(val):
                    # if we have an identifier, we need to search in Space and Code separately
                    si = split_identifier(val)
                    sreq["criteria"] = []
                    if "space" in si:
                        sreq["criteria"].append(
                            _gen_search_criteria({"space": "Space", "code": si["space"]})
                        )
                    if "experiment" in si:
                        pass
    
                    if "code" in si:
                        sreq["criteria"].append(
                            _common_search(
                                "as.dto.common.search.CodeSearchCriteria", si["code"].upper()
                            )
    
                elif is_permid(val):
                    sreq["criteria"] = [_common_search(
                        "as.dto.common.search.PermIdSearchCriteria", val
                    )]
                else:
                    # we assume we just got a code
                    sreq["criteria"] = [_common_search(
                        "as.dto.common.search.CodeSearchCriteria", val.upper()
                    )]
    
            else:
                sreq["@type"] = "as.dto.{}.search.{}SearchCriteria".format(key, val)
        return sreq
    
    
    def _subcriteria_for_tags(tags):
        if not isinstance(tags, list):
            tags = [tags]
    
        criterias = []
        for tag in tags:
            criterias.append({
                "fieldName": "code",
                "fieldType": "ATTRIBUTE",
                "fieldValue": {
                    "value": tag,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "@type": "as.dto.common.search.CodeSearchCriteria"
            })
    
        return {
            "@type": "as.dto.tag.search.TagSearchCriteria",
            "operator": "AND",
            "criteria": criterias
        }
    
    
    def _subcriteria_for_is_finished(is_finished):
        return {
            "@type": "as.dto.common.search.StringPropertySearchCriteria",
            "fieldName": "FINISHED_FLAG",
            "fieldType": "PROPERTY",
            "fieldValue": {
                "value": is_finished,
                "@type": "as.dto.common.search.StringEqualToValue"
            }
        }
    
    
    def _subcriteria_for_properties(prop, val):
        return {
            "@type": "as.dto.common.search.StringPropertySearchCriteria",
            "fieldName": prop.upper(),
            "fieldType": "PROPERTY",
            "fieldValue": {
                "value": val,
                "@type": "as.dto.common.search.StringEqualToValue"
            }
        }
    
    
    
    def _subcriteria_for_permid(permids, entity, parents_or_children='', operator='AND'):
    
        if not isinstance(permids, list):
            permids = [permids]
    
        criterias = []
        for permid in permids:
    
                "@type": "as.dto.common.search.PermIdSearchCriteria",
                "fieldValue": {
                    "value": permid,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "fieldType": "ATTRIBUTE",
                "fieldName": "code"
    
            "@type": "as.dto.{}.search.{}{}SearchCriteria".format(
    
                entity.lower(), entity, parents_or_children
    
            "operator": operator
    
    def _subcriteria_for_code(code, object_type):
    
        """ Creates the often used search criteria for code values. Returns a dictionary.
    
        Example::
            _subcriteria_for_code("username", "space")
    
    	{
    	    "criteria": [
    		{
    		    "fieldType": "ATTRIBUTE",
    		    "@type": "as.dto.common.search.CodeSearchCriteria",
    		    "fieldName": "code",
    		    "fieldValue": {
    			"@type": "as.dto.common.search.StringEqualToValue",
    			"value": "USERNAME"
    		    }
    		}
    	    ],
    	    "operator": "AND",
    	    "@type": "as.dto.space.search.SpaceSearchCriteria"
    	}
        """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        if code is not None:
            if is_permid(code):
                fieldname = "permId"
                fieldtype = "as.dto.common.search.PermIdSearchCriteria"
            else:
                fieldname = "code"
                fieldtype = "as.dto.common.search.CodeSearchCriteria"
    
    
            search_criteria = get_search_type_for_entity(object_type.lower())
    
            search_criteria['criteria'] = [{
                "fieldName": fieldname,
                "fieldType": "ATTRIBUTE",
                "fieldValue": {
                    "value": code.upper(),
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "@type": fieldtype
            }]
            
            search_criteria["operator"] = "AND"
            return search_criteria
    
            return get_search_type_for_entity(object_type.lower())
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
        """Interface for communicating with openBIS. 
        A recent version of openBIS is required (minimum 16.05.2).
    
        For creation of datasets, dataset-uploader-api needs to be installed.
    
        def __init__(self, url=None, verify_certificates=True, token=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Initialize a new connection to an openBIS server.
    
            :param host:
    
    
            if url is None:
                try:
                    url = os.environ["OPENBIS_URL"]
                    token = os.environ["OPENBIS_TOKEN"] if "OPENBIS_TOKEN" in os.environ else None
                except KeyError:
                    raise ValueError("please provide a URL you want to connect to.")
    
    
            url_obj = urlparse(url)
    
                raise ValueError("please provide the url in this format: https://openbis.host.ch:8443")
    
            if url_obj.hostname is None:
                raise ValueError("hostname is missing")
    
            self.url = url_obj.geturl()
            self.port = url_obj.port
    
            self.hostname = url_obj.hostname
    
            self.as_v3 = '/openbis/openbis/rmi-application-server-v3.json'
            self.as_v1 = '/openbis/openbis/rmi-general-information-v1.json'
            self.reg_v1 = '/openbis/openbis/rmi-query-v1.json'
    
            self.dataset_types = None
            self.sample_types = None
    
            self.token_path = None
    
            # use an existing token, if available
            if self.token is None:
    
                self.token = self._get_cached_token()
    
            elif self.is_token_valid(token):
                pass
            else:
                print("Session is no longer valid. Please log in again.")
    
    
        def __dir__(self):
            return [
                'url', 'port', 'hostname',
                'login()', 'logout()', 'is_session_active()', 'token', 'is_token_valid("")',
                "get_dataset('permId')",
                "get_datasets()",
                "get_dataset_type('raw_data')",
                "get_dataset_types()",
                "get_datastores()",
                "get_deletions()",
                "get_experiment('permId', withAttachments=False)",
                "get_experiments()",
                "get_experiment_type('type')",
                "get_experiment_types()",
    
                "get_external_data_management_system(permId)",
    
                "get_material_type('type')",
                "get_material_types()",
                "get_project('project')",
                "get_projects(space=None, code=None)",
                "get_sample('id')",
    
                "get_object('id')", # "get_sample('id')" alias
    
                "get_objects()", # "get_samples()" alias
    
                "get_object_type(type))", # "get_sample_type(type))" alias
    
                "get_object_types()", # "get_sample_types()" alias
    
                "get_semantic_annotations()",
                "get_semantic_annotation(permId, only_data = False)",
    
                "get_spaces()",
                "get_tags()",
                "get_terms()",
    
                "new_person(userId, space)",
    
                "get_person(userId)",
                "get_groups()",
                "get_group(code)",
    
                "get_role_assignments()",
                "get_role_assignment(techId)",
    
                "new_group(code, description, userIds)",
    
                'new_project(space, code, description, attachments)',
    
                'new_experiment(type, code, project, props={})',
    
                'new_sample(type, space, project, experiment, parents)',
                'new_object(type, space, project, experiment, parents)', # 'new_sample(type, space, project, experiment)' alias
    
                'new_dataset(type, parent, experiment, sample, files=[], folder, props={})',
    
                'new_semantic_annotation(entityType, propertyType)',
    
                'update_sample(sampleId, space, project, experiment, parents, children, components, properties, tagIds, attachments)',
                'update_object(sampleId, space, project, experiment, parents, children, components, properties, tagIds, attachments)', # 'update_sample(sampleId, space, project, experiment, parents, children, components, properties, tagIds, attachments)' alias
    
        @property
        def spaces(self):
            return self.get_spaces()
    
        @property
        def projects(self):
            return self.get_projects()
    
    
        def _get_cached_token(self):
    
            """Read the token from the cache, and set the token ivar to it, if there, otherwise None.
            If the token is not valid anymore, delete it. 
            """
            token_path = self.gen_token_path()
    
        def gen_token_path(self, parent_folder=None):
            """generates a path to the token file.
            The token is usually saved in a file called
            ~/.pybis/hostname.token
            """
    
                # save token under ~/.pybis folder
                parent_folder = os.path.join(
                    os.path.expanduser("~"),
                    '.pybis'
                )
            path = os.path.join(parent_folder, self.hostname + '.token')
    
        def save_token(self, token=None, parent_folder=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """ saves the session token to the disk, usually here: ~/.pybis/hostname.token. When a new Openbis instance is created, it tries to read this saved token by default.
    
            if token is None:
                token = self.token
    
            token_path = None;
            if parent_folder is None:
                token_path = self.gen_token_path()
            else:
                token_path = self.gen_token_path(parent_folder)
    
            # create the necessary directories, if they don't exist yet
    
            os.makedirs(os.path.dirname(token_path), exist_ok=True)
            with open(token_path, 'w') as f:
    
                f.write(token)
                self.token_path = token_path
    
        def delete_token(self, token_path=None):
    
            """ deletes a stored session token.
            """
    
            if token_path is None:
                token_path = self.token_path
            os.remove(token_path)
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _post_request(self, resource, request):
    
            """ internal method, used to handle all post requests and serializing / deserializing
            data
            """
    
            return self._post_request_full_url(urljoin(self.url,resource), request)
    
    
        def _post_request_full_url(self, full_url, request):
    
            """ internal method, used to handle all post requests and serializing / deserializing
            data
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "id" not in request:
    
                request["id"] = "2"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "jsonrpc" not in request:
                request["jsonrpc"] = "2.0"
    
            if request["params"][0] is None:
                raise ValueError("Your session expired, please log in again")
    
    
            if DEBUG_LEVEL >=LOG_DEBUG: print(json.dumps(request))
    
            resp = requests.post(
    
                verify=self.verify_certificates
            )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                resp = resp.json()
                if 'error' in resp:
    
                    if DEBUG_LEVEL >= LOG_ERROR: print(json.dumps(request))
    
                    raise ValueError(resp['error']['message'])
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                elif 'result' in resp:
                    return resp['result']
    
                else:
                    raise ValueError('request did not return either result nor error')
            else:
                raise ValueError('general error while performing post request')
    
    
        def logout(self):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """ Log out of openBIS. After logout, the session token is no longer valid.
    
    
            logout_request = {
    
                "method": "logout",
                "params": [self.token],
    
            resp = self._post_request(self.as_v3, logout_request)
    
            return resp
    
        def login(self, username=None, password=None, save_token=False):
    
            Expects a username and a password and updates the token (session-ID).
            The token is then used for every request.