"README.md" did not exist on "6956f1bf8dfebedd710ddcc51b041788dd2c0ebb"
Newer
Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-
Adam Laskowski
committed
# Copyright ETH 2018 - 2023 Zürich, Scientific IT Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
pybis.py
"""
Swen Vermeul
committed
import json
import os
import re
import subprocess
import time
import zlib
Swen Vermeul
committed
from urllib.parse import quote, urljoin, urlparse
import requests
import urllib3
from pandas import DataFrame
Swen Vermeul
committed
from . import data_set as pbds
Swen Vermeul
committed
get_definition_for_entity,
Swen Vermeul
committed
get_method_for_entity,
get_type_for_entity,
openbis_definitions,
Swen Vermeul
committed
EntityType,
Swen Vermeul
committed
MaterialType,
SampleType,
Swen Vermeul
committed
from .group import Group
from .openbis_object import OpenBisObject, Transaction
Swen Vermeul
committed
from .person import Person
from .project import Project
from .role_assignment import RoleAssignment
from .sample import Sample
from .semantic_annotation import SemanticAnnotation
from .space import Space
from .tag import Tag
from .things import Things
Swen Vermeul
committed
VERBOSE,
extract_attr,
extract_code,
extract_deletion,
extract_id,
extract_identifier,
extract_identifiers,
extract_nested_identifier,
extract_nested_permid,
extract_nested_permids,
extract_permid,
extract_person,
extract_userId,
extract_username_from_token,
Swen Vermeul
committed
is_number,
Swen Vermeul
committed
parse_jackson,
split_identifier,
Swen Vermeul
committed
from .vocabulary import Vocabulary, VocabularyTerm
Swen Vermeul
committed
# import the various openBIS entities
LOG_WARNING = 3
LOG_INFO = 4
LOG_ENTRY = 5
LOG_PARM = 6
LOG_DEBUG = 7
PYBIS_FOLDER = Path.home() / ".pybis"
CONFIG_FILENAME = ".pybis.json"
DEBUG_LEVEL = LOG_NONE
vkovtun
committed
def now():
vkovtun
committed
return time.time()
def get_search_type_for_entity(entity, operator=None):
"""Returns a dictionary containing the correct search criteria type
get_search_type_for_entity('space')
# returns:
{'@type': 'as.dto.space.search.SpaceSearchCriteria'}
"""
search_criteria = {
"personalAccessToken": "as.dto.pat.search.PersonalAccessTokenSearchCriteria",
"space": "as.dto.space.search.SpaceSearchCriteria",
"userId": "as.dto.person.search.UserIdSearchCriteria",
"email": "as.dto.person.search.EmailSearchCriteria",
"firstName": "as.dto.person.search.FirstNameSearchCriteria",
"lastName": "as.dto.person.search.LastNameSearchCriteria",
"project": "as.dto.project.search.ProjectSearchCriteria",
"experiment": "as.dto.experiment.search.ExperimentSearchCriteria",
"experiment_type": "as.dto.experiment.search.ExperimentTypeSearchCriteria",
"sample": "as.dto.sample.search.SampleSearchCriteria",
"sample_type": "as.dto.sample.search.SampleTypeSearchCriteria",
"dataset": "as.dto.dataset.search.DataSetSearchCriteria",
"dataset_type": "as.dto.dataset.search.DataSetTypeSearchCriteria",
"external_dms": "as.dto.externaldms.search.ExternalDmsSearchCriteria",
"material": "as.dto.material.search.MaterialSearchCriteria",
"material_type": "as.dto.material.search.MaterialTypeSearchCriteria",
"vocabulary_term": "as.dto.vocabulary.search.VocabularyTermSearchCriteria",
"tag": "as.dto.tag.search.TagSearchCriteria",
"authorizationGroup": "as.dto.authorizationgroup.search.AuthorizationGroupSearchCriteria",
"person": "as.dto.person.search.PersonSearchCriteria",
"code": "as.dto.common.search.CodeSearchCriteria",
"global": "as.dto.global.GlobalSearchObject",
"propertyType": "as.dto.property.search.PropertyTypeSearchCriteria",
if operator is not None:
sc["operator"] = operator
return sc
def is_session_token(token: str):
return not token.startswith("$pat")
def is_personal_access_token(token: str):
return token.startswith("$pat")
def get_saved_tokens():
tokens = {}
for filepath in PYBIS_FOLDER.glob("*.token"):
with open(filepath) as fh:
if filepath.is_file:
token = fh.read()
tokens[filepath.stem] = token
def get_token_for_hostname(hostname, session_token_needed=True):
"""Searches for a stored token for a given host in this order:
~/.pybis/hostname.token
"""
tokens = get_saved_tokens()
if hostname in tokens:
if session_token_needed:
if is_session_token(tokens[hostname]):
return tokens[hostname]
else:
return tokens[hostname]
return
def save_pats_to_disk(hostname: str, url: str, resp: dict) -> None:
pats = resp["objects"]
parse_jackson(pats)
path = PYBIS_FOLDER / hostname
path.mkdir(exist_ok=True)
for existing_file in path.glob("*.pat"):
existing_file.unlink()
for token in pats:
data = {
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"hostname": hostname,
"owner": token["owner"]["userId"],
"registrationDate": format_timestamp(token["owner"]["registrationDate"]),
"validFromDate": format_timestamp(token["validFromDate"]),
"validToDate": format_timestamp(token["validToDate"]),
"sessionName": token["sessionName"],
"permId": token["permId"]["permId"],
}
with open(path / (token["hash"] + ".pat"), "w", encoding="utf-8") as fh:
fh.write(json.dumps(data, indent=4))
def get_saved_pats(hostname=None, sessionName=None):
"""return all personal access tokens stored on disk."""
if hostname is None:
hostname = ""
path = PYBIS_FOLDER / hostname
tokens = []
for filepath in path.rglob("*.pat"):
with open(filepath) as fh:
if filepath.is_file:
pat = json.load(fh)
if sessionName:
if pat["sessionName"] != sessionName:
continue
tokens.append(pat)
return tokens
Swen Vermeul
committed
def _type_for_id(ident, entity):
Swen Vermeul
committed
"""Returns the data type for a given identifier/permId for use with the API call, e.g.
{
"identifier": "/DEFAULT/SAMPLE_NAME",
"@type": "as.dto.sample.id.SampleIdentifier"
}
or
{
"permId": "20160817175233002-331",
"@type": "as.dto.sample.id.SamplePermId"
}
"""
if entity.lower() == "tag":
if "/" in ident:
if not ident.startswith("/"):
ident = "/" + ident
return {"permId": ident, "@type": "as.dto.tag.id.TagPermId"}
return {"code": ident, "@type": "as.dto.tag.id.TagCode"}
if entity == "personalAccessToken":
return {"permId": ident, "@type": "as.dto.pat.id.PersonalAccessTokenPermId"}
Swen Vermeul
committed
entities = {
"sample": "Sample",
"dataset": "DataSet",
"experiment": "Experiment",
"plugin": "Plugin",
"space": "Space",
"project": "Project",
Swen Vermeul
committed
"semanticannotation": "SemanticAnnotation",
}
search_request = {}
Swen Vermeul
committed
if entity.lower() in entities:
entity_capitalize = entities[entity.lower()]
else:
entity_capitalize = entity.capitalize()
Swen Vermeul
committed
if is_identifier(ident):
# people tend to omit the / prefix of an identifier...
if not ident.startswith("/"):
ident = "/" + ident
Swen Vermeul
committed
# ELN-LIMS style contains also experiment in sample identifer, i.e. /space/project/experiment/sample_code
# we have to remove the experiment-code
if ident.count("/") == 4:
codes = ident.split("/")
ident = "/".join([codes[0], codes[1], codes[2], codes[4]])
Swen Vermeul
committed
search_request = {
"identifier": ident.upper(),
"@type": f"as.dto.{entity.lower()}.id.{entity_capitalize}Identifier",
}
else:
search_request = {
"permId": ident,
"@type": f"as.dto.{entity.lower()}.id.{entity_capitalize}PermId",
}
return search_request
def get_search_criteria(entity, **search_args):
search_criteria = get_search_type_for_entity(entity)
criteria = []
for attr in attrs:
if attr in search_args:
sub_crit = get_search_type_for_entity(attr)
sub_crit["fieldValue"] = get_field_value_search(attr, search_args[attr])
criteria.append(sub_crit)
search_criteria["criteria"] = criteria
search_criteria["operator"] = "AND"
return search_criteria
"""since Python3 the zlib module returns unsigned integers (2.7: signed int)"""
for eachLine in open(fileName, "rb"):
prev = zlib.crc32(eachLine, prev)
# return as hex
return "%x" % (prev & 0xFFFFFFFF)
def _tagIds_for_tags(tags=None, action="Add"):
"""creates an action item to add or remove tags.
Swen Vermeul
committed
Action is either 'Add', 'Remove' or 'Set'
"""
if tags is None:
return
if not isinstance(tags, list):
tags = [tags]
items = list(map(lambda tag: {"code": tag, "@type": "as.dto.tag.id.TagCode"}, tags))
tagIds = {
"actions": [
{
"items": items,
"@type": f"as.dto.common.update.ListUpdateAction{action.capitalize()}",
}
],
"@type": "as.dto.common.update.IdListUpdateValue",
}
def _list_update(ids=None, entity=None, action="Add"):
"""creates an action item to add, set or remove ids."""
if ids is None:
return
if not isinstance(ids, list):
ids = [ids]
items = list(
map(
lambda id: {
"code": id,
"@type": f"as.dto.{entity.lower()}.id.{entity}Code",
},
ids,
)
)
list_update = {
"actions": [
{
"items": items,
"@type": f"as.dto.common.update.ListUpdateAction{action.capitalize()}",
}
],
"@type": "as.dto.common.update.IdListUpdateValue",
}
return list_update
def get_field_value_search(field, value, comparison="StringEqualToValue"):
return {"value": value, "@type": f"as.dto.common.search.{comparison}"}
def _common_search(search_type, value, comparison="StringEqualToValue"):
sreq = {
"@type": search_type,
"fieldValue": {
"value": value,
"@type": f"as.dto.common.search.{comparison}",
return sreq
def _criteria_for_code(code):
return {
"fieldValue": {
"value": code.upper(),
"@type": "as.dto.common.search.StringEqualToValue",
"@type": "as.dto.common.search.CodeSearchCriteria",
def _criteria_for_permId(permId):
return {
"fieldName": "perm_id",
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": permId,
"@type": "as.dto.common.search.StringEqualToValue",
"@type": "as.dto.common.search.PermIdSearchCriteria",
def _subcriteria_for_userId(userId):
return {
"fieldName": "userId",
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": userId,
"@type": "as.dto.common.search.StringEqualToValue",
"@type": "as.dto.person.search.UserIdSearchCriteria",
],
"@type": "as.dto.person.search.PersonSearchCriteria",
def _subcriteria_for_type(code, entity):
return {
"@type": f"as.dto.{entity.lower()}.search.{entity}TypeSearchCriteria",
"criteria": [
{
"@type": "as.dto.common.search.CodeSearchCriteria",
"fieldValue": {
"value": code.upper(),
"@type": "as.dto.common.search.StringEqualToValue",
},
}
}
def _subcriteria_for_status(status_value):
status_value = status_value.upper()
valid_status = "AVAILABLE LOCKED ARCHIVED UNARCHIVE_PENDING ARCHIVE_PENDING BACKUP_PENDING".split()
if not status_value in valid_status:
"status must be one of the following: " + ", ".join(valid_status)
)
return {
"@type": "as.dto.dataset.search.PhysicalDataSearchCriteria",
"operator": "AND",
"criteria": [
{
"@type": "as.dto.dataset.search.StatusSearchCriteria",
"fieldName": "status",
"fieldType": "ATTRIBUTE",
"fieldValue": status_value,
}
],
sreq = {}
for key, val in req.items():
if key == "criteria":
sreq["criteria"] = list(
map(lambda item: _gen_search_criteria(item), req["criteria"])
)
elif key == "code":
sreq["criteria"] = [
_common_search("as.dto.common.search.CodeSearchCriteria", val.upper())
]
elif key == "identifier":
if is_identifier(val):
# if we have an identifier, we need to search in Space and Code separately
si = split_identifier(val)
sreq["criteria"] = []
if "space" in si:
sreq["criteria"].append(
_gen_search_criteria({"space": "Space", "code": si["space"]})
)
if "experiment" in si:
pass
if "code" in si:
sreq["criteria"].append(
_common_search(
"as.dto.common.search.CodeSearchCriteria",
si["code"].upper(),
sreq["criteria"] = [
_common_search("as.dto.common.search.PermIdSearchCriteria", val)
]
else:
# we assume we just got a code
sreq["criteria"] = [
_common_search(
"as.dto.common.search.CodeSearchCriteria", val.upper()
)
]
elif key == "operator":
sreq["operator"] = val.upper()
else:
sreq["@type"] = f"as.dto.{key}.search.{val}SearchCriteria"
return sreq
def _subcriteria_for_tags(tags):
if not isinstance(tags, list):
tags = [tags]
criteria = list(
map(
lambda tag: {
"fieldName": "code",
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": tag,
"@type": "as.dto.common.search.StringEqualToValue",
},
"@type": "as.dto.common.search.CodeSearchCriteria",
},
tags,
)
)
return {
"@type": "as.dto.tag.search.TagSearchCriteria",
"operator": "AND",
}
def _subcriteria_for_is_finished(is_finished):
return {
"@type": "as.dto.common.search.StringPropertySearchCriteria",
"fieldName": "FINISHED_FLAG",
"fieldType": "PROPERTY",
"fieldValue": {
"value": is_finished,
"@type": "as.dto.common.search.StringEqualToValue",
},
}
Swen Vermeul
committed
def _subcriteria_for_properties(prop, value, entity):
"""This internal method creates the JSON RPC criterias for searching
in properties. It distinguishes between numbers, dates and strings
and uses the comparative operator (< > >= <=), if available.
creationDate and modificationDate attributes can be searched as well.
To search in the properties of parents, children, etc. the user has to
prefix the propery accordingly:
- parent_propertyName
- child_propertyName
- container_propertyName
"""
if "*" in str(value):
additional_attr["useWildcards"] = True
Swen Vermeul
committed
search_types = {
"sample": {
"parent": "as.dto.sample.search.SampleParentsSearchCriteria",
"parents": "as.dto.sample.search.SampleParentsSearchCriteria",
"child": "as.dto.sample.search.SampleChildrenSearchCriteria",
"children": "as.dto.sample.search.SampleChildrenSearchCriteria",
"container": "as.dto.sample.search.SampleContainerSearchCriteria",
},
"dataset": {
"parent": "as.dto.dataset.search.DataSetParentsSearchCriteria",
"parents": "as.dto.dataset.search.DataSetParentsSearchCriteria",
"child": "as.dto.dataset.search.DataSetChildrenSearchCriteria",
"children": "as.dto.dataset.search.DataSetChildrenSearchCriteria",
"container": "as.dto.dataset.search.DataSetContainerSearchCriteria",
Swen Vermeul
committed
},
}
# default values of fieldType, str_type and eq_type
fieldType = "PROPERTY"
eq_type = "as.dto.common.search.StringEqualToValue"
str_type = "as.dto.common.search.StringPropertySearchCriteria"
if "date" in prop.lower() and re.search(r"\d{4}\-\d{2}\-\d{2}", value):
is_date = True
eq_type = "as.dto.common.search.DateEqualToValue"
str_type = "as.dto.common.search.RegistrationDateSearchCriteria"
fieldType = "ATTRIBUTE"
str_type = "as.dto.common.search.ModificationDateSearchCriteria"
fieldType = "ATTRIBUTE"
else:
str_type = "as.dto.common.search.DatePropertySearchCriteria"
if any(str(value).startswith(operator) for operator in [">", "<", "="]):
match = re.search(
r"""
^
(?P<comp_operator>\>\=|\>|\<\=|\<|\=\=|\=) # extract the comparative operator
\s*
(?P<value>.*) # extract the value
""",
comp_operator = match.groupdict()["comp_operator"]
value = match.groupdict()["value"]
eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
eq_type = "as.dto.common.search.DateLaterThanOrEqualToValue"
eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
eq_type = "as.dto.common.search.DateEarlierThanOrEqualToValue"
else:
eq_type = "as.dto.common.search.DateEqualToValue"
# numeric comparison
elif is_number(value):
str_type = "as.dto.common.search.NumberPropertySearchCriteria"
eq_type = "as.dto.common.search.NumberGreaterThanValue"
eq_type = "as.dto.common.search.NumberGreaterThanOrEqualToValue"
eq_type = "as.dto.common.search.NumberLessThanValue"
eq_type = "as.dto.common.search.NumberLessThanOrEqualToValue"
else:
eq_type = "as.dto.common.search.NumberEqualToValue"
# string comparison
else:
eq_type = "as.dto.common.search.StringGreaterThanValue"
eq_type = "as.dto.common.search.StringGreaterThanOrEqualToValue"
eq_type = "as.dto.common.search.StringLessThanValue"
eq_type = "as.dto.common.search.StringLessThanOrEqualToValue"
eq_type = "as.dto.common.search.StringEqualToValue"
else:
eq_type = "as.dto.common.search.StringEqualToValue"
Swen Vermeul
committed
# searching for parent/child/container identifier
Adam Laskowski
committed
relation == prop.lower()
for relation in [
"parent",
"child",
"container",
"parents",
"children",
"containers",
]
Swen Vermeul
committed
if is_identifier(value):
identifier_search_type = "as.dto.common.search.IdentifierSearchCriteria"
# find any parent, child, container
Swen Vermeul
committed
return {
"@type": search_types[entity][relation],
"criteria": [
{
"@type": "as.dto.common.search.AnyFieldSearchCriteria",
"fieldValue": {
"@type": "as.dto.common.search.AnyStringValue",
Swen Vermeul
committed
}
Swen Vermeul
committed
}
elif is_permid(value):
identifier_search_type = "as.dto.common.search.PermIdSearchCriteria"
Swen Vermeul
committed
identifier_search_type = "as.dto.common.search.CodeSearchCriteria"
return {
"@type": search_types[entity][relation],
"criteria": [
{
"@type": identifier_search_type,
"fieldType": "ATTRIBUTE",
"fieldValue": {
"@type": "as.dto.common.search.StringEqualToValue",
"value": value,
Swen Vermeul
committed
}
Swen Vermeul
committed
}
Adam Laskowski
committed
prop.lower().startswith(relation)
for relation in ["parent_", "child_", "container_"]
):
match = re.search(r"^(\w+?)_(.*)", prop.lower())
Swen Vermeul
committed
if match:
relation, property_name = match.groups()
return {
"@type": search_types[entity][relation],
"criteria": [
{
"@type": str_type,
"fieldName": property_name.upper(),
Swen Vermeul
committed
"fieldValue": {
"@type": eq_type,
"value": value,
Swen Vermeul
committed
}
Swen Vermeul
committed
}
Swen Vermeul
committed
# searching for properties
if prop.startswith("_"):
fieldName = "$" + prop[1:]
else:
fieldName = prop
"fieldName": fieldName.upper(),
"fieldValue": {"value": value, "@type": eq_type},
**additional_attr,
def _subcriteria_for(thing, entity, parents_or_children="", operator="AND"):
"""Returns the sub-search criteria for «thing», which can be either:
- a python object (sample, dataSet, experiment)
- a permId
- an identifier
- a code
"""
subcrit = _subcriteria_for(thing, new_entity)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": subcrit, **search_type, "operator": operator}
if isinstance(thing, str):
if is_permid(thing):
return _subcriteria_for_permid(
entity=entity,
parents_or_children=parents_or_children,
)
elif is_identifier(thing):
return _subcriteria_for_identifier(
entity=entity,
parents_or_children=parents_or_children,
)
else:
# look for code
return _subcriteria_for_code_new(
thing,
entity=entity,
parents_or_children=parents_or_children,
)
elif isinstance(thing, list):
criteria = []
for element in thing:
crit = _subcriteria_for(element, entity, parents_or_children, operator)
criteria += crit["criteria"]
return {"criteria": criteria, "@type": crit["@type"], "operator": "OR"}
elif thing is None:
# we just need the type
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": [], **search_type, "operator": operator}
else:
# we passed an object
return _subcriteria_for_permid(
entity=entity,
parents_or_children=parents_or_children,
def _subcriteria_for_identifier(ids, entity, parents_or_children="", operator="AND"):
if not isinstance(ids, list):
ids = [ids]
criteria = list(
map(
lambda id: {
"@type": "as.dto.common.search.IdentifierSearchCriteria",
"fieldValue": {
"value": id,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "identifier",
},
ids,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_permid(permids, entity, parents_or_children="", operator="AND"):
if not isinstance(permids, list):
permids = [permids]
criteria = list(
map(
lambda permid: {
"@type": "as.dto.common.search.PermIdSearchCriteria",
"fieldValue": {
"value": permid,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "perm_id",
},
permids,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_permid_new(codes, entity, parents_or_children="", operator="AND"):
if not isinstance(codes, list):
codes = [codes]
criteria = list(
map(
lambda code: {
"@type": "as.dto.common.search.PermIdSearchCriteria",
"fieldValue": {
"value": code,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "perm_id",
},
codes,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_code_new(codes, entity, parents_or_children="", operator="AND"):
if not isinstance(codes, list):
codes = [codes]
criteria = list(
map(
lambda code: {
"@type": "as.dto.common.search.CodeSearchCriteria",
"fieldValue": {
"value": code,
"@type": "as.dto.common.search.StringEqualToValue",
},
"fieldType": "ATTRIBUTE",
"fieldName": "code",
},
codes,
)
)
search_type = get_type_for_entity(entity, "search", parents_or_children)
return {"criteria": criteria, **search_type, "operator": operator}
def _subcriteria_for_code(code, entity):
"""Creates the often used search criteria for code values. Returns a dictionary.
Example::
_subcriteria_for_code("username", "space")
{
"criteria": [
{
"fieldType": "ATTRIBUTE",
"@type": "as.dto.common.search.CodeSearchCriteria",
"fieldName": "code",
"fieldValue": {
"@type": "as.dto.common.search.StringEqualToValue",
"value": "USERNAME"
}
}
],
"operator": "AND",
"@type": "as.dto.space.search.SpaceSearchCriteria"
}
if code is not None:
if is_permid(code):
fieldname = "permId"
fieldtype = "as.dto.common.search.PermIdSearchCriteria"
else:
fieldname = "code"
fieldtype = "as.dto.common.search.CodeSearchCriteria"
# search_criteria = get_search_type_for_entity(entity.lower())
search_criteria = get_type_for_entity(entity, "search")
search_criteria["criteria"] = [
{
"fieldName": fieldname,
"fieldType": "ATTRIBUTE",
"fieldValue": {
"value": code.upper(),
"@type": "as.dto.common.search.StringEqualToValue",
},
"@type": fieldtype,
}
]
search_criteria["operator"] = "AND"
return search_criteria
class Openbis:
Swen Vermeul
committed
Note:
* A recent version of openBIS is required (minimum 16.05.2).
* For creation of datasets, the dataset-uploader-api ingestion plugin must be present.
Adam Laskowski
committed
self,
url=None,
verify_certificates=True,
token=None,
use_cache=True,
allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks=False,
"""Initialize a new connection to an openBIS server.
Swen Vermeul
committed
Examples:
o = Openbis('https://openbis.example.com')
o_test = Openbis('https://test_openbis.example.com:8443', verify_certificates=False)
Args:
url (str): https://openbis.example.com
verify_certificates (bool): set to False when you use self-signed certificates
token (str): a valid openBIS token. If not set, pybis will try to read a valid token from ~/.pybis
use_cache: make openBIS to store spaces, projects, sample types, vocabulary terms and oder more-or-less static objects to optimise speed
Swen Vermeul
committed
allow_http_but_do_not_use_this_in_production_and_only_within_safe_networks (bool): False
"""
self.as_v3 = "/openbis/openbis/rmi-application-server-v3.json"
self.as_v1 = "/openbis/openbis/rmi-general-information-v1.json"
self.reg_v1 = "/openbis/openbis/rmi-query-v1.json"
self.verify_certificates = verify_certificates
if not verify_certificates:
urllib3.disable_warnings()
if url is None:
url = os.environ.get("OPENBIS_URL") or os.environ.get("OPENBIS_HOST")
raise ValueError("please provide a URL you want to connect to.")
if not url.startswith("http"):
url = "https://" + url
Swen Vermeul
committed
if url_obj.netloc is None or url_obj.netloc == "":
"please provide the url in this format: https://openbis.host.ch:8443"
)
Swen Vermeul
committed
if url_obj.hostname is None: