Skip to content
Snippets Groups Projects
pybis.py 155 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    """
    pybis.py

    Work with openBIS from Python.
    """
    
    import subprocess
    import errno
    
    import requests
    
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    
    import json
    import re
    
    from urllib.parse import urlparse, urljoin, quote
    
    import zlib
    
    from collections import namedtuple, defaultdict
    
    from texttable import Texttable
    
    from tabulate import tabulate
    
    from .utils import parse_jackson, check_datatype, split_identifier, format_timestamp, is_identifier, is_permid, nvl, VERBOSE
    
    from .utils import extract_attr, extract_permid, extract_code,extract_deletion,extract_identifier,extract_nested_identifier,extract_nested_permid, extract_nested_permids, extract_property_assignments,extract_role_assignments,extract_person, extract_person_details,extract_id,extract_userId
    
    from .entity_type import EntityType, SampleType, DataSetType, MaterialType, ExperimentType
    
    from .vocabulary import Vocabulary, VocabularyTerm
    
    from .openbis_object import OpenBisObject, Transaction
    
    from .definitions import openbis_definitions, get_definition_for_entity, fetch_option, get_fetchoption_for_entity, get_type_for_entity, get_method_for_entity, get_fetchoptions
    
    
    from .things import Things
    
    from .space import Space
    from .project import Project
    from .experiment import Experiment
    from .sample import Sample
    from .dataset import DataSet
    from .person import Person
    from .group import Group
    from .role_assignment import RoleAssignment
    from .tag import Tag
    from .semantic_annotation import SemanticAnnotation
    
    from pandas import DataFrame, Series
    
    LOG_NONE    = 0
    LOG_SEVERE  = 1
    LOG_ERROR   = 2
    LOG_WARNING = 3
    LOG_INFO    = 4
    LOG_ENTRY   = 5
    LOG_PARM    = 6
    LOG_DEBUG   = 7
    
    DEBUG_LEVEL = LOG_NONE
    
    
    
    def get_search_type_for_entity(entity, operator=None):
        """Return a dictionary containing the search criteria type for an entity.

        Args:
            entity: key naming the entity or attribute, e.g. 'space' or 'code'.
            operator: optional value stored under "operator" in the result.

        Returns:
            dict with the "@type" entry and, if given, the "operator" entry.

        Raises:
            KeyError: if «entity» is not one of the known keys.

        Example::
            get_search_type_for_entity('space')
            # returns:
            {'@type': 'as.dto.space.search.SpaceSearchCriteria'}
        """
        search_criteria = {
            "space": "as.dto.space.search.SpaceSearchCriteria",
            "userId": "as.dto.person.search.UserIdSearchCriteria",
            "email": "as.dto.person.search.EmailSearchCriteria",
            "firstName": "as.dto.person.search.FirstNameSearchCriteria",
            "lastName": "as.dto.person.search.LastNameSearchCriteria",
            "project": "as.dto.project.search.ProjectSearchCriteria",
            "experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
            "experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
            "sample": "as.dto.sample.search.SampleSearchCriteria",
            "sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
            "dataset": "as.dto.dataset.search.DataSetSearchCriteria",
            "dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
            "external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
            "material": "as.dto.material.search.MaterialSearchCriteria",
            "material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
            "vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
            "tag": "as.dto.tag.search.TagSearchCriteria",
            "authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
            "person": "as.dto.person.search.PersonSearchCriteria",
            "code": "as.dto.common.search.CodeSearchCriteria",
            "global": "as.dto.global.GlobalSearchObject",
            "plugin": "as.dto.plugin.search.PluginSearchCriteria",
            "propertyType": "as.dto.property.search.PropertyTypeSearchCriteria",
        }

        sc = {"@type": search_criteria[entity]}
        if operator is not None:
            sc["operator"] = operator

        return sc
    
        ident = ident.strip()
    
        """Returns the data type for a given identifier/permId for use with the API call, e.g.
        {
            "identifier": "/DEFAULT/SAMPLE_NAME",
            "@type": "as.dto.sample.id.SampleIdentifier"
        }
        or
        {
            "permId": "20160817175233002-331",
            "@type": "as.dto.sample.id.SamplePermId"
        }
        """
    
        # Tags have strange permIds...
        if entity.lower() == 'tag':
            if '/' in ident:
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                if not ident.startswith('/'):
                    ident = '/'+ident
    
                return {
                    "permId": ident,
                    "@type" : "as.dto.tag.id.TagPermId"
                }
            else:
                return {
                    "code": ident,
                    "@type": "as.dto.tag.id.TagCode"
                }
    
    
        entities = {
            "sample"            : "Sample",
            "dataset"           : "DataSet",
            "experiment"        : "Experiment",
            "plugin"            : "Plugin",
            "space"             : "Space",
            "project"           : "Project",
            "semanticannotation": "SemanticAnnotation",
        }
    
        if entity.lower() in entities:
            entity_capitalize = entities[entity.lower()]
        else:
            entity_capitalize = entity.capitalize()
    
            # people tend to omit the / prefix of an identifier...
            if not ident.startswith('/'):
                ident = '/'+ident
    
            # ELN-LIMS style contains also experiment in sample identifer, i.e. /space/project/experiment/sample_code
            # we have to remove the experiment-code
            if ident.count('/') == 4:
                codes = ident.split('/')
                ident = '/'.join([codes[0], codes[1], codes[2], codes[4]])
    
    
            search_request = {
                "identifier": ident.upper(),
    
                "@type": "as.dto.{}.id.{}Identifier".format(entity.lower(), entity_capitalize)
    
                "@type": "as.dto.{}.id.{}PermId".format(entity.lower(), entity_capitalize)
    
    def get_search_criteria(entity, **search_args):
        """Build an AND-combined search criteria dict for «entity».

        For every attribute listed under 'attrs' in openbis_definitions(entity)
        that was passed as a keyword argument, one sub-criteria is added whose
        fieldValue matches the given value.

        Args:
            entity: entity key understood by get_search_type_for_entity().
            **search_args: attribute name -> value to search for.

        Returns:
            dict with "@type", "criteria" (list) and "operator" ("AND").
        """
        result = get_search_type_for_entity(entity)

        sub_criteria = []
        for name in openbis_definitions(entity)['attrs']:
            # attributes the caller did not ask for are simply skipped
            if name not in search_args:
                continue
            entry = get_search_type_for_entity(name)
            entry['fieldValue'] = get_field_value_search(name, search_args[name])
            sub_criteria.append(entry)

        result['criteria'] = sub_criteria
        result['operator'] = "AND"
        return result
    
    
    def crc32(fileName):
        """Return the CRC32 checksum of a file as a lowercase hex string.

        since Python3 the zlib module returns unsigned integers (2.7: signed int)

        Args:
            fileName: path of the file to read (opened in binary mode).

        Returns:
            str: the checksum in hexadecimal, without a "0x" prefix.
        """
        prev = 0
        # the with-block guarantees the file handle is closed again
        with open(fileName, "rb") as f:
            for eachLine in f:
                prev = zlib.crc32(eachLine, prev)
        # return as hex; the mask keeps the value unsigned on every Python version
        return "%x" % (prev & 0xFFFFFFFF)
    
    def _tagIds_for_tags(tags=None, action='Add'):
    
        """creates an action item to add or remove tags. 
        Action is either 'Add', 'Remove' or 'Set'
    
        """
        if tags is None:
            return
        if not isinstance(tags, list):
            tags = [tags]
    
        items = []
        for tag in tags:
            items.append({
                "code": tag,
                "@type": "as.dto.tag.id.TagCode"
            })
    
        tagIds = {
            "actions": [
                {
                    "items": items,
                    "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
                }
            ],
            "@type": "as.dto.common.update.IdListUpdateValue"
        }
    
        return tagIds
    
    
    def _list_update(ids=None, entity=None, action='Add'):
        """creates an action item to add, set or remove ids. 
        """
        if ids is None:
            return
        if not isinstance(ids, list):
            ids = [ids]
    
        items = []
        for ids in ids:
            items.append({
                "code": ids,
                "@type": "as.dto.{}.id.{}Code".format(entity.lower(), entity)
            })
    
        list_update = {
            "actions": [
                {
                    "items": items,
                    "@type": "as.dto.common.update.ListUpdateAction{}".format(action.capitalize())
                }
            ],
            "@type": "as.dto.common.update.IdListUpdateValue"
        }
    
    def get_field_value_search(field, value, comparison="StringEqualToValue"):
        """Return the fieldValue dict that compares against «value».

        Args:
            field: accepted for the callers' convenience; not used here.
            value: the value to compare with.
            comparison: class name appended to "as.dto.common.search.".
        """
        comparison_type = "as.dto.common.search.{}".format(comparison)
        field_value = {"value": value}
        field_value["@type"] = comparison_type
        return field_value
    
    def _common_search(search_type, value, comparison="StringEqualToValue"):
        sreq = {
    
                "@type": "as.dto.common.search.{}".format(comparison)
            }
        }
    
    def _criteria_for_code(code):
        return {
            "fieldValue": {
    
                "@type": "as.dto.common.search.StringEqualToValue"
            },
            "@type": "as.dto.common.search.CodeSearchCriteria"
        }
    
    
    def _subcriteria_for_userId(userId):
        return {
              "criteria": [
                {
                  "fieldName": "userId",
                  "fieldType": "ATTRIBUTE",
                  "fieldValue": {
                    "value": userId,
                    "@type": "as.dto.common.search.StringEqualToValue"
                  },
                  "@type": "as.dto.person.search.UserIdSearchCriteria"
                }
              ],
              "@type": "as.dto.person.search.PersonSearchCriteria",
              "operator": "AND"
            }
    
    
    def _subcriteria_for_type(code, entity):
    
            "@type": "as.dto.{}.search.{}TypeSearchCriteria".format(entity.lower(), entity),
    
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldValue": {
                        "value": code.upper(),
                        "@type": "as.dto.common.search.StringEqualToValue"
                    }
    
    def _subcriteria_for_status(status_value):
        status_value = status_value.upper()
        valid_status = "AVAILABLE LOCKED ARCHIVED UNARCHIVE_PENDING ARCHIVE_PENDING BACKUP_PENDING".split()
        if not status_value in valid_status:
            raise ValueError("status must be one of the following: " + ", ".join(valid_status))
    
        return {
            "@type": "as.dto.dataset.search.PhysicalDataSearchCriteria",
            "operator": "AND",
            "criteria": [{
                "@type":
    
                    "as.dto.dataset.search.StatusSearchCriteria",
                "fieldName": "status",
    
                "fieldType": "ATTRIBUTE",
    
    def _gen_search_criteria(req):
    
        sreq = {}
        for key, val in req.items():
            if key == "criteria":
                items = []
                for item in req['criteria']:
    
                    items.append(_gen_search_criteria(item))
    
                sreq['criteria'] = items
            elif key == "code":
    
                sreq["criteria"] = [_common_search(
                    "as.dto.common.search.CodeSearchCriteria", val.upper()
                )]
            elif key == "identifier":
    
                if is_identifier(val):
                    # if we have an identifier, we need to search in Space and Code separately
                    si = split_identifier(val)
                    sreq["criteria"] = []
                    if "space" in si:
                        sreq["criteria"].append(
                            _gen_search_criteria({"space": "Space", "code": si["space"]})
                        )
                    if "experiment" in si:
                        pass
    
                    if "code" in si:
                        sreq["criteria"].append(
                            _common_search(
                                "as.dto.common.search.CodeSearchCriteria", si["code"].upper()
                            )
    
                elif is_permid(val):
                    sreq["criteria"] = [_common_search(
                        "as.dto.common.search.PermIdSearchCriteria", val
                    )]
                else:
                    # we assume we just got a code
                    sreq["criteria"] = [_common_search(
                        "as.dto.common.search.CodeSearchCriteria", val.upper()
                    )]
    
            else:
                sreq["@type"] = "as.dto.{}.search.{}SearchCriteria".format(key, val)
        return sreq
    
    
    def _subcriteria_for_tags(tags):
        if not isinstance(tags, list):
            tags = [tags]
    
    
                "fieldName": "code",
                "fieldType": "ATTRIBUTE",
                "fieldValue": {
                    "value": tag,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "@type": "as.dto.common.search.CodeSearchCriteria"
            })
    
        return {
            "@type": "as.dto.tag.search.TagSearchCriteria",
            "operator": "AND",
    
    def _subcriteria_for_is_finished(is_finished):
        return {
            "@type": "as.dto.common.search.StringPropertySearchCriteria",
            "fieldName": "FINISHED_FLAG",
            "fieldType": "PROPERTY",
            "fieldValue": {
                "value": is_finished,
                "@type": "as.dto.common.search.StringEqualToValue"
            }
        }
    
    
    def _subcriteria_for_properties(prop, value, entity):
    
        if isinstance(value, tuple):
            str_type = "as.dto.common.search.NumberPropertySearchCriteria"
            comparator, value = value
            if comparator == '>':
                eq_type = "as.dto.common.search.NumberGreaterThanValue"
            elif comparator == '>=':
                eq_type = "as.dto.common.search.NumberGreaterThanOrEqualToValue"
            elif comparator == '<':
                eq_type = "as.dto.common.search.NumberLessThanValue"
            elif comparator == '<=':
                eq_type = "as.dto.common.search.NumberLessThanOrEqualToValue"
            else:
                eq_type = "as.dto.common.search.NumberEqualToValue"
        else:
            eq_type = "as.dto.common.search.StringEqualToValue"
            str_type=  "as.dto.common.search.StringPropertySearchCriteria"
    
        fieldType = "PROPERTY"
    
        search_types = {
            'sample': {
                'parent'    : "as.dto.sample.search.SampleParentsSearchCriteria",
                'child'     : "as.dto.sample.search.SampleChildrenSearchCriteria",
                'container' : "as.dto.sample.search.SampleContainerSearchCriteria",
                'component' : "as.dto.sample.search.SampleContainerSearchCriteria",
            },
            'dataset': {
                'parent'    : "as.dto.dataset.search.DataSetParentsSearchCriteria",
                'child'     : "as.dto.dataset.search.DataSetChildrenSearchCriteria",
                'container' : "as.dto.dataset.search.DataSetContainerSearchCriteria",
            }
        }
    
    
    
        if 'date' in prop.lower() and re.search(r'\d+\-\d+\-\d+', value):
            
            if prop.lower() =='registrationdate':
                str_type = "as.dto.common.search.RegistrationDateSearchCriteria"
                fieldType = "ATTRIBUTE"
            elif prop.lower() =='modificationdate':
                str_type = "as.dto.common.search.ModificationDateSearchCriteria"
                fieldType = "ATTRIBUTE"
            else:
                str_type = "as.dto.common.search.DatePropertySearchCriteria"
    
            if value.startswith('>'):
                value = value[1:]
                eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
            elif value.startswith('<'):
                value = value[1:]
                eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
            else:
                eq_type = "as.dto.common.search.DateEqualToValue"
    
    
        # searching for parent/child/container identifier
        elif any(relation == prop.lower() for relation in ['parent','child','container']):
            relation=prop.lower()
            if is_identifier(value):
                identifier_search_type = "as.dto.common.search.IdentifierSearchCriteria"
            # find any parent, child or container
            elif value == '*':
                return {
                    "@type": search_types[entity][relation],
                    "criteria": [
                        {
                            "@type": "as.dto.common.search.AnyFieldSearchCriteria",
                            "fieldValue": {
                                "@type": "as.dto.common.search.AnyStringValue",
                            }
                        }
                    ]
                }
            elif is_permid(value):
                identifier_search_type = "as.dto.common.search.PermIdSearchCriteria"
            else: 
                identifier_search_type = "as.dto.common.search.CodeSearchCriteria"
            return {
                "@type": search_types[entity][relation],
                "criteria": [
                    {
                        "@type": identifier_search_type,
                        "fieldType": "ATTRIBUTE",
                        "fieldValue": {
                            "@type": "as.dto.common.search.StringEqualToValue",
                            "value": value,
                        }
                    }
                ]
            }
    
        # searching for parent/child/container property: 
        elif any(prop.lower().startswith(relation) for relation in ['parent_','child_','container_']):
            match = re.search(r'^(\w+?)_(.*)', prop.lower())
            if match:
                relation, property_name = match.groups()
                return {
                    "@type": search_types[entity][relation],
                    "criteria": [
                        {
                            "@type": str_type,
                            "fieldName": property_name.upper(),
                            "fieldType": "PROPERTY",
                            "fieldValue": {
                                "@type": eq_type,
                                "value": value,
                            }
                        }
                    ]
                }
        # searching for properties
        else:
            return {
                "@type": str_type,
                "fieldName": prop.upper(),
                "fieldType": fieldType,
                "fieldValue": {
                    "value": value,
                    "@type": eq_type
                }
    
    def _subcriteria_for(thing, entity, parents_or_children='', operator='AND'):
        """Returns the sub-search criteria for «thing», which can be either:
        - a python object (sample, dataSet, experiment)
        - a permId
        - an identifier
        - a code
        """
    
        if isinstance(thing, str):
            if is_permid(thing):
                return _subcriteria_for_permid(
                    thing, 
                    entity=entity,
                    parents_or_children=parents_or_children,
                    operator=operator
                )
            elif is_identifier(thing):
                return _subcriteria_for_identifier(
                    thing, 
                    entity=entity,
                    parents_or_children=parents_or_children,
                    operator=operator
                )
            else:
                # look for code
                return _subcriteria_for_code_new(
                    thing,
                    entity=entity,
                    parents_or_children=parents_or_children,
                    operator=operator
                )
    
        elif isinstance(thing, list):
            criteria = []
            for element in thing:
                crit = _subcriteria_for(element, entity, parents_or_children, operator)
                criteria += crit["criteria"]
    
            return {
                "criteria": criteria,
                "@type": crit["@type"],
                "operator": "OR"
            }
        elif thing is None:
            # we just need the type
            search_type = get_type_for_entity(entity, 'search', parents_or_children)
            return {
                "criteria": [],
                **search_type,
                "operator": operator
            }
        else:
            # we passed an object
            return _subcriteria_for_permid(
                thing.permId, 
                entity=entity,
                parents_or_children=parents_or_children,
                operator=operator
            )
            
    
    def _subcriteria_for_identifier(ids, entity, parents_or_children='', operator='AND'):
        """Return the search criteria of «entity» matching the given identifiers.

        Args:
            ids: a single identifier or a list of identifiers.
            entity: passed on to get_type_for_entity() to find the search type.
            parents_or_children: passed on to get_type_for_entity().
            operator: operator stored in the result.

        Returns:
            dict with one IdentifierSearchCriteria per identifier under
            "criteria", the search type entries and "operator".
        """
        if not isinstance(ids, list):
            ids = [ids]

        criteria = [
            {
                "@type": "as.dto.common.search.IdentifierSearchCriteria",
                "fieldValue": {
                    "value": identifier,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "fieldType": "ATTRIBUTE",
                "fieldName": "identifier"
            }
            for identifier in ids
        ]

        search_type = get_type_for_entity(entity, 'search', parents_or_children)
        return {
            "criteria": criteria,
            **search_type,
            "operator": operator
        }
    
    
    def _subcriteria_for_permid(permids, entity, parents_or_children='', operator='AND'):
        """Return the search criteria of «entity» matching the given permIds.

        Args:
            permids: a single permId or a list of permIds.
            entity: passed on to get_type_for_entity() to find the search type.
            parents_or_children: passed on to get_type_for_entity().
            operator: operator stored in the result.

        Returns:
            dict with one PermIdSearchCriteria per permId under "criteria",
            the search type entries and "operator".
        """
        if not isinstance(permids, list):
            permids = [permids]

        criteria = []
        for permid in permids:
            criteria.append({
                "@type": "as.dto.common.search.PermIdSearchCriteria",
                "fieldValue": {
                    "value": permid,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "fieldType": "ATTRIBUTE",
                # NOTE(review): "code" is what the source has here, although
                # the criteria is a permId one -- confirm this is intended
                "fieldName": "code"
            })

        search_type = get_type_for_entity(entity, 'search', parents_or_children)
        return {
            "criteria": criteria,
            **search_type,
            "operator": operator
        }
    
    
    def _subcriteria_for_code_new(codes, entity, parents_or_children='', operator='AND'):
        """Return the search criteria of «entity» matching the given codes.

        Args:
            codes: a single code or a list of codes (used as given, the case
                is not changed).
            entity: passed on to get_type_for_entity() to find the search type.
            parents_or_children: passed on to get_type_for_entity().
            operator: operator stored in the result.

        Returns:
            dict with one CodeSearchCriteria per code under "criteria",
            the search type entries and "operator".
        """
        if not isinstance(codes, list):
            codes = [codes]

        criteria = []
        for code in codes:
            criteria.append({
                "@type": "as.dto.common.search.CodeSearchCriteria",
                "fieldValue": {
                    "value": code,
                    "@type": "as.dto.common.search.StringEqualToValue"
                },
                "fieldType": "ATTRIBUTE",
                "fieldName": "code"
            })

        search_type = get_type_for_entity(entity, 'search', parents_or_children)
        return {
            "criteria": criteria,
            **search_type,
            "operator": operator
        }
    
    def _subcriteria_for_code(code, entity):
        """ Creates the often used search criteria for code values. Returns a dictionary.

        If «code» is None, only the search type of «entity» is returned. If
        «code» is a permId, a PermIdSearchCriteria is used instead of a
        CodeSearchCriteria. The value is always upper-cased.

        Example::
            _subcriteria_for_code("username", "space")

        {
            "criteria": [
                {
                    "fieldType": "ATTRIBUTE",
                    "@type": "as.dto.common.search.CodeSearchCriteria",
                    "fieldName": "code",
                    "fieldValue": {
                        "@type": "as.dto.common.search.StringEqualToValue",
                        "value": "USERNAME"
                    }
                }
            ],
            "operator": "AND",
            "@type": "as.dto.space.search.SpaceSearchCriteria"
        }
        """
        if code is None:
            return get_type_for_entity(entity, 'search')

        if is_permid(code):
            fieldname = "permId"
            fieldtype = "as.dto.common.search.PermIdSearchCriteria"
        else:
            fieldname = "code"
            fieldtype = "as.dto.common.search.CodeSearchCriteria"

        search_criteria = get_type_for_entity(entity, 'search')

        search_criteria['criteria'] = [{
            "fieldName": fieldname,
            "fieldType": "ATTRIBUTE",
            "fieldValue": {
                "value": code.upper(),
                "@type": "as.dto.common.search.StringEqualToValue"
            },
            "@type": fieldtype
        }]

        search_criteria["operator"] = "AND"
        return search_criteria
    
    Swen Vermeul's avatar
    Swen Vermeul committed
    
    
        """Interface for communicating with openBIS. 
    
    
        Note:
            * A recent version of openBIS is required (minimum 16.05.2).
            * For creation of datasets, the dataset-uploader-api ingestion plugin must be present.
    
    
        def __init__(self, url=None, verify_certificates=True, token=None,
    
            allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks=False,
            token_path=None
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """Initialize a new connection to an openBIS server.
    
    
            Examples:
                o = Openbis('https://openbis.example.com')
                o_test = Openbis('https://test_openbis.example.com:8443', verify_certificates=False)
    
            Args:
                url (str): https://openbis.example.com
                verify_certificates (bool): set to False when you use self-signed certificates
                token (str): a valid openBIS token. If not set, pybis will try to read a valid token from ~/.pybis
    
                token_path: a path to a file which contains an openBIS token
    
                use_cache: make openBIS to store spaces, projects, sample types, vocabulary terms and oder more-or-less static objects to optimise speed
    
                allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks (bool): False
    
            self.as_v3 = '/openbis/openbis/rmi-application-server-v3.json'
            self.as_v1 = '/openbis/openbis/rmi-general-information-v1.json'
            self.reg_v1 = '/openbis/openbis/rmi-query-v1.json'
            self.verify_certificates = verify_certificates
    
    
                url = os.environ.get("OPENBIS_URL") or os.environ.get('OPENBIS_HOST')
                if url is None:
    
                    raise ValueError("please provide a URL you want to connect to.")
    
    
            if not url.startswith('http'):
                url = 'https://'+url
    
            url_obj = urlparse(url)
    
            if url_obj.netloc is None or url_obj.netloc == '':
    
                raise ValueError("please provide the url in this format: https://openbis.host.ch:8443")
    
            if url_obj.hostname is None:
                raise ValueError("hostname is missing")
    
            if url_obj.scheme == 'http' and not allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks:
    
            self.url = url_obj.geturl()
            self.port = url_obj.port
    
            self.hostname = url_obj.hostname
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            self.download_prefix = os.path.join('data', self.hostname)
    
            self.use_cache = use_cache
            self.cache = {}
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            self.server_information = None
    
            self.token_path = token_path or self.gen_token_path()
            self.token = token or os.environ.get("OPENBIS_TOKEN") or self._get_saved_token()
    
            if self.is_token_valid(self.token):
    
                print("Session is no longer valid. Please log in again.")
    
    
        def _get_username(self):
            if self.token:
    
                match = re.search(r'(?P<username>.*)-.*', self.token)
                username = match.groupdict()['username']
    
                'url', 'port', 'hostname', 'token',
                'login()', 
                'logout()', 
                'is_session_active()', 
                'is_token_valid()',
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "download_prefix",
                "get_mountpoint()",
    
                "get_server_information()",
    
                "get_dataset_type()",
    
                "get_dataset_types()",
                "get_datastores()",
    
                "get_experiment_type()",
    
                "get_collection_type()",
    
                "get_external_data_management_systems()",
    
                "get_external_data_management_system()",
                "get_material_type()",
    
                "get_project()",
                "get_projects()",
                "get_sample()",
                "get_object()",
    
                "get_objects()",
                "get_sample_type()",
                "get_object_type()",
    
                "get_object_types()",
    
                "get_property_types()",
                "get_property_type()",
    
                "get_semantic_annotation()",
                "get_space()",
    
                "get_tag()",
                "new_tag()",
    
                "get_term()",
                "get_vocabularies()",
                "get_vocabulary()",
    
                "get_role_assignment()",
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                "get_plugins()",
    
                "get_plugin()",
                "new_plugin()",
                "new_group()",
                'new_space()',
                'new_project()',
                'new_experiment()',
                'new_collection()',
                'new_sample()',
                'new_object()',
    
                'new_sample_type()',
                'new_object_type()',
    
                'new_dataset()',
                'new_dataset_type()',
                'new_experiment_type()',
                'new_collection_type()',
                'new_material_type()',
                'new_semantic_annotation()',
    
                'new_transaction()',
    
                'update_sample()',
                'update_object()', 
    
        def _repr_html_(self):
            html = """
                <table border="1" class="dataframe">
                <thead>
                    <tr style="text-align: right;">
                    <th>attribute</th>
                    <th>value</th>
                    </tr>
                </thead>
                <tbody>
            """
    
            attrs = ['url', 'port', 'hostname', 'verify_certificates', 'as_v3', 'as_v1', 'reg_v1', 'token']
            for attr in attrs:
                html += "<tr> <td>{}</td> <td>{}</td> </tr>".format(
                    attr, getattr(self, attr, '')
                )
    
            html += """
                </tbody>
                </table>
            """
            return html
    
    
    
        @property
        def spaces(self):
            """Shortcut for get_spaces(): returns whatever self.get_spaces() returns."""
            return self.get_spaces()
    
        @property
        def projects(self):
            """Shortcut for get_projects(): returns whatever self.get_projects() returns."""
            return self.get_projects()
    
    
        def gen_token_path(self, parent_folder=None):
            """generates a path to the token file.
            The token is usually saved in a file called
            ~/.pybis/hostname.token

            Args:
                parent_folder: folder that should contain the token file.
                    Defaults to the .pybis folder in the user's home directory.

            Returns:
                str: «parent_folder»/«self.hostname».token
            """
            if parent_folder is None:
                # save token under ~/.pybis folder
                parent_folder = os.path.join(
                    os.path.expanduser("~"),
                    '.pybis'
                )
            path = os.path.join(parent_folder, self.hostname + '.token')
            return path
    
        def _save_token(self, token=None, parent_folder=None):
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            """ saves the session token to the disk, usually here: ~/.pybis/hostname.token. When a new Openbis instance is created, it tries to read this saved token by default.
    
            if token is None:
                token = self.token
    
            token_path = None;
            if parent_folder is None:
                token_path = self.gen_token_path()
            else:
                token_path = self.gen_token_path(parent_folder)
    
            # create the necessary directories, if they don't exist yet
    
            os.makedirs(os.path.dirname(token_path), exist_ok=True)
            with open(token_path, 'w') as f:
    
        def _get_saved_token(self):
            """Read the token from the .pybis 
            If the token is not valid anymore, delete it. 
    
            token_path = self.token_path or self.gen_token_path()
            if not os.path.exists(token_path):
                return None
            try:
                with open(token_path) as f:
                    token = f.read()
                    if token == "":
                        return None
                    else:
                        return token
            except FileNotFoundError:
                return None
    
        def _delete_saved_token(self):
            if self.token_path:
                try:
                    os.remove(self.token_path)
                except FileNotFoundError:
                    return None
    
    Swen Vermeul's avatar
    Swen Vermeul committed
        def _post_request(self, resource, request):
    
            """ internal method, used to handle all post requests and serializing / deserializing
            data
            """
    
            return self._post_request_full_url(urljoin(self.url,resource), request)
    
    
        def _post_request_full_url(self, full_url, request):
    
            """ internal method, used to handle all post requests and serializing / deserializing
            data
            """
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "id" not in request:
    
                request["id"] = "2"
    
    Swen Vermeul's avatar
    Swen Vermeul committed
            if "jsonrpc" not in request:
                request["jsonrpc"] = "2.0"
    
            if request["params"][0] is None:
                raise ValueError("Your session expired, please log in again")
    
    
            if DEBUG_LEVEL >=LOG_DEBUG: print(json.dumps(request))
    
            resp = requests.post(
    
                verify=self.verify_certificates
            )
    
    Swen Vermeul's avatar
    Swen Vermeul committed
                resp = resp.json()
                if 'error' in resp: