Newer
Older
fetchopts["from"] = start_with
fetchopts["count"] = count
request = {
"method": "searchVocabularyTerms",
"params": [self.token, search_request, fetchopts],
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = "code vocabularyCode label description registrationDate modificationDate official ordinal".split()
vkovtun
committed
objects = response["objects"]
if len(objects) == 0:
terms = DataFrame(columns=attrs)
else:
parse_jackson(objects)
terms = DataFrame(objects)
terms["vocabularyCode"] = terms["permId"].map(
extract_attr("vocabularyCode")
)
terms["registrationDate"] = terms["registrationDate"].map(
format_timestamp
)
terms["modificationDate"] = terms["modificationDate"].map(
format_timestamp
)
vkovtun
committed
return terms[attrs]
entity="term",
identifier_name="code",
additional_identifier="vocabularyCode",
vkovtun
committed
response=resp,
df_initializer=create_data_frame,
if (
self.use_cache
and vocabulary is not None
and start_with is None
and count is None
):
self._object_cache(entity="term", code=vocabulary, value=things)
return things
def new_term(self, code, vocabularyCode, label=None, description=None):
return VocabularyTerm(
self,
data=None,
code=code,
vocabularyCode=vocabularyCode,
label=label,
description=description,
Swen Vermeul
committed
def get_term(self, code, vocabularyCode, only_data=False):
search_request = {
"code": code,
"vocabularyCode": vocabularyCode,
"@type": "as.dto.vocabulary.id.VocabularyTermPermId",
fetchopts = get_fetchoption_for_entity("vocabularyTerm")
for opt in ["registrator"]:
fetchopts[opt] = get_fetchoption_for_entity(opt)
"method": "getVocabularyTerms",
"params": [self.token, [search_request], fetchopts],
}
resp = self._post_request(self.as_v3, request)
if resp is None or len(resp) == 0:
raise ValueError(
"no VocabularyTerm found with code='{}' and vocabularyCode='{}'".format(
code, vocabularyCode
)
)
else:
parse_jackson(resp)
for ident in resp:
if only_data:
return resp[ident]
else:
return VocabularyTerm(self, resp[ident])
def get_vocabularies(self, code=None, start_with=None, count=None):
sub_criteria = []
if code:
sub_criteria.append(_criteria_for_code(code))
criteria = {
"criteria": sub_criteria,
"@type": "as.dto.vocabulary.search.VocabularySearchCriteria",
fetchopts = fetch_option["vocabulary"]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["registrator"]:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchVocabularies",
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = "code description managedInternally chosenFromList urlTemplate registrator registrationDate modificationDate".split()
vkovtun
committed
objects = response["objects"]
if len(objects) == 0:
vocs = DataFrame(columns=attrs)
else:
parse_jackson(response)
vocs = DataFrame(objects)
vocs["registrationDate"] = vocs["registrationDate"].map(
format_timestamp
)
vocs["modificationDate"] = vocs["modificationDate"].map(
format_timestamp
)
vkovtun
committed
vocs["registrator"] = vocs["registrator"].map(extract_person)
return vocs[attrs]
entity="vocabulary",
identifier_name="code",
vkovtun
committed
response=resp,
df_initializer=create_data_frame,
def get_vocabulary(self, code, only_data=False, use_cache=True):
"""Returns the details of a given vocabulary (including vocabulary terms)"""
code = str(code).upper()
voc = (
not only_data
and use_cache
and self._object_cache(entity="vocabulary", code=code)
)
if voc:
return voc
entity = "vocabulary"
method_name = get_method_for_entity(entity, "get")
objectIds = _type_for_id(code.upper(), entity)
fetchopts = fetch_option[entity]
request = {
"method": method_name,
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("no {} found with identifier: {}".format(entity, code))
else:
parse_jackson(resp)
for ident in resp:
data = resp[ident]
if only_data:
return data
vocabulary = Vocabulary(openbis_obj=self, data=data)
if self.use_cache:
self._object_cache(entity="vocabulary", code=code, value=vocabulary)
return vocabulary
def new_tag(self, code, description=None):
return Tag(self, code=code, description=description)
def get_tags(self, code=None, start_with=None, count=None):
search_criteria = get_search_type_for_entity("tag", "AND")
criteria = []
fetchopts = fetch_option["tag"]
fetchopts["from"] = start_with
fetchopts["count"] = count
for option in ["owner"]:
if code:
criteria.append(_criteria_for_code(code))
request = {
"method": "searchTags",
"params": [self.token, search_criteria, fetchopts],
}
resp = self._post_request(self.as_v3, request)
return self._tag_list_for_response(
response=resp["objects"], totalCount=resp["totalCount"]
)
Swen Vermeul
committed
def get_tag(self, permId, only_data=False, use_cache=True):
Swen Vermeul
committed
just_one = True
identifiers = []
if isinstance(permId, list):
just_one = False
for ident in permId:
tag = (
not only_data
and use_cache
and self._object_cache(entity="tag", code=permId)
)
if tag:
return tag
fetchopts = fetch_option["tag"]
for option in ["owner"]:
fetchopts[option] = fetch_option[option]
Swen Vermeul
committed
request = {
"method": "getTags",
Swen Vermeul
committed
}
resp = self._post_request(self.as_v3, request)
if just_one:
if len(resp) == 0:
raise ValueError("no such tag found: {}".format(permId))
parse_jackson(resp)
for permId in resp:
if only_data:
return resp[permId]
else:
tag = Tag(self, data=resp[permId])
if self.use_cache:
self._object_cache(entity="tag", code=permId, value=tag)
return tag
return self._tag_list_for_response(response=list(resp.values()))
def _tag_list_for_response(self, response, totalCount=0):
vkovtun
committed
def create_data_frame(attrs, props, response):
parse_jackson(response)
attrs = [
"permId",
"code",
"description",
"owner",
"private",
"registrationDate",
]
if len(response) == 0:
tags = DataFrame(columns=attrs)
else:
tags = DataFrame(response)
tags["registrationDate"] = tags["registrationDate"].map(
format_timestamp
)
vkovtun
committed
tags["permId"] = tags["permId"].map(extract_permid)
tags["description"] = tags["description"].map(
lambda x: "" if x is None else x
)
tags["owner"] = tags["owner"].map(extract_person)
return tags[attrs]
vkovtun
committed
response=response,
df_initializer=create_data_frame,
def search_semantic_annotations(
self, permId=None, entityType=None, propertyType=None, only_data=False
):
"""Get a list of semantic annotations for permId, entityType, propertyType or

yvesn
committed
property type assignment (DataFrame object).
:param permId: permId of the semantic annotation.
:param entityType: entity (sample) type to search for.
:param propertyType: property type to search for
:param only_data: return result as plain data object.
:return: Things of DataFrame objects or plain data object
"""
criteria = []
typeCriteria = []
if permId is not None:
criteria.append(
{
"@type": "as.dto.common.search.PermIdSearchCriteria",
"fieldValue": {
"@type": "as.dto.common.search.StringEqualToValue",
"value": permId,
},
if entityType is not None:
typeCriteria.append(
{
"@type": "as.dto.entitytype.search.EntityTypeSearchCriteria",
"criteria": [_criteria_for_code(entityType)],
}
)
if propertyType is not None:
typeCriteria.append(
{
"@type": "as.dto.property.search.PropertyTypeSearchCriteria",
"criteria": [_criteria_for_code(propertyType)],
}
)
if entityType is not None and propertyType is not None:
criteria.append(
{
"@type": "as.dto.property.search.PropertyAssignmentSearchCriteria",
"criteria": typeCriteria,
}
)
else:
criteria += typeCriteria
saCriteria = {
"@type": "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria",
}
objects = self._search_semantic_annotations(saCriteria)
if only_data:
return objects
Swen Vermeul
committed
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = [
"permId",
"entityType",
"propertyType",
"predicateOntologyId",
"predicateOntologyVersion",
"predicateAccessionId",
"descriptorOntologyId",
"descriptorOntologyVersion",
"descriptorAccessionId",
"creationDate",
]
if len(response) == 0:
annotations = DataFrame(columns=attrs)
else:
annotations = DataFrame(response)
return annotations[attrs]
Swen Vermeul
committed
entity="semantic_annotation",
identifier_name="permId",
vkovtun
committed
response=objects,
df_initializer=create_data_frame,
def _search_semantic_annotations(self, criteria):
fetch_options = {
"@type": "as.dto.semanticannotation.fetchoptions.SemanticAnnotationFetchOptions",
"entityType": {
"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
},
"propertyType": {
"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
},
"propertyAssignment": {
"@type": "as.dto.property.fetchoptions.PropertyAssignmentFetchOptions",
"entityType": {
"@type": "as.dto.entitytype.fetchoptions.EntityTypeFetchOptions"
"propertyType": {
"@type": "as.dto.property.fetchoptions.PropertyTypeFetchOptions"
}
request = {
"method": "searchSemanticAnnotations",
}
resp = self._post_request(self.as_v3, request)
Swen Vermeul
committed
return []
else:
parse_jackson(objects)
obj["permId"] = obj["permId"]["permId"]
if obj.get("entityType") is not None:
obj["entityType"] = obj["entityType"]["code"]
elif obj.get("propertyType") is not None:
obj["propertyType"] = obj["propertyType"]["code"]
elif obj.get("propertyAssignment") is not None:
obj["entityType"] = obj["propertyAssignment"]["entityType"]["code"]
obj["propertyType"] = obj["propertyAssignment"]["propertyType"][
"code"
]
obj["creationDate"] = format_timestamp(obj["creationDate"])
def get_semantic_annotations(self):
"""Get a list of all available semantic annotations (DataFrame object)."""
objects = self._search_semantic_annotations(
{
"@type": "as.dto.semanticannotation.search.SemanticAnnotationSearchCriteria"
}
)
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = [
"permId",
"entityType",
"propertyType",
"predicateOntologyId",
"predicateOntologyVersion",
"predicateAccessionId",
"descriptorOntologyId",
"descriptorOntologyVersion",
"descriptorAccessionId",
"creationDate",
]
vkovtun
committed
if len(response) == 0:
vkovtun
committed
annotations = DataFrame(columns=attrs)
else:
vkovtun
committed
annotations = DataFrame(response)
vkovtun
committed
return annotations[attrs]
entity="semantic_annotation",
identifier_name="permId",
vkovtun
committed
response=objects,
df_initializer=create_data_frame,
def get_semantic_annotation(self, permId, only_data=False):
objects = self.search_semantic_annotations(permId=permId, only_data=True)
if len(objects) == 0:
"Semantic annotation with permId " + permId + " not found."
)
obj = objects[0]
if only_data:
return obj
return SemanticAnnotation(self, isNew=False, **obj)
def get_plugins(self, start_with=None, count=None):
search_criteria = get_search_type_for_entity("plugin", "AND")
search_criteria["criteria"] = criteria
fetchopts = fetch_option["plugin"]
for option in ["registrator"]:
fetchopts["from"] = start_with
fetchopts["count"] = count
request = {
"method": "searchPlugins",
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = [
"name",
"description",
"pluginType",
"pluginKind",
"entityKinds",
"registrator",
"registrationDate",
"permId",
]
vkovtun
committed
objects = response["objects"]
if len(objects) == 0:
plugins = DataFrame(columns=attrs)
else:
parse_jackson(objects)
plugins = DataFrame(objects)
plugins["permId"] = plugins["permId"].map(extract_permid)
plugins["registrator"] = plugins["registrator"].map(extract_person)
plugins["registrationDate"] = plugins["registrationDate"].map(
format_timestamp
)
plugins["description"] = plugins["description"].map(
lambda x: "" if x is None else x
)
plugins["entityKinds"] = plugins["entityKinds"].map(
lambda x: "" if x is None else x
)
return plugins[attrs]
vkovtun
committed
response=resp,
df_initializer=create_data_frame,
def get_plugin(self, permId, only_data=False, with_script=True):
search_request = _type_for_id(permId, "plugin")
fetchopts = fetch_option["plugin"]
options = ["registrator"]
for option in options:
fetchopts[option] = fetch_option[option]
request = {
"method": "getPlugins",
"params": [self.token, [search_request], fetchopts],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if resp is None or len(resp) == 0:
raise ValueError("no such plugin found: " + permId)
else:
for permId in resp:
if only_data:
return resp[permId]
else:
return Plugin(self, data=resp[permId])
def new_plugin(self, name, pluginType, **kwargs):
name -- name of the plugin
description --
pluginType -- DYNAMIC_PROPERTY, MANAGED_PROPERTY, ENTITY_VALIDATION
entityKind -- MATERIAL, EXPERIMENT, SAMPLE, DATA_SET
script -- string of the script itself
available --
return Plugin(self, name=name, pluginType=pluginType, **kwargs)
def new_property_type(
self,
code,
label,
description,
dataType,
managedInternally=False,
vocabulary=None,
materialType=None,
schema=None,
transformation=None,
code -- name of the property type
managedInternally -- must be set to True if code starts with a $
label -- displayed label of that property
description --
dataType -- must contain any of these values:
INTEGER VARCHAR MULTILINE_VARCHAR
REAL TIMESTAMP BOOLEAN HYPERLINK
XML CONTROLLEDVOCABULARY MATERIAL
vocabulary -- if dataType is CONTROLLEDVOCABULARY, this attribute
must contain the code of the vocabulary object.
materialType --
schema --
transformation --
metaData -- used to create properties that contain either RichText or tabular, spreadsheet-like data.
use {'custom_widget' : 'Word Processor'} and MULTILINE_VARCHAR for RichText
use {'custom_widget' : 'Spreadhseet'} and XML for tabular data.
PropertyTypes can be assigned to
- sampleTypes
- dataSetTypes
- experimentTypes
- materialTypes (deprecated)
"""
if isinstance(vocabulary, Vocabulary):
vocabulary = vocabulary.code
return PropertyType(
openbis_obj=self,
code=code,
label=label,
description=description,
dataType=dataType,
managedInternally=managedInternally,
vocabulary=vocabulary,
materialType=materialType,
schema=schema,
transformation=transformation,
def get_property_type(
self, code, only_data=False, start_with=None, count=None, use_cache=True
):
if not isinstance(code, list) and start_with is None and count is None:
code = str(code).upper()
pt = (
use_cache
and self.use_cache
and self._object_cache(entity="property_type", code=code)
if pt:
if only_data:
return pt.data
else:
return pt
identifiers = []
only_one = False
if not isinstance(code, list):
code = [code]
only_one = True
for c in code:
identifiers.append(
{"permId": c.upper(), "@type": "as.dto.property.id.PropertyTypePermId"}
)
fetchopts = fetch_option["propertyType"]
options = ["vocabulary", "materialType", "semanticAnnotations", "registrator"]
for option in options:
fetchopts[option] = fetch_option[option]
request = {
"method": "getPropertyTypes",
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if only_one:
if len(resp) == 0:
raise ValueError("no such propertyType: {}".format(code))
if only_data:
return resp[ident]
else:
pt = PropertyType(openbis_obj=self, data=resp[ident])
if self.use_cache:
entity="property_type", code=code[0], value=pt
)
return pt
# return a list of objects
else:
return self._property_type_things(
objects=list(resp.values()),
start_with=start_with,
count=count,
totalCount=len(resp),
def get_property_types(self, code=None, start_with=None, count=None):
fetchopts = fetch_option["propertyType"]
fetchopts["from"] = start_with
fetchopts["count"] = count
search_criteria = get_search_criteria("propertyType", code=code)
request = {
"method": "searchPropertyTypes",
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(objects)
return self._property_type_things(
objects=objects,
def _property_type_things(
self, objects, start_with=None, count=None, totalCount=None
):
"""takes a list of objects and returns a Things object"""
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = openbis_definitions("propertyType")["attrs"]
if len(response) == 0:
df = DataFrame(columns=attrs)
else:
df = DataFrame(response)
df["registrationDate"] = df["registrationDate"].map(format_timestamp)
df["registrator"] = df["registrator"].map(extract_person)
df["vocabulary"] = df["vocabulary"].map(extract_code)
df["semanticAnnotations"] = df["semanticAnnotations"].map(
extract_nested_permids
)
return df[attrs]
return Things(
single_item_method=self.get_property_type,
start_with=start_with,
count=count,
totalCount=totalCount,
vkovtun
committed
response=objects,
df_initializer=create_data_frame,
)
def get_material_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available material types"""
return self.get_entity_types(
cls=MaterialType,
type=type,
start_with=start_with,
)
def get_material_type(self, type, only_data=False):
return self.get_entity_type(
cls=MaterialType,
identifier=type,
method=self.get_material_type,
only_data=only_data,
)
def get_experiment_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available experiment types"""
return self.get_entity_types(
cls=ExperimentType,
type=type,
start_with=start_with,
)
get_collection_types = get_experiment_types # Alias
def get_experiment_type(self, type, only_data=False):
return self.get_entity_type(
cls=ExperimentType,
identifier=type,
method=self.get_experiment_type,
only_data=only_data,
)
get_collection_type = get_experiment_type # Alias
def get_dataset_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available dataSet types"""
return self.get_entity_types(
cls=DataSetType,
type=type,
start_with=start_with,
)
def get_dataset_type(self, type, only_data=False):
return self.get_entity_type(
identifier=type,
cls=DataSetType,
method=self.get_dataset_type,
only_data=only_data,
)
def get_sample_types(self, type=None, start_with=None, count=None):
"""Returns a list of all available sample types"""
return self.get_entity_types(
cls=SampleType,
type=type,
start_with=start_with,
)
def get_sample_type(self, type, only_data=False, with_vocabulary=False):
identifier=type,
cls=SampleType,
with_vocabulary=with_vocabulary,
method=self.get_sample_type,
only_data=only_data,
)
def get_entity_types(
self, entity, cls, type=None, start_with=None, count=None, with_vocabulary=False
):
method_name = get_method_for_entity(entity, "search")
if type is not None:
search_request = _subcriteria_for_code(type, entity)
else:
search_request = get_type_for_entity(entity, "search")
fetch_options = get_fetchoption_for_entity(entity)
fetch_options["from"] = start_with
fetch_options["count"] = count
request = {
"method": method_name,
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
parse_jackson(response)
entity_types = []
defs = get_definition_for_entity(entity)
attrs = defs["attrs"]
objects = response["objects"]
if len(objects) == 0:
entity_types = DataFrame(columns=attrs)
else:
parse_jackson(objects)
entity_types = DataFrame(objects)
entity_types["permId"] = entity_types["permId"].map(extract_permid)
entity_types["modificationDate"] = entity_types["modificationDate"].map(
format_timestamp
)
entity_types["validationPlugin"] = entity_types["validationPlugin"].map(
extract_nested_permid
)
return entity_types[attrs]
return Things(
openbis_obj=self,
entity=entity,
start_with=start_with,
vkovtun
committed
single_item_method=getattr(self, cls._single_item_method_name),
vkovtun
committed
response=resp,
df_initializer=create_data_frame,
)
def get_entity_type(
self,
entity,
identifier,
cls,
method=None,
only_data=False,
with_vocabulary=False,
et = (
not only_data
and not isinstance(identifier, list)
and self._object_cache(entity=entity, code=identifier)
)
if et:
return et
method_name = get_method_for_entity(entity, "get")
fetch_options = get_fetchoption_for_entity(entity)
if with_vocabulary:
fetch_options["propertyAssignments"]["propertyType"]["vocabulary"] = {
"@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions",
"terms": {
"@type": "as.dto.vocabulary.fetchoptions.VocabularyTermFetchOptions"
if not isinstance(identifier, list):
identifier = [identifier]
identifiers.append(
{
"permId": ident,
"@type": "as.dto.entitytype.id.EntityTypePermId",
}
)
request = {
"method": method_name,
"params": [self.token, identifiers, fetch_options],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if len(identifiers) == 1:
if len(resp) == 0:
raise ValueError("no such {}: {}".format(entity, identifier[0]))
for ident in resp:
if only_data:
return resp[ident]
else:
obj = cls(
openbis_obj=self,
data=resp[ident],
method=method,
if self.use_cache:
self._object_cache(entity=entity, code=ident, value=obj)
return obj
self,
method_name,
entity,
type_name=None,
start_with=None,
count=None,
additional_attributes=None,
optional_attributes=None,
"""Returns a list of all available types of an entity.
If the name of the entity-type is given, it returns a PropertyAssignments object
if additional_attributes is None:
additional_attributes = []
if optional_attributes is None:
optional_attributes = []
"@type": "as.dto.{}.search.{}TypeSearchCriteria".format(
entity.lower(), entity
)
"@type": "as.dto.{}.fetchoptions.{}TypeFetchOptions".format(
entity.lower(), entity
)
fetch_options["from"] = start_with
fetch_options["count"] = count
search_request = _gen_search_criteria(
{entity.lower(): entity + "Type", "operator": "AND", "code": type_name}
)
fetch_options["propertyAssignments"] = fetch_option["propertyAssignments"]
if self.get_server_information().api_version > "3.3":
fetch_options["validationPlugin"] = fetch_option["plugin"]
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
parse_jackson(response)
if type_name is not None:
if len(response["objects"]) == 1:
return EntityType(openbis_obj=self, data=response["objects"][0])
elif len(response["objects"]) == 0:
raise ValueError("No such {} type: {}".format(entity, type_name))