self._object_cache(entity="term", code=vocabulary, value=things)
return things
def new_term(self, code, vocabularyCode, label=None, description=None):
return VocabularyTerm(
self,
data=None,
code=code,
vocabularyCode=vocabularyCode,
label=label,
description=description,
)
def get_term(self, code, vocabularyCode, only_data=False):
search_request = {
"code": code,
"vocabularyCode": vocabularyCode,
"@type": "as.dto.vocabulary.id.VocabularyTermPermId",
fetchopts = get_fetchoption_for_entity("vocabularyTerm")
for opt in ["registrator"]:
fetchopts[opt] = get_fetchoption_for_entity(opt)
"method": "getVocabularyTerms",
"params": [self.token, [search_request], fetchopts],
}
resp = self._post_request(self.as_v3, request)
if resp is None or len(resp) == 0:
raise ValueError(
"no VocabularyTerm found with code='{}' and vocabularyCode='{}'".format(
code, vocabularyCode
)
)
else:
parse_jackson(resp)
for ident in resp:
if only_data:
return resp[ident]
else:
return VocabularyTerm(self, resp[ident])
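# Usage sketch (illustrative only, not part of the original source): assuming `o` is an
# authenticated Openbis instance and the codes below are placeholders, a vocabulary term
# can be fetched by its code plus the code of its vocabulary, or created client-side:
#
#     term = o.get_term(code="MOUSE", vocabularyCode="ORGANISM")
#     new_term = o.new_term(code="ZEBRAFISH", vocabularyCode="ORGANISM", label="Zebrafish")
#     new_term.save()  # persisting via .save() is assumed to work as for other pybis entities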
def get_vocabularies(self, code=None, start_with=None, count=None):
sub_criteria = []
if code:
sub_criteria.append(_criteria_for_code(code))
criteria = {
"criteria": sub_criteria,
"@type": "as.dto.vocabulary.search.VocabularySearchCriteria",
fetchopts = fetch_option["vocabulary"]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["registrator"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchVocabularies",
}
resp = self._post_request(self.as_v3, request)
attrs = "code description managedInternally chosenFromList urlTemplate registrator registrationDate modificationDate".split()
vocs = DataFrame(columns=attrs)
else:
parse_jackson(resp)
vocs = DataFrame(objects)
vocs["registrationDate"] = vocs["registrationDate"].map(format_timestamp)
vocs["modificationDate"] = vocs["modificationDate"].map(format_timestamp)
vocs["registrator"] = vocs["registrator"].map(extract_person)
def get_vocabulary(self, code, only_data=False, use_cache=True):
"""Returns the details of a given vocabulary (including vocabulary terms)"""
code = str(code).upper()
voc = (
not only_data
and use_cache
and self._object_cache(entity="vocabulary", code=code)
)
if voc:
return voc
entity = "vocabulary"
method_name = get_method_for_entity(entity, "get")
objectIds = _type_for_id(code.upper(), entity)
fetchopts = fetch_option[entity]
request = {
"method": method_name,
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("no {} found with identifier: {}".format(entity, code))
else:
parse_jackson(resp)
for ident in resp:
data = resp[ident]
if only_data:
return data
vocabulary = Vocabulary(openbis_obj=self, data=data)
if self.use_cache:
self._object_cache(entity="vocabulary", code=code, value=vocabulary)
return vocabulary
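# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# vocabularies can be listed or fetched by code ("ORGANISM" is a placeholder code);
# the object cache is consulted unless use_cache=False is passed.
#
#     vocs = o.get_vocabularies(start_with=0, count=10)
#     voc = o.get_vocabulary("ORGANISM")
#     raw = o.get_vocabulary("ORGANISM", only_data=True)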
def new_tag(self, code, description=None):
return Tag(self, code=code, description=description)
def get_tags(self, code=None, start_with=None, count=None):
search_criteria = get_search_type_for_entity("tag", "AND")
criteria = []
fetchopts = fetch_option["tag"]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["owner"]:
if code:
criteria.append(_criteria_for_code(code))
request = {
"method": "searchTags",
"params": [self.token, search_criteria, fetchopts],
}
resp = self._post_request(self.as_v3, request)
return self._tag_list_for_response(
response=resp["objects"], totalCount=resp["totalCount"]
)
def get_tag(self, permId, only_data=False, use_cache=True):
just_one = True
identifiers = []
if isinstance(permId, list):
just_one = False
for ident in permId:
identifiers.append(_type_for_id(ident, "tag"))
else:
tag = (
not only_data
and use_cache
and self._object_cache(entity="tag", code=permId)
)
if tag:
return tag
identifiers.append(_type_for_id(permId, "tag"))
fetchopts = fetch_option["tag"]
for option in ["owner"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "getTags",
"params": [self.token, identifiers, fetchopts],
}
resp = self._post_request(self.as_v3, request)
if just_one:
if len(resp) == 0:
raise ValueError("no such tag found: {}".format(permId))
parse_jackson(resp)
for permId in resp:
if only_data:
return resp[permId]
else:
tag = Tag(self, data=resp[permId])
if self.use_cache:
self._object_cache(entity="tag", code=permId, value=tag)
return tag
else:
return self._tag_list_for_response(response=list(resp.values()))
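# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# tags can be listed, fetched by permId, or created; the permIds shown are placeholders.
#
#     tags = o.get_tags()
#     tag = o.get_tag("admin/MY_TAG")
#     several = o.get_tag(["admin/TAG_A", "admin/TAG_B"])   # returns a Things collection
#     new_tag = o.new_tag(code="MY_TAG", description="a demo tag")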
def _tag_list_for_response(self, response, totalCount=0):
attrs = [
"permId",
"code",
"description",
"owner",
"private",
"registrationDate",
]
if len(response) == 0:
tags = DataFrame(columns=attrs)
else:
tags = DataFrame(response)
tags["registrationDate"] = tags["registrationDate"].map(format_timestamp)
tags["permId"] = tags["permId"].map(extract_permid)
tags["description"] = tags["description"].map(
lambda x: "" if x is None else x
)
tags["owner"] = tags["owner"].map(extract_person)
def search_semantic_annotations(
self, permId=None, entityType=None, propertyType=None, only_data=False
):
"""Get a list of semantic annotations for permId, entityType, propertyType or

property type assignment (DataFrame object).
:param permId: permId of the semantic annotation.
:param entityType: entity (sample) type to search for.
:param propertyType: property type to search for
:param only_data: return result as plain data object.
:return: Things of DataFrame objects or plain data object
"""
criteria = []
typeCriteria = []
if permId is not None:
criteria.append(
{
"@type": "as.dto.common.search.PermIdSearchCriteria",
"fieldValue": {
"@type": "as.dto.common.search.StringEqualToValue",
"value": permId,
},
}
)
if entityType is not None:
typeCriteria.append(
{
"@type": "as.dto.entitytype.search.EntityTypeSearchCriteria",
"criteria": [_criteria_for_code(entityType)],
}
)
if propertyType is not None:
typeCriteria.append(
{
"@type": "as.dto.property.search.PropertyTypeSearchCriteria",
"criteria": [_criteria_for_code(propertyType)],
}
)
if entityType is not None and propertyType is not None:
criteria.append(
{
"@type": "as.dto.property.search.PropertyAssignmentSearchCriteria",
"criteria": typeCriteria,
}
)
else:
criteria += typeCriteria
saCriteria = {
"@type": "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria",
}
objects = self._search_semantic_annotations(saCriteria)
if only_data:
return objects
attrs = [
"permId",
"entityType",
"propertyType",
"predicateOntologyId",
"predicateOntologyVersion",
"predicateAccessionId",
"descriptorOntologyId",
"descriptorOntologyVersion",
"descriptorAccessionId",
"creationDate",
]
if len(objects) == 0:
annotations = DataFrame(columns=attrs)
else:
annotations = DataFrame(objects)
return Things(
openbis_obj=self,
entity="semantic_annotation",
df=annotations[attrs],
)
def _search_semantic_annotations(self, criteria):
fetch_options = {
"@type": "as.dto.semanticannotation.fetchoptions.SemanticAnnotationFetchOptions",
"entityType": {
"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
},
"propertyType": {
"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
},
"propertyAssignment": {
"@type": "as.dto.property.fetchoptions.PropertyAssignmentFetchOptions",
"entityType": {
"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
"propertyType": {
"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
}
request = {
"method": "searchSemanticAnnotations",
}
resp = self._post_request(self.as_v3, request)
if len(resp["objects"]) == 0:
return []
else:
objects = resp["objects"]
parse_jackson(objects)
for obj in objects:
obj["permId"] = obj["permId"]["permId"]
if obj.get("entityType") is not None:
obj["entityType"] = obj["entityType"]["code"]
elif obj.get("propertyType") is not None:
obj["propertyType"] = obj["propertyType"]["code"]
elif obj.get("propertyAssignment") is not None:
obj["entityType"] = obj["propertyAssignment"]["entityType"]["code"]
obj["propertyType"] = obj["propertyAssignment"]["propertyType"][
"code"
]
obj["creationDate"] = format_timestamp(obj["creationDate"])
def get_semantic_annotations(self):
"""Get a list of all available semantic annotations (DataFrame object)."""
objects = self._search_semantic_annotations(
{
"@type": "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria"
}
)
attrs = [
"permId",
"entityType",
"propertyType",
"predicateOntologyId",
"predicateOntologyVersion",
"predicateAccessionId",
"descriptorOntologyId",
"descriptorOntologyVersion",
"descriptorAccessionId",
"creationDate",
]
if len(objects) == 0:
annotations = DataFrame(columns=attrs)
else:
annotations = DataFrame(objects)
return Things(
openbis_obj=self,
entity="semantic_annotation",
df=annotations[attrs],
)
def get_semantic_annotation(self, permId, only_data=False):
objects = self.search_semantic_annotations(permId=permId, only_data=True)
if len(objects) == 0:
"Semantic annotation with permId " + permId + " not found."
)
obj = objects[0]
if only_data:
return obj
return SemanticAnnotation(self, isNew=False, **obj)
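# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# semantic annotations can be searched by entity type and/or property type, or fetched
# directly by permId (all values below are placeholders).
#
#     anns = o.search_semantic_annotations(entityType="UNKNOWN")
#     anns = o.search_semantic_annotations(entityType="UNKNOWN", propertyType="DESCRIPTION")
#     ann = o.get_semantic_annotation("20210101123456789-1")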
def get_plugins(self, start_with=None, count=None):
search_criteria = get_search_type_for_entity("plugin", "AND")
search_criteria["criteria"] = criteria
fetchopts = fetch_option["plugin"]
for option in ["registrator"]:
fetchopts["from"] = start_with
fetchopts["count"] = count
request = {
"method": "searchPlugins",
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
attrs = [
"name",
"description",
"pluginType",
"pluginKind",
"entityKinds",
"registrator",
"registrationDate",
"permId",
]
objects = resp["objects"]
if len(objects) == 0:
plugins = DataFrame(columns=attrs)
else:
parse_jackson(objects)
plugins = DataFrame(objects)
plugins["permId"] = plugins["permId"].map(extract_permid)
plugins["registrator"] = plugins["registrator"].map(extract_person)
plugins["registrationDate"] = plugins["registrationDate"].map(
format_timestamp
)
plugins["description"] = plugins["description"].map(
lambda x: "" if x is None else x
)
plugins["entityKinds"] = plugins["entityKinds"].map(
lambda x: "" if x is None else x
)
def get_plugin(self, permId, only_data=False, with_script=True):
search_request = _type_for_id(permId, "plugin")
fetchopts = fetch_option["plugin"]
options = ["registrator"]
for option in options:
fetchopts[option] = fetch_option[option]
request = {
"method": "getPlugins",
"params": [self.token, [search_request], fetchopts],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if resp is None or len(resp) == 0:
raise ValueError("no such plugin found: " + permId)
else:
for permId in resp:
if only_data:
return resp[permId]
else:
return Plugin(self, data=resp[permId])
def new_plugin(self, name, pluginType, **kwargs):
"""Creates a new Plugin object (not yet saved to openBIS).
name -- name of the plugin
description --
pluginType -- DYNAMIC_PROPERTY, MANAGED_PROPERTY, ENTITY_VALIDATION
entityKind -- MATERIAL, EXPERIMENT, SAMPLE, DATA_SET
script -- string of the script itself
available --
"""
return Plugin(self, name=name, pluginType=pluginType, **kwargs)
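# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# a dynamic-property plugin could be created like this (name and script body are
# placeholders); persisting via .save() is assumed to work as for other pybis entities.
#
#     pl = o.new_plugin(
#         name="my_dynamic_property",
#         pluginType="DYNAMIC_PROPERTY",
#         script="def calculate():\n    return 'hello'",
#     )
#     pl.save()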
def new_property_type(
self,
code,
label,
description,
dataType,
managedInternally=False,
vocabulary=None,
materialType=None,
schema=None,
transformation=None,
metaData=None,
):
"""Creates a new property type.
code -- name of the property type
managedInternally -- must be set to True if code starts with a $
label -- displayed label of that property
description --
dataType -- must contain any of these values:
INTEGER VARCHAR MULTILINE_VARCHAR
REAL TIMESTAMP BOOLEAN HYPERLINK
XML CONTROLLEDVOCABULARY MATERIAL
vocabulary -- if dataType is CONTROLLEDVOCABULARY, this attribute
must contain the code of the vocabulary object.
materialType --
schema --
transformation --
metaData -- used to create properties that contain either RichText or tabular, spreadsheet-like data.
use {'custom_widget' : 'Word Processor'} and MULTILINE_VARCHAR for RichText
use {'custom_widget' : 'Spreadsheet'} and XML for tabular data.
PropertyTypes can be assigned to
- sampleTypes
- dataSetTypes
- experimentTypes
- materialTypes (deprecated)
"""
return PropertyType(
openbis_obj=self,
code=code,
label=label,
description=description,
dataType=dataType,
managedInternally=managedInternally,
vocabulary=vocabulary,
materialType=materialType,
schema=schema,
transformation=transformation,
metaData=metaData,
)
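# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# the docstring above translates into a call like the following (codes are placeholders):
#
#     pt = o.new_property_type(
#         code="MY_NOTES",
#         label="Notes",
#         description="rich-text notes",
#         dataType="MULTILINE_VARCHAR",
#         metaData={'custom_widget': 'Word Processor'},
#     )
#     pt.save()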
def get_property_type(
self, code, only_data=False, start_with=None, count=None, use_cache=True
):
if not isinstance(code, list) and start_with is None and count is None:
code = str(code).upper()
pt = (
use_cache
and self.use_cache
and self._object_cache(entity="property_type", code=code)
if pt:
if only_data:
return pt.data
else:
return pt
identifiers = []
only_one = False
if not isinstance(code, list):
code = [code]
only_one = True
for c in code:
identifiers.append(
{"permId": c.upper(), "@type": "as.dto.property.id.PropertyTypePermId"}
)
fetchopts = fetch_option["propertyType"]
options = ["vocabulary", "materialType", "semanticAnnotations", "registrator"]
for option in options:
fetchopts[option] = fetch_option[option]
request = {
"method": "getPropertyTypes",
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if only_one:
if len(resp) == 0:
raise ValueError("no such propertyType: {}".format(code))
for ident in resp:
if only_data:
return resp[ident]
else:
pt = PropertyType(openbis_obj=self, data=resp[ident])
if self.use_cache:
entity="property_type", code=code[0], value=pt
)
return pt
# return a list of objects
else:
return self._property_type_things(
objects=list(resp.values()),
start_with=start_with,
count=count,
totalCount=len(resp),
)
def get_property_types(self, code=None, start_with=None, count=None):
fetchopts = fetch_option["propertyType"]
fetchopts["from"] = start_with
fetchopts["count"] = count
search_criteria = get_search_criteria("propertyType", code=code)
request = {
"method": "searchPropertyTypes",
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
objects = resp["objects"]
parse_jackson(objects)
return self._property_type_things(
objects=objects,
start_with=start_with,
count=count,
totalCount=resp.get("totalCount"),
)
def _property_type_things(
self, objects, start_with=None, count=None, totalCount=None
):
"""takes a list of objects and returns a Things object"""
attrs = openbis_definitions("propertyType")["attrs"]
if len(objects) == 0:
df = DataFrame(columns=attrs)
else:
df = DataFrame(objects)
df["registrationDate"] = df["registrationDate"].map(format_timestamp)
df["registrator"] = df["registrator"].map(extract_person)
df["vocabulary"] = df["vocabulary"].map(extract_code)
df["semanticAnnotations"] = df["semanticAnnotations"].map(
extract_nested_permids
)
return Things(
openbis_obj=self,
entity="propertyType",
single_item_method=self.get_property_type,
df=df[attrs],
start_with=start_with,
count=count,
totalCount=totalCount,
)
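# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# property types can be listed or fetched individually; passing a list of codes returns
# a Things collection built by _property_type_things (codes are placeholders).
#
#     pts = o.get_property_types()
#     pt = o.get_property_type("DESCRIPTION")
#     pair = o.get_property_type(["DESCRIPTION", "NOTES"])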
def get_material_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available material types"""
return self.get_entity_types(
entity="materialType",
cls=MaterialType,
type=type,
start_with=start_with,
count=count,
)
def get_material_type(self, type, only_data=False):
return self.get_entity_type(
entity="materialType",
cls=MaterialType,
identifier=type,
method=self.get_material_type,
only_data=only_data,
)
def get_experiment_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available experiment types"""
return self.get_entity_types(
entity="experimentType",
cls=ExperimentType,
type=type,
start_with=start_with,
count=count,
)
get_collection_types = get_experiment_types # Alias
def get_experiment_type(self, type, only_data=False):
return self.get_entity_type(
entity="experimentType",
cls=ExperimentType,
identifier=type,
method=self.get_experiment_type,
only_data=only_data,
)
get_collection_type = get_experiment_type # Alias
def get_dataset_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available dataSet types"""
return self.get_entity_types(
entity="dataSetType",
cls=DataSetType,
type=type,
start_with=start_with,
count=count,
)
def get_dataset_type(self, type, only_data=False):
return self.get_entity_type(
entity="dataSetType",
identifier=type,
cls=DataSetType,
method=self.get_dataset_type,
only_data=only_data,
)
def get_sample_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available sample types"""
return self.get_entity_types(
entity="sampleType",
cls=SampleType,
type=type,
start_with=start_with,
count=count,
)
def get_sample_type(self, type, only_data=False, with_vocabulary=False):
return self.get_entity_type(
entity="sampleType",
identifier=type,
cls=SampleType,
with_vocabulary=with_vocabulary,
method=self.get_sample_type,
only_data=only_data,
)
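# Usage sketch (illustrative only): assuming `o` is an authenticated Openbis instance,
# the type getters above all delegate to get_entity_types / get_entity_type
# (the type codes shown are placeholders):
#
#     sample_types = o.get_sample_types()
#     st = o.get_sample_type("YEAST", with_vocabulary=True)
#     dst = o.get_dataset_type("RAW_DATA")
#     et = o.get_experiment_type("DEFAULT_EXPERIMENT")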
def get_entity_types(
self, entity, cls, type=None, start_with=None, count=None, with_vocabulary=False
):
method_name = get_method_for_entity(entity, "search")
if type is not None:
search_request = _subcriteria_for_code(type, entity)
else:
search_request = get_type_for_entity(entity, "search")
fetch_options = get_fetchoption_for_entity(entity)
fetch_options["from"] = start_with
fetch_options["count"] = count
request = {
"method": method_name,
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
entity_types = []
defs = get_definition_for_entity(entity)
attrs = defs["attrs"]
if len(resp["objects"]) == 0:
entity_types = DataFrame(columns=attrs)
else:
objects = resp["objects"]
parse_jackson(objects)
entity_types = DataFrame(objects)
entity_types["permId"] = entity_types["permId"].map(extract_permid)
entity_types["modificationDate"] = entity_types["modificationDate"].map(
format_timestamp
)
entity_types["validationPlugin"] = entity_types["validationPlugin"].map(
extract_nested_permid
)
single_item_method = getattr(self, cls._single_item_method_name)
return Things(
openbis_obj=self,
entity=entity,
df=entity_types[attrs],
start_with=start_with,
single_item_method=single_item_method,
count=count,
)
def get_entity_type(
self,
entity,
identifier,
cls,
method=None,
only_data=False,
with_vocabulary=False,
):
et = (
not only_data
and not isinstance(identifier, list)
and self._object_cache(entity=entity, code=identifier)
)
if et:
return et
method_name = get_method_for_entity(entity, "get")
fetch_options = get_fetchoption_for_entity(entity)
if with_vocabulary:
fetch_options["propertyAssignments"]["propertyType"]["vocabulary"] = {
"@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions",
"terms": {
"@type": "as.dto.vocabulary.fetchoptions.VocabularyTermFetchOptions"
if not isinstance(identifier, list):
identifier = [identifier]
identifiers = []
for ident in identifier:
identifiers.append(
{
"permId": ident,
"@type": "as.dto.entitytype.id.EntityTypePermId",
}
)
request = {
"method": method_name,
"params": [self.token, identifiers, fetch_options],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if len(identifiers) == 1:
if len(resp) == 0:
raise ValueError("no such {}: {}".format(entity, identifier[0]))
for ident in resp:
if only_data:
return resp[ident]
else:
obj = cls(
openbis_obj=self,
data=resp[ident],
method=method,
)
if self.use_cache:
self._object_cache(entity=entity, code=ident, value=obj)
return obj
def _get_types_of(
self,
method_name,
entity,
type_name=None,
start_with=None,
count=None,
additional_attributes=None,
optional_attributes=None,
"""Returns a list of all available types of an entity.
If the name of the entity-type is given, it returns a PropertyAssignments object
if additional_attributes is None:
additional_attributes = []
if optional_attributes is None:
optional_attributes = []
"@type": "as.dto.{}.search.{}TypeSearchCriteria".format(
entity.lower(), entity
)
"@type": "as.dto.{}.fetchoptions.{}TypeFetchOptions".format(
entity.lower(), entity
)
fetch_options["from"] = start_with
fetch_options["count"] = count
if type_name is not None:
search_request = _gen_search_criteria(
{entity.lower(): entity + "Type", "operator": "AND", "code": type_name}
)
fetch_options["propertyAssignments"] = fetch_option["propertyAssignments"]
if self.get_server_information().api_version > "3.3":
fetch_options["validationPlugin"] = fetch_option["plugin"]
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if type_name is not None:
if len(resp["objects"]) == 1:
return EntityType(openbis_obj=self, data=resp["objects"][0])
elif len(resp["objects"]) == 0:
raise ValueError("No such {} type: {}".format(entity, type_name))
"There is more than one entry for entity={} and type={}".format(
entity, type_name
)
)
types = []
attrs = self._get_attributes(
type_name, types, additional_attributes, optional_attributes
)
if len(resp["objects"]) == 0:
Swen Vermeul
committed
types = DataFrame(columns=attrs)
Swen Vermeul
committed
parse_jackson(objects)
types = DataFrame(objects)
types["modificationDate"] = types["modificationDate"].map(format_timestamp)
return Things(
openbis_obj=self,
entity=entity.lower() + "_type",
df=types[attrs],
start_with=start_with,
count=count,
)
def _get_attributes(
self, type_name, types, additional_attributes, optional_attributes
):
attributes = ["code", "description"] + additional_attributes
attributes += [
attribute for attribute in optional_attributes if attribute in types
]
attributes += ["modificationDate"]
if type_name is not None:
attributes += ["propertyAssignments"]
return attributes
def is_session_active(self):
"""checks whether a session is still active. Returns true or false."""
return self.is_token_valid(self.token)
def is_token_valid(self, token=None):
"""Check if the connection to openBIS is valid.
This method is useful to check if a token is still valid or if it has timed out,
requiring the user to login again.
:return: Return True if the token is valid, False if it is not valid.
"""
if token is None:
token = self.token
if token is None:
return False
request = {
"method": "isSessionActive",
"params": [token],
}
try:
resp = self._post_request(self.as_v1, request)
except Exception:
return False
return resp
def set_token(self, token, save_token=True):
"""Checks the validity of a token, sets it as the current token and (by default) saves it
to the disk, i.e. in the ~/.pybis directory
"""
if not self.is_token_valid(token):
raise ValueError("session token seems not to be valid.")
if os.environ.get("OPENBIS_URL") == self.url:
os.environ["OPENBIS_TOKEN"] = self.token
def get_dataset(self, permIds, only_data=False, props=None, **kvals):