Newer
Older
entity="property_type", code=code[0], value=pt
)
return pt
# return a list of objects
else:
return self._property_type_things(
objects=list(resp.values()),
start_with=start_with,
count=count,
totalCount=len(resp),
def get_property_types(self, code=None, start_with=None, count=None):
fetchopts = get_fetchoption_for_entity("propertyType")
fetchopts["from"] = start_with
fetchopts["count"] = count
search_criteria = get_search_criteria("propertyType", code=code)
request = {
"method": "searchPropertyTypes",
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(objects)
return self._property_type_things(
objects=objects,
Adam Laskowski
committed
self, objects, start_with=None, count=None, totalCount=None
):
"""takes a list of objects and returns a Things object"""
vkovtun
committed
def create_data_frame(attrs, props, response):
attrs = openbis_definitions("propertyType")["attrs"]
if len(response) == 0:
df = DataFrame(columns=attrs)
else:
df = DataFrame(response)
df["registrationDate"] = df["registrationDate"].map(format_timestamp)
df["registrator"] = df["registrator"].map(extract_person)
df["vocabulary"] = df["vocabulary"].map(extract_code)
df["semanticAnnotations"] = df["semanticAnnotations"].map(
extract_nested_permids
)
return df[attrs]
return Things(
single_item_method=self.get_property_type,
start_with=start_with,
count=count,
totalCount=totalCount,
vkovtun
committed
response=objects,
df_initializer=create_data_frame,
)
def get_material_types(self, type=None, start_with=None, count=None):
    """Returns a list of all available material types.

    :param type: optional material-type code to filter by
    :param start_with: paging offset (index of the first item to return)
    :param count: paging limit (maximum number of items to return)
    """
    return self.get_entity_types(
        # entity is a required positional/keyword argument of get_entity_types
        entity="materialType",
        cls=MaterialType,
        type=type,
        start_with=start_with,
        count=count,  # was accepted but silently dropped; forward the paging limit
    )
def get_material_type(self, type, only_data=False):
    """Returns a single material type identified by its code.

    :param type: the material-type code
    :param only_data: if True, return the raw response data instead of a MaterialType object
    """
    return self.get_entity_type(
        # get_entity_type requires the entity kind (see its signature)
        entity="materialType",
        cls=MaterialType,
        identifier=type,
        method=self.get_material_type,
        only_data=only_data,
    )
def get_experiment_types(self, type=None, start_with=None, count=None):
    """Returns a list of all available experiment types.

    :param type: optional experiment-type code to filter by
    :param start_with: paging offset (index of the first item to return)
    :param count: paging limit (maximum number of items to return)
    """
    return self.get_entity_types(
        # entity is a required argument of get_entity_types
        entity="experimentType",
        cls=ExperimentType,
        type=type,
        start_with=start_with,
        count=count,  # was accepted but silently dropped; forward the paging limit
    )
get_collection_types = get_experiment_types # Alias
def get_experiment_type(self, type, only_data=False):
    """Returns a single experiment (collection) type identified by its code.

    :param type: the experiment-type code
    :param only_data: if True, return the raw response data instead of an ExperimentType object
    """
    return self.get_entity_type(
        # get_entity_type requires the entity kind (see its signature)
        entity="experimentType",
        cls=ExperimentType,
        identifier=type,
        method=self.get_experiment_type,
        only_data=only_data,
    )
get_collection_type = get_experiment_type # Alias
def get_dataset_types(self, type=None, start_with=None, count=None):
    """Returns a list of all available dataSet types.

    :param type: optional dataSet-type code to filter by
    :param start_with: paging offset (index of the first item to return)
    :param count: paging limit (maximum number of items to return)
    """
    return self.get_entity_types(
        # entity is a required argument of get_entity_types
        entity="dataSetType",
        cls=DataSetType,
        type=type,
        start_with=start_with,
        count=count,  # was accepted but silently dropped; forward the paging limit
    )
def get_dataset_type(self, type, only_data=False):
    """Returns a single dataSet type identified by its code.

    :param type: the dataSet-type code
    :param only_data: if True, return the raw response data instead of a DataSetType object
    """
    return self.get_entity_type(
        # get_entity_type requires the entity kind (see its signature)
        entity="dataSetType",
        identifier=type,
        cls=DataSetType,
        method=self.get_dataset_type,
        only_data=only_data,
    )
def get_sample_types(self, type=None, start_with=None, count=None):
    """Returns a list of all available sample types.

    :param type: optional sample-type code to filter by
    :param start_with: paging offset (index of the first item to return)
    :param count: paging limit (maximum number of items to return)
    """
    return self.get_entity_types(
        # entity is a required argument of get_entity_types
        entity="sampleType",
        cls=SampleType,
        type=type,
        start_with=start_with,
        count=count,  # was accepted but silently dropped; forward the paging limit
    )
def get_sample_type(self, type, only_data=False, with_vocabulary=False):
    """Returns a single sample type identified by its code.

    :param type: the sample-type code
    :param only_data: if True, return the raw response data instead of a SampleType object
    :param with_vocabulary: if True, also fetch vocabulary terms of assigned properties
    """
    # NOTE(review): the `return self.get_entity_type(` line was missing in the
    # damaged source; restored together with the required `entity` argument.
    return self.get_entity_type(
        entity="sampleType",
        identifier=type,
        cls=SampleType,
        with_vocabulary=with_vocabulary,
        method=self.get_sample_type,
        only_data=only_data,
    )
def get_entity_types(
Adam Laskowski
committed
self, entity, cls, type=None, start_with=None, count=None, with_vocabulary=False
):
method_name = get_method_for_entity(entity, "search")
if type is not None:
search_request = _subcriteria_for_code(type, entity)
else:
search_request = get_type_for_entity(entity, "search")
fetch_options = get_fetchoption_for_entity(entity)
fetch_options["from"] = start_with
fetch_options["count"] = count
request = {
"method": method_name,
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
parse_jackson(response)
entity_types = []
defs = get_definition_for_entity(entity)
attrs = defs["attrs"]
objects = response["objects"]
if len(objects) == 0:
entity_types = DataFrame(columns=attrs)
else:
parse_jackson(objects)
entity_types = DataFrame(objects)
entity_types["permId"] = entity_types["permId"].map(extract_permid)
entity_types["modificationDate"] = entity_types["modificationDate"].map(
format_timestamp
)
entity_types["validationPlugin"] = entity_types["validationPlugin"].map(
extract_nested_permid
)
return entity_types[attrs]
return Things(
openbis_obj=self,
entity=entity,
start_with=start_with,
vkovtun
committed
single_item_method=getattr(self, cls._single_item_method_name),
vkovtun
committed
response=resp,
df_initializer=create_data_frame,
)
Adam Laskowski
committed
self,
entity,
identifier,
cls,
method=None,
only_data=False,
with_vocabulary=False,
use_cache=True,
Adam Laskowski
committed
not only_data
and not isinstance(identifier, list)
and use_cache
and self._object_cache(entity=entity, code=identifier)
if et:
return et
method_name = get_method_for_entity(entity, "get")
fetch_options = get_fetchoption_for_entity(entity)
if with_vocabulary:
fetch_options["propertyAssignments"]["propertyType"]["vocabulary"] = {
"@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions",
"terms": {
"@type": "as.dto.vocabulary.fetchoptions.VocabularyTermFetchOptions"
if not isinstance(identifier, list):
identifier = [identifier]
identifiers.append(
{
"permId": ident,
"@type": "as.dto.entitytype.id.EntityTypePermId",
}
)
request = {
"method": method_name,
"params": [self.token, identifiers, fetch_options],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if len(identifiers) == 1:
if len(resp) == 0:
raise ValueError(f"no such {entity}: {identifier[0]}")
for ident in resp:
if only_data:
return resp[ident]
else:
obj = cls(
openbis_obj=self,
data=resp[ident],
method=method,
if self.use_cache:
self._object_cache(entity=entity, code=ident, value=obj)
return obj
Adam Laskowski
committed
self,
method_name,
entity,
type_name=None,
start_with=None,
count=None,
additional_attributes=None,
optional_attributes=None,
"""Returns a list of all available types of an entity.
If the name of the entity-type is given, it returns a PropertyAssignments object
if additional_attributes is None:
additional_attributes = []
if optional_attributes is None:
optional_attributes = []
"@type": f"as.dto.{entity.lower()}.search.{entity}TypeSearchCriteria"
"@type": f"as.dto.{entity.lower()}.fetchoptions.{entity}TypeFetchOptions"
fetch_options["from"] = start_with
fetch_options["count"] = count
search_request = _gen_search_criteria(
{entity.lower(): entity + "Type", "operator": "AND", "code": type_name}
)
fetch_options["propertyAssignments"] = get_fetchoption_for_entity(
"propertyAssignments"
)
if self.get_server_information().api_version > "3.3":
fetch_options["validationPlugin"] = get_fetchoption_for_entity("plugin")
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
parse_jackson(response)
if type_name is not None:
if len(response["objects"]) == 1:
return EntityType(openbis_obj=self, data=response["objects"][0])
elif len(response["objects"]) == 0:
raise ValueError(f"No such {entity} type: {type_name}")
vkovtun
committed
else:
raise ValueError(
f"There is more than one entry for entity={entity} and type={type_name}"
vkovtun
committed
types = []
attrs = self._get_attributes(
type_name, types, additional_attributes, optional_attributes
)
objects = response["objects"]
if len(objects) == 0:
types = DataFrame(columns=attrs)
else:
parse_jackson(objects)
types = DataFrame(objects)
types["modificationDate"] = types["modificationDate"].map(
format_timestamp
)
vkovtun
committed
return types[attrs]
vkovtun
committed
response=resp,
df_initializer=create_data_frame,
Swen Vermeul
committed
Adam Laskowski
committed
self, type_name, types, additional_attributes, optional_attributes
):
attributes = ["code", "description"] + additional_attributes
attributes += [
attribute for attribute in optional_attributes if attribute in types
]
attributes += ["modificationDate"]
Yves Noirjean
committed
if type_name is not None:
Yves Noirjean
committed
return attributes
Swen Vermeul
committed
def is_session_active(self):
    """checks whether a session is still active. Returns true or false.

    Delegates to is_token_valid() with the currently stored token.
    """
    return self.is_token_valid(self.token)
def is_token_valid(self, token: str = None):
    """Check if the connection to openBIS is valid.

    This method is useful to check if a token is still valid or if it has timed out,
    requiring the user to login again.

    :param token: token to check; defaults to the currently stored token
    :return: Return True if the token is valid, False if it is not valid.
    """
    if token is None:
        token = self.token
    # no token at all -> nothing to validate
    if token is None:
        return False
    request = {
        "method": "isSessionActive",
        "params": [token],
    }
    resp = self._post_request(self.as_v3, request)
    # the server answers the isSessionActive RPC with a plain boolean
    return resp
def get_session_info(self, token=None):
    """Return a SessionInformation object for the given (or current) token.

    :param token: session token; defaults to the currently stored token
    :return: SessionInformation object, or None if no token is available
        or the server request fails.
    """
    if token is None:
        token = self.token
    # without any token there is no session to describe
    if token is None:
        return None
    request = {"method": "getSessionInformation", "params": [token]}
    try:
        resp = self._post_request(self.as_v3, request)
        parse_jackson(resp)
    except Exception:
        # best-effort lookup: any server/transport error yields None
        return None
    return SessionInformation(openbis_obj=self, data=resp)
def set_token(self, token, save_token=False):
    """Checks the validity of a token, sets it as the current token and (by default) saves it
    to the disk, i.e. in the ~/.pybis directory
    """
    # NOTE(review): the validity guard and the assignment were missing in the
    # damaged source (it raised unconditionally); restored here.
    if not self.is_token_valid(token):
        raise ValueError("Session is no longer valid. Please log in again.")
    # assign via __dict__ to bypass any custom __setattr__ handling
    self.__dict__["token"] = token
    if save_token:
        self._save_token_to_disk()
Swen Vermeul
committed
def get_dataset(self, permIds, only_data=False, props=None, **kvals):
"""fetch a dataset and some metadata attached to it:
- properties
- sample
- parents
- children
- containers
- dataStore
- physicalData
- linkedData
:return: a DataSet object
"""
Swen Vermeul
committed
just_one = True
identifiers = []
if isinstance(permIds, list):
just_one = False
for permId in permIds:
identifiers.append(_type_for_id(permId, "dataset"))
Swen Vermeul
committed
else:
identifiers.append(_type_for_id(permIds, "dataset"))
fetchopts = get_fetchoption_for_entity("dataSet")
for option in [
"tags",
"properties",
"dataStore",
"physicalData",
"linkedData",
"experiment",
"sample",
"registrator",
"modifier",
]:
fetchopts[option] = get_fetchoption_for_entity(option)
request = {
"params": [
self.token,
Swen Vermeul
committed
identifiers,
resp = self._post_request(self.as_v3, request)
Swen Vermeul
committed
if just_one:
if len(resp) == 0:
raise ValueError(f"no such dataset found: {permIds}")
Swen Vermeul
committed
parse_jackson(resp)
for permId in resp:
if only_data:
return resp[permId]
else:
return DataSet(
type=self.get_dataset_type(resp[permId]["type"]["code"]),
data=resp[permId],
Swen Vermeul
committed
)
else:
response=list(resp.values()), props=props, parsed=False
def _dataset_list_for_response(
Adam Laskowski
committed
self,
response,
attrs=None,
props=None,
start_with=None,
count=None,
totalCount=0,
objects=None,
parsed=False,
"""returns a Things object, containing a DataFrame plus some additional information"""
def extract_attribute(attribute_to_extract):
def return_attribute(obj):
return ""
return obj.get(attribute_to_extract, "")
Swen Vermeul
committed
if not parsed:
parse_jackson(response)
Swen Vermeul
committed
def extract_project(attr):
Swen Vermeul
committed
def extract_attr(obj):
Swen Vermeul
committed
if attr:
Swen Vermeul
committed
else:
return obj["project"]["identifier"]["identifier"]
Swen Vermeul
committed
except KeyError:
Swen Vermeul
committed
return extract_attr
def extract_space(attr):
Swen Vermeul
committed
def extract_attr(obj):
Swen Vermeul
committed
if attr:
Swen Vermeul
committed
else:
Swen Vermeul
committed
except KeyError:
Swen Vermeul
committed
return extract_attr
vkovtun
committed
def create_data_frame(attrs, props, response):
default_attrs = [
"permId",
"type",
"experiment",
"sample",
"registrationDate",
"modificationDate",
"location",
"status",
"presentInArchive",
"size",
]
display_attrs = default_attrs + attrs
vkovtun
committed
if props is None:
props = []
else:
if isinstance(props, str):
props = [props]
Swen Vermeul
committed
vkovtun
committed
if len(response) == 0:
for prop in props:
if prop == "*":
continue
display_attrs.append(prop)
datasets = DataFrame(columns=display_attrs)
else:
datasets = DataFrame(response)
for attr in attrs:
if "project" in attr:
datasets[attr] = datasets["experiment"].map(
extract_project(attr)
)
vkovtun
committed
elif "space" in attr:
datasets[attr] = datasets["experiment"].map(extract_space(attr))
elif "." in attr:
entity, attribute_to_extract = attr.split(".")
datasets[attr] = datasets[entity].map(
extract_attribute(attribute_to_extract)
)
for attr in attrs:
# if no dot supplied, just display the code of the space, project or experiment
if any(entity == attr for entity in ["experiment", "sample"]):
datasets[attr] = datasets[attr].map(extract_nested_identifier)
vkovtun
committed
4582
4583
4584
4585
4586
4587
4588
4589
4590
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601
4602
4603
4604
4605
4606
4607
4608
datasets["registrationDate"] = datasets["registrationDate"].map(
format_timestamp
)
datasets["modificationDate"] = datasets["modificationDate"].map(
format_timestamp
)
datasets["experiment"] = datasets["experiment"].map(
extract_nested_identifier
)
datasets["sample"] = datasets["sample"].map(extract_nested_identifier)
datasets["type"] = datasets["type"].map(extract_code)
datasets["permId"] = datasets["code"]
for column in ["parents", "children", "components", "containers"]:
if column in datasets:
datasets[column] = datasets[column].map(extract_identifiers)
datasets["size"] = datasets["physicalData"].map(
lambda x: x.get("size") if x else ""
)
datasets["status"] = datasets["physicalData"].map(
lambda x: x.get("status") if x else ""
)
datasets["presentInArchive"] = datasets["physicalData"].map(
lambda x: x.get("presentInArchive") if x else ""
)
datasets["location"] = datasets["physicalData"].map(
lambda x: x.get("location") if x else ""
)
vkovtun
committed
4610
4611
4612
4613
4614
4615
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
for prop in props:
if prop == "*":
# include all properties in dataFrame.
# expand the dataFrame by adding new columns
columns = []
for i, dataSet in enumerate(response):
for prop_name, val in dataSet.get("properties", {}).items():
datasets.loc[i, prop_name.upper()] = val
columns.append(prop_name.upper())
display_attrs += set(columns)
continue
else:
# property name is provided
for i, dataSet in enumerate(response):
val = dataSet.get("properties", {}).get(
prop, ""
) or dataSet.get("properties", {}).get(prop.upper(), "")
datasets.loc[i, prop.upper()] = val
display_attrs.append(prop.upper())
return datasets[display_attrs]
def create_objects(response):
return objects
start_with=start_with,
count=count,
totalCount=totalCount,
vkovtun
committed
attrs=attrs,
vkovtun
committed
response=response,
df_initializer=create_data_frame,
objects_initializer=create_objects,
Swen Vermeul
committed
Adam Laskowski
committed
self, sample_ident, only_data=False, withAttachments=False, props=None, **kvals
Chandrasekhar Ramakrishnan
committed
"""Retrieve metadata for the sample.
Get metadata for the sample and any directly connected parents of the sample to allow access
to the same information visible in the ELN UI. The metadata will be on the file system.
:param sample_identifiers: A list of sample identifiers to retrieve.
"""
Swen Vermeul
committed
identifiers = []
if isinstance(sample_ident, list):
Swen Vermeul
committed
for ident in sample_ident:
identifiers.append(_type_for_id(ident, "sample"))
Swen Vermeul
committed
else:
identifiers.append(_type_for_id(sample_ident, "sample"))
fetchopts = get_fetchoption_for_entity("sample")
options = [
"tags",
"properties",
"attachments",
"space",
"experiment",
"registrator",
"modifier",
"dataSets",
]
for option in options:
fetchopts[option] = get_fetchoption_for_entity(option)
Swen Vermeul
committed
if withAttachments:
fetchopts["attachments"] = get_fetchoption_for_entity(
"attachmentsWithContent"
)
Swen Vermeul
committed
for key in ["parents", "children", "container", "components"]:
fetchopts[key] = {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"}
resp = self._post_request(self.as_v3, request)
Swen Vermeul
committed
if len(resp) == 0:
raise ValueError(f"no such sample found: {sample_ident}")
Swen Vermeul
committed
parse_jackson(resp)
for sample_ident in resp:
if only_data:
return resp[sample_ident]
else:
type=self.get_sample_type(resp[sample_ident]["type"]["code"]),
data=resp[sample_ident],
Swen Vermeul
committed
else:
return self._sample_list_for_response(
response=list(resp.values()), props=props, parsed=False
)
Swen Vermeul
committed
vkovtun
committed
def _sample_list_for_response(
Adam Laskowski
committed
self,
response,
attrs=None,
props=None,
start_with=None,
count=None,
totalCount=0,
parsed=False,
vkovtun
committed
):
if not parsed:
parse_jackson(response)
vkovtun
committed
def create_data_frame(attrs, props, response):
"""returns a Things object, containing a DataFrame plus additional information"""
def extract_attribute(attribute_to_extract):
def return_attribute(obj):
if obj is None:
return ""
return obj.get(attribute_to_extract, "")
return return_attribute
vkovtun
committed
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
4761
4762
4763
4764
4765
4766
4767
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
if attrs is None:
attrs = []
default_attrs = [
"identifier",
"permId",
"type",
"registrator",
"registrationDate",
"modifier",
"modificationDate",
]
display_attrs = default_attrs + attrs
if props is None:
props = []
else:
if isinstance(props, str):
props = [props]
if len(response) == 0:
for prop in props:
if prop == "*":
continue
display_attrs.append(prop)
samples = DataFrame(columns=display_attrs)
else:
samples = DataFrame(response)
for attr in attrs:
if "." in attr:
entity, attribute_to_extract = attr.split(".")
samples[attr] = samples[entity].map(
extract_attribute(attribute_to_extract)
)
# if no dot supplied, just display the code of the space, project or experiment
elif attr in ["project", "experiment"]:
samples[attr] = samples[attr].map(extract_nested_identifier)
elif attr in ["space"]:
samples[attr] = samples[attr].map(extract_code)
samples["registrationDate"] = samples["registrationDate"].map(
format_timestamp
)
samples["modificationDate"] = samples["modificationDate"].map(
format_timestamp
)
samples["registrator"] = samples["registrator"].map(extract_person)
samples["modifier"] = samples["modifier"].map(extract_person)
samples["identifier"] = samples["identifier"].map(extract_identifier)
samples["container"] = samples["container"].map(
extract_nested_identifier
)
vkovtun
committed
4792
4793
4794
4795
4796
4797
4798
4799
4800
4801
4802
4803
4804
4805
4806
4807
4808
4809
4810
4811
4812
4813
4814
for column in ["parents", "children", "components"]:
if column in samples:
samples[column] = samples[column].map(extract_identifiers)
samples["permId"] = samples["permId"].map(extract_permid)
samples["type"] = samples["type"].map(extract_nested_permid)
for prop in props:
if prop == "*":
# include all properties in dataFrame.
# expand the dataFrame by adding new columns
columns = []
for i, sample in enumerate(response):
for prop_name, val in sample.get("properties", {}).items():
samples.loc[i, prop_name.upper()] = val
columns.append(prop_name.upper())
display_attrs += set(columns)
continue
else:
# property name is provided
for i, sample in enumerate(response):
if "properties" in sample:
properties = sample["properties"]
val = properties.get(prop, "") or properties.get(
prop.upper(), ""
)
vkovtun
committed
samples.loc[i, prop.upper()] = val
else:
samples.loc[i, prop.upper()] = ""
display_attrs.append(prop.upper())
vkovtun
committed
return samples[display_attrs]
vkovtun
committed
vkovtun
committed
def create_objects(response):
return list(
map(
lambda obj: Sample(
openbis_obj=self,
type=self.get_sample_type(obj["type"]["code"]),
data=obj,
Adam Laskowski
committed
attrs=attrs
),
response,
)
)
result = Things(
openbis_obj=self,
entity="sample",
identifier_name="identifier",
start_with=start_with,
count=count,
totalCount=totalCount,
response=response,
df_initializer=create_data_frame,
objects_initializer=create_objects,
attrs=attrs,
props=props,
)
vkovtun
committed
return result
@staticmethod
def decode_attribute(entity, attribute):
    """Parse an attribute spec such as ``object.code AS my_alias``.

    :param entity: the entity the attribute belongs to (e.g. "sample")
    :param attribute: spec of the form ``<entity>[.<attribute>][ AS <alias>]``
    :return: dict with keys requested_entity, attribute, entity, alias
    :raises ValueError: if the attribute spec cannot be parsed
    """
    params = {}
    # split off an optional trailing "AS alias" (case-insensitive)
    attribute, *alias = re.split(r"\s+AS\s+", attribute, flags=re.IGNORECASE)
    alias = alias[0] if alias else attribute
    regex = re.compile(
        r"""^                         # beginning of the string
            (?P<requested_entity>\w+) # the entity itself
            (\.(?P<attribute>\w+))?   # capture an optional .attribute
            $                         # end of string
        """,
        re.VERBOSE,
    )
    match = re.search(regex, attribute)
    # a non-matching spec used to crash with AttributeError on None;
    # callers (e.g. _decode_property) expect a ValueError here
    if not match:
        raise ValueError(f"unable to parse attribute: {attribute}")
    params = match.groupdict()
    if params["requested_entity"] == "object":
        params["entity"] = "sample"
    elif params["requested_entity"] == "collection":
        params["entity"] = "experiment"
    elif params["requested_entity"] in ["space", "project"]:
        params["entity"] = params["requested_entity"]
    # bare entity name means: display its code
    if not params["attribute"]:
        params["attribute"] = "code"
    params["alias"] = alias
    return params
def _decode_property(self, entity, property):
# match something like: property_name.term.label AS label_alias
regex = re.compile(
r"""^
(?P<alias_alternative>
(?P<property>[^\.]* )
(?:
\.
(?P<subentity>term|pa) \.
(?P<field>code|vocabularyCode|label|description|ordinal|dataType)
)?
)
(
\s+(?i)AS\s+
(?P<alias>\w+)
)?
\s*
$
)
match = re.search(regex, property)
if not match:
try:
params = self.decode_attribute(entity, property)
return params
except ValueError:
raise ValueError(f"unable to parse property: {property}")
params = match.groupdict()
if not params["alias"]:
params["alias"] = params["alias_alternative"]
return params
Swen Vermeul
committed
Adam Laskowski
committed
self, start_with=None, count=None, only_data=False
fetchopts = get_fetchoption_for_entity(entity)
request = {
"method": "searchExternalDataManagementSystems",
"params": [
self.token,
criteria,
fetchopts,
],
}
response = self._post_request(self.as_v3, request)
vkovtun
committed
def create_data_frame(attrs, props, response):
parse_jackson(response)
attrs = "code label address addressType urlTemplate openbis".split()
if len(response["objects"]) == 0:
entities = DataFrame(columns=attrs)
else:
objects = response["objects"]
parse_jackson(objects)
entities = DataFrame(objects)
entities["permId"] = entities["permId"].map(extract_permid)
return entities[attrs]
entity="externalDms",
identifier_name="permId",
start_with=start_with,
count=count,
vkovtun
committed
totalCount=response.get("totalCount"),
response=response,
df_initializer=create_data_frame,
def get_external_data_management_system(self, permId, only_data=False):
    """Retrieve metadata for the external data management system.

    :param permId: A permId for an external DMS.
    :param only_data: Return the result data as a hash-map, not an object.
    :raises ValueError: if no external DMS with that permId exists
    """
    request = {
        "method": "getExternalDataManagementSystems",
        "params": [
            self.token,
            [
                {
                    "@type": "as.dto.externaldms.id.ExternalDmsPermId",
                    "permId": permId,
                }
            ],
            {
                "@type": "as.dto.externaldms.fetchoptions.ExternalDmsFetchOptions",
            },
        ],
    }
    resp = self._post_request(self.as_v3, request)
    parse_jackson(resp)
    if resp is None or len(resp) == 0:
        raise ValueError("no such external DMS found: " + permId)
    else:
        # the response maps identifiers to objects; return the first (only) one
        for ident in resp:
            if only_data:
                return resp[ident]
            else:
                return ExternalDMS(self, resp[ident])
get_externalDms = get_external_data_management_system # alias
def new_space(self, **kwargs):