Newer
Older
elif name == "description":
self.__dict__['_description'] = value
else:
raise KeyError("no such attribute: {}".format(name))
def get_type(self):
Swen Vermeul
committed
def _ident_for_whatever(self, whatever):
if isinstance(whatever, str):
# fetch parent in openBIS, we are given an identifier
obj = getattr(self._openbis, 'get_'+self._entity.lower())(whatever)
else:
# we assume we got an object
obj = whatever
ident = None
if getattr(obj, '_identifier'):
ident = obj._identifier
elif getattr(obj, '_permId'):
ident = obj._permId
if '@id' in ident: ident.pop('@id')
return ident
Swen Vermeul
committed
identifier = self.identifier
if identifier is None:
identifier = self.permId
if identifier is None:
# TODO: if this is a new object, return the list of the parents which have been assigned
pass
else:
return getattr(self._openbis, 'get_' + self._entity.lower() + 's')(withChildren=identifier, **kwargs)
Swen Vermeul
committed
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
def add_parents(self, parents):
    """Append one parent, or a list of parents, to ``_parents``.

    Every entry is converted with ``_ident_for_whatever`` before it is
    stored. ``_parents`` is created as an empty list when it is ``None``.
    """
    if getattr(self, '_parents') is None:
        self.__dict__['_parents'] = []
    candidates = parents if isinstance(parents, list) else [parents]
    for candidate in candidates:
        ident = self._ident_for_whatever(candidate)
        self.__dict__['_parents'].append(ident)
def del_parents(self, parents):
    """Remove one parent, or a list of parents, from ``_parents``.

    An entry is removed when its ``identifier`` or its ``permId`` equals
    the one of the parent resolved with ``_ident_for_whatever``. Nothing
    happens when ``_parents`` is ``None``.
    """
    def same_entity(ident, item):
        # two dicts denote the same entity if they share an equal
        # 'identifier' or an equal 'permId'
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ('identifier', 'permId')
        )

    if getattr(self, '_parents') is None:
        return
    if not isinstance(parents, list):
        parents = [parents]

    assigned = self.__dict__['_parents']
    for parent in parents:
        ident = self._ident_for_whatever(parent)
        # rebuild in place: popping while enumerating skipped the entry
        # that followed every removed one
        assigned[:] = [item for item in assigned if not same_entity(ident, item)]
Swen Vermeul
committed
identifier = self.identifier
if identifier is None:
identifier = self.permId
if identifier is None:
# TODO: if this is a new object, return the list of the children which have been assigned
pass
else:
# e.g. self._openbis.get_samples(withParents=self.identifier)
return getattr(self._openbis, 'get_' + self._entity.lower() + 's')(withParents=identifier, **kwargs)
Swen Vermeul
committed
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
def add_children(self, children):
    """Append one child, or a list of children, to ``_children``.

    Every entry is converted with ``_ident_for_whatever`` before it is
    stored. ``_children`` is created as an empty list when it is ``None``.
    """
    if getattr(self, '_children') is None:
        self.__dict__['_children'] = []
    candidates = children if isinstance(children, list) else [children]
    for candidate in candidates:
        ident = self._ident_for_whatever(candidate)
        self.__dict__['_children'].append(ident)
def del_children(self, children):
    """Remove one child, or a list of children, from ``_children``.

    An entry is removed when its ``identifier`` or its ``permId`` equals
    the one of the child resolved with ``_ident_for_whatever``. Nothing
    happens when ``_children`` is ``None``.
    """
    def same_entity(ident, item):
        # two dicts denote the same entity if they share an equal
        # 'identifier' or an equal 'permId'
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ('identifier', 'permId')
        )

    if getattr(self, '_children') is None:
        return
    if not isinstance(children, list):
        children = [children]

    assigned = self.__dict__['_children']
    for child in children:
        ident = self._ident_for_whatever(child)
        # rebuild in place: popping while enumerating skipped the entry
        # that followed every removed one
        assigned[:] = [item for item in assigned if not same_entity(ident, item)]
@property
def tags(self):
    """Return the ``code`` of every entry in ``_tags``.

    Returns ``None`` when ``_tags`` is ``None``.
    """
    if getattr(self, '_tags') is None:
        return None
    codes = []
    for tag in self._tags:
        codes.append(tag['code'])
    return codes
def set_tags(self, tags):
    """Make ``_tags`` contain exactly the tag ids derived from ``tags``.

    Entries that are not part of the new tag ids are dropped, missing
    ones are appended. ``tags`` is converted with ``_create_tagIds``.
    """
    if getattr(self, '_tags') is None:
        self.__dict__['_tags'] = []
    tagIds = _create_tagIds(tags)
    current = self.__dict__['_tags']

    # keep only tags that are in the new list; filtering in place avoids
    # removing from the list while iterating it, which skipped entries
    current[:] = [tagId for tagId in current if tagId in tagIds]

    # add all new tags that are not in the list yet
    for tagId in tagIds:
        if tagId not in current:
            current.append(tagId)
def add_tags(self, tags):
    """Append the tag ids derived from ``tags`` that ``_tags`` lacks.

    ``_tags`` is created as an empty list when it is ``None``; ``tags``
    is converted with ``_create_tagIds``.
    """
    if getattr(self, '_tags') is None:
        self.__dict__['_tags'] = []
    for candidate in _create_tagIds(tags):
        if candidate in self.__dict__['_tags']:
            continue
        self.__dict__['_tags'].append(candidate)
def del_tags(self, tags):
    """Remove the tag ids derived from ``tags`` from ``_tags``.

    ``_tags`` is created as an empty list when it is ``None``; ``tags``
    is converted with ``_create_tagIds``. Ids that are not listed are
    ignored.
    """
    if getattr(self, '_tags') is None:
        self.__dict__['_tags'] = []
    for unwanted in _create_tagIds(tags):
        assigned = self.__dict__['_tags']
        if unwanted in assigned:
            assigned.remove(unwanted)
def get_attachments(self):
    """Return the attachments as a DataFrame, or ``None`` if there are none.

    The frame is built from ``_attachments`` and reduced to the columns
    ``fileName``, ``title``, ``description`` and ``version``.
    """
    if getattr(self, '_attachments') is None:
        return None
    columns = ['fileName', 'title', 'description', 'version']
    frame = DataFrame(self._attachments)
    return frame[columns]
def add_attachment(self, fileName, title=None, description=None):
    """Register a file as a new attachment.

    An ``Attachment`` is built from the arguments. Its short data is
    appended to ``_attachments`` and the object itself to
    ``_new_attachments``; either list is created when it is ``None``.
    """
    att = Attachment(filename=fileName, title=title, description=description)

    for list_name, entry in (
        ('_attachments', att.get_data_short),
        ('_new_attachments', lambda: att),
    ):
        if getattr(self, list_name) is None:
            self.__dict__[list_name] = []
        getattr(self, list_name).append(entry())
def download_attachments(self):
    """Fetch all attachments including their content and write them to disk.

    Each file is written to
    ``os.path.join(hostname, permId, fileName)``; missing directories
    are created.

    Returns:
        list: the paths of the files that were written.
    """
    method = 'get' + self.entity + 's'
    entity = self.entity.lower()
    request = {
        "method": method,
        "params": [
            self._openbis.token,
            [self._permId],
            dict(
                attachments=fetch_option['attachmentsWithContent'],
                **fetch_option[entity]
            ),
        ],
    }
    resp = self._openbis._post_request(self._openbis.as_v3, request)
    attachments = resp[self.permId]['attachments']

    file_list = []
    for attachment in attachments:
        filename = os.path.join(
            self._openbis.hostname,
            self.permId,
            attachment['fileName']
        )
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'wb') as att:
            # the content arrives base64 encoded
            att.write(base64.b64decode(attachment['content']))
        file_list.append(filename)
    return file_list
def _repr_html_(self):
Swen Vermeul
committed
def nvl(val, string=''):
if val is None:
return string
return val
html = """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
"""
Swen Vermeul
committed
for attr in self._allowed_attrs:
if attr == 'attachments':
continue
html += "<tr> <td>{}</td> <td>{}</td> </tr>".format(
attr, nvl(getattr(self, attr, ''), '')
Swen Vermeul
committed
)
if getattr(self, '_attachments') is not None:
Swen Vermeul
committed
html += "<tr><td>attachments</td><td>"
html += "<br/>".join(att['fileName'] for att in self._attachments)
html += "</td></tr>"
html += """
</tbody>
</table>
"""
return html
def __repr__(self):
    """Render the attributes as a plain-text ``attribute``/``value`` table.

    Every name in ``_allowed_attrs`` except ``attachments`` yields one
    row; the table is formatted with ``tabulate``.
    """
    rows = [
        [attr, nvl(getattr(self, attr, ''))]
        for attr in self._allowed_attrs
        if attr != 'attachments'
    ]
    return tabulate(rows, headers=['attribute', 'value'])
class Sample():
    """ A Sample is one of the most commonly used objects in openBIS.

    Attributes live in an ``AttrHolder`` (``self.a``) and properties in a
    ``PropertyHolder`` (``self.p``). Attribute reads that are not resolved
    on the sample itself, and all attribute writes, are forwarded to
    ``self.a``.
    """

    def __init__(self, openbis_obj, type, data=None, props=None, **kwargs):
        """Create a sample of the given type.

        Args:
            openbis_obj: object used for all server calls.
            type: the sample type, handed to both holders.
            data: optional data dict, applied with ``_set_data``.
            props: optional mapping of property values set on ``self.p``.
            **kwargs: attribute values, set through ``__setattr__``.
        """
        # write through __dict__ because __setattr__ forwards to self.a
        self.__dict__['openbis'] = openbis_obj
        self.__dict__['type'] = type
        self.__dict__['p'] = PropertyHolder(openbis_obj, type)
        self.__dict__['a'] = AttrHolder(openbis_obj, 'Sample', type)

        if data is not None:
            self._set_data(data)

        if props is not None:
            for key in props:
                setattr(self.p, key, props[key])

        for key in kwargs:
            setattr(self, key, kwargs[key])

    def _set_data(self, data):
        """Load ``data`` into the attribute and the property holder."""
        # assign the attribute data to self.a by calling it
        # (invoking the AttrHolder.__call__ function)
        self.a(data)
        self.__dict__['data'] = data

        # put the properties in the self.p namespace (without checking them)
        for key, value in data['properties'].items():
            self.p.__dict__[key.lower()] = value

    def __dir__(self):
        """Return the names offered for TAB completion."""
        return [
            'props', 'get_parents()', 'get_children()',
            'add_parents()', 'add_children()', 'del_parents()', 'del_children()',
            'space', 'project', 'experiment', 'tags',
            'set_tags()', 'add_tags()', 'del_tags()',
            'add_attachment()', 'get_attachments()', 'download_attachments()',
            'save()', 'delete()'
        ]

    @property
    def props(self):
        """The property holder of this sample."""
        return self.__dict__['p']

    @property
    def type(self):
        """The type this sample was created with."""
        # NOTE(review): the getter body was missing; this returns the value
        # stored by __init__ -- confirm against the upstream file
        return self.__dict__['type']

    @type.setter
    def type(self, type_name):
        sample_type = self.openbis.get_sample_type(type_name)
        self.p.__dict__['_type'] = sample_type
        self.a.__dict__['_type'] = sample_type

    def __getattr__(self, name):
        """Forward unresolved attribute reads to the attribute holder."""
        return getattr(self.__dict__['a'], name)

    def __setattr__(self, name, value):
        """Forward attribute writes to the attribute holder.

        Raises:
            ValueError: if ``name`` is one of the protected method names.
        """
        if name in ['set_properties', 'set_tags', 'add_tags']:
            raise ValueError("These are methods which should not be overwritten")
        setattr(self.__dict__['a'], name, value)

    def _repr_html_(self):
        return self.a._repr_html_()

    def __repr__(self):
        return self.a.__repr__()

    def set_properties(self, properties):
        """Update the properties of this sample on the server."""
        self.openbis.update_sample(self.permId, properties=properties)

    def save(self):
        """Create or update the sample on the server and reload its data.

        Returns:
            The sample itself after a creation, ``None`` after an update.
        """
        props = self.p._all_props()

        if self.is_new:
            # the request was never built here although it is filled in
            # below; the other save() methods obtain it from _new_attrs()
            request = self._new_attrs()
            request["params"][1][0]["properties"] = props
            resp = self.openbis._post_request(self.openbis.as_v3, request)
            print("Sample successfully created.")
            new_sample_data = self.openbis.get_sample(resp[0]['permId'], only_data=True)
            self._set_data(new_sample_data)
            return self
        else:
            request = self._up_attrs()
            # send the properties on update as well, as Experiment.save does
            request["params"][1][0]["properties"] = props
            self.openbis._post_request(self.openbis.as_v3, request)
            print("Sample successfully updated.")
            new_sample_data = self.openbis.get_sample(self.permId, only_data=True)
            self._set_data(new_sample_data)

    def delete(self, reason):
        """Delete this sample on the server, giving ``reason``."""
        self.openbis.delete_entity('sample', self.permId, reason)

    def get_datasets(self, **kwargs):
        return self.openbis.get_datasets(sample=self.permId, **kwargs)

    def get_projects(self, **kwargs):
        return self.openbis.get_project(withSamples=[self.permId], **kwargs)

    def get_experiment(self):
        """Return the experiment of this sample, or ``None`` on any failure."""
        try:
            return self.openbis.get_experiment(self._experiment['identifier'])
        except Exception:
            # best effort, as before: a failed lookup yields None
            return None

    @property
    def experiment(self):
        """The experiment of this sample, or ``None`` on any failure."""
        try:
            return self.openbis.get_experiment(self._experiment['identifier'])
        except Exception:
            # best effort, as before: a failed lookup yields None
            return None
Swen Vermeul
committed
""" managing openBIS spaces
"""
def __init__(self, openbis_obj, data=None, **kwargs):
    """Set up the space with a fresh ``AttrHolder``.

    ``data``, when given, is passed to the holder and kept as
    ``self.data``. Every keyword argument is set as an attribute.
    """
    state = self.__dict__
    state['openbis'] = openbis_obj
    state['a'] = AttrHolder(openbis_obj, 'Space' )

    if data is not None:
        self.a(data)
        state['data'] = data

    if kwargs is not None:
        for key, value in kwargs.items():
            setattr(self, key, value)
def __dir__(self):
"""all the available methods and attributes that should be displayed
when using the autocompletion feature (TAB) in Jupyter
"""
return [
'code', 'permId', 'description', 'registrator', 'registrationDate', 'modificationDate',
'get_projects()',
"new_project(code='', description='')",
'get_samples()',
'delete()'
]
def __str__(self):
return self.data.get('code', None)
def get_samples(self, **kwargs):
    """Return ``openbis.get_samples`` restricted to this space's code.

    Extra keyword arguments are passed through unchanged.
    """
    fetch = self.openbis.get_samples
    return fetch(space=self.code, **kwargs)
get_objects = get_samples #Alias
def get_projects(self, **kwargs):
    """Return ``openbis.get_projects`` restricted to this space's code.

    Extra keyword arguments are passed through unchanged.
    """
    fetch = self.openbis.get_projects
    return fetch(space=self.code, **kwargs)
def new_project(self, code, description=None, **kwargs):
    """Return ``openbis.new_project`` called for this space.

    The space code, ``code`` and ``description`` are passed positionally,
    extra keyword arguments unchanged.
    """
    create = self.openbis.new_project
    return create(self.code, code, description, **kwargs)
def delete(self, reason):
    """Delete this space through ``openbis.delete_entity`` and report it."""
    self.openbis.delete_entity('Space', self.permId, reason)
    message = "Space {} has been sucessfully deleted.".format(self.permId)
    print(message)
def save(self):
    """Create or update the space on the server and reload its data.

    Returns:
        The space itself after a creation, ``None`` after an update.
    """
    server = self.openbis

    if not self.is_new:
        server._post_request(server.as_v3, self._up_attrs())
        print("Space successfully updated.")
        refreshed = server.get_space(self.permId, only_data=True)
        self._set_data(refreshed)
        return None

    resp = server._post_request(server.as_v3, self._new_attrs())
    print("Space successfully created.")
    refreshed = server.get_space(resp[0]['permId'], only_data=True)
    self._set_data(refreshed)
    return self
Chandrasekhar Ramakrishnan
committed
3412
3413
3414
3415
3416
3417
3418
3419
3420
3421
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
3434
3435
3436
3437
3438
class ExternalDMS():
    """ managing openBIS external data management systems
    """

    def __init__(self, openbis_obj, data=None, **kwargs):
        """Keep the openBIS object and, when given, the data dict.

        Every keyword argument is set as an instance attribute.
        """
        self.__dict__['openbis'] = openbis_obj
        if data is not None:
            self.__dict__['data'] = data
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __getattr__(self, name):
        """Look up unresolved attributes in the data dict.

        Returns ``None`` for names the data dict does not contain.

        Raises:
            AttributeError: if no data dict has been set. A KeyError
                escaped here before, which broke ``hasattr``.
        """
        data = self.__dict__.get('data')
        if data is None:
            raise AttributeError(name)
        return data.get(name)

    def __dir__(self):
        """all the available methods and attributes that should be displayed
        when using the autocompletion feature (TAB) in Jupyter
        """
        return ['code', 'label', 'urlTemplate', 'address', 'addressType', 'openbis']

    def __str__(self):
        return self.data.get('code', None)
class Things():
    """An object that contains a DataFrame object about an entity available in openBIS.

    ``entity`` names the kind of entity held in the frame and
    ``identifier_name`` the column that identifies a single row.
    """

    def __init__(self, openbis_obj, entity, df, identifier_name='code'):
        # openbis and entity are read by every method below
        self.openbis = openbis_obj
        self.entity = entity
        self.df = df
        self.identifier_name = identifier_name

    def __repr__(self):
        return tabulate(self.df, headers=list(self.df))

    def __len__(self):
        return len(self.df)

    def _repr_html_(self):
        return self.df._repr_html_()

    def _has_rows(self):
        """Tell whether there is a frame with at least one row."""
        return self.df is not None and len(self.df) > 0

    def _gather(self, fetch, entity, identifier_name):
        """Call ``fetch(ident)`` for every row and merge the results.

        Lookups that raise ``ValueError`` are skipped. Returns a new
        ``Things``, holding an empty frame when nothing was collected.
        """
        dfs = []
        for ident in self.df[self.identifier_name]:
            try:
                dfs.append(fetch(ident).df)
            except ValueError:
                pass
        frame = pd.concat(dfs) if dfs else DataFrame()
        return Things(self.openbis, entity, frame, identifier_name)

    def get_parents(self, **kwargs):
        """Return the parents of all rows, or ``None`` if there are no rows.

        Raises:
            ValueError: if the entity is neither ``sample`` nor ``dataset``.
        """
        if self.entity not in ['sample', 'dataset']:
            raise ValueError("{}s do not have parents".format(self.entity))
        if not self._has_rows():
            return None
        fetch = getattr(self.openbis, 'get_' + self.entity.lower() + 's')
        # get all objects that have this object as a child == parent
        return self._gather(
            lambda ident: fetch(withChildren=ident, **kwargs),
            self.entity, self.identifier_name,
        )

    def get_children(self, **kwargs):
        """Return the children of all rows, or ``None`` if there are no rows.

        Raises:
            ValueError: if the entity is neither ``sample`` nor ``dataset``.
        """
        if self.entity not in ['sample', 'dataset']:
            raise ValueError("{}s do not have children".format(self.entity))
        if not self._has_rows():
            return None
        fetch = getattr(self.openbis, 'get_' + self.entity.lower() + 's')
        # get all objects that have this object as a parent == child;
        # the keyword is `withParents`, as in the per-object get_children
        return self._gather(
            lambda ident: fetch(withParents=ident, **kwargs),
            self.entity, self.identifier_name,
        )

    def get_samples(self, **kwargs):
        """Return the samples of all rows, or ``None`` if there are no rows.

        Raises:
            ValueError: if the entity is not ``space``, ``project`` or
                ``experiment``.
        """
        if self.entity not in ['space', 'project', 'experiment']:
            raise ValueError("{}s do not have samples".format(self.entity))
        if not self._has_rows():
            return None
        key = self.entity.lower()
        return self._gather(
            lambda ident: self.openbis.get_samples(**{key: ident}, **kwargs),
            'sample', 'identifier',
        )

    get_objects = get_samples  # Alias

    def get_datasets(self, **kwargs):
        """Return the datasets of all rows, or ``None`` if there are no rows.

        Raises:
            ValueError: if the entity is neither ``sample`` nor
                ``experiment``.
        """
        if self.entity not in ['sample', 'experiment']:
            raise ValueError("{}s do not have datasets".format(self.entity))
        if not self._has_rows():
            return None
        key = self.entity.lower()
        return self._gather(
            lambda ident: self.openbis.get_datasets(**{key: ident}, **kwargs),
            'dataset', 'permId',
        )

    def __getitem__(self, key):
        """Return the entity for a row, or a sub-frame for a list key.

        An ``int`` selects a row by index label, a ``list`` is handed to
        the DataFrame, anything else is upper-cased and matched against
        the identifier column. Returns ``None`` if there are no rows.
        """
        if not self._has_rows():
            return None
        if isinstance(key, list):
            # treat it as a normal dataframe
            return self.df[key]
        if isinstance(key, int):
            # get thing by rowid
            row = self.df.loc[[key]]
        else:
            # only non-int keys are matched by identifier; int keys used
            # to fall through to key.upper() and fail
            row = self.df[self.df[self.identifier_name] == key.upper()]
        # invoke the openbis.get_entity() method
        return getattr(self.openbis, 'get_' + self.entity)(row[self.identifier_name].values[0])

    def __iter__(self):
        """Yield the entity of every row, fetched one by one."""
        # iterate the column directly: Series.iteritems() is gone in pandas 2
        for ident in self.df[self.identifier_name]:
            yield getattr(self.openbis, 'get_' + self.entity)(ident)
Swen Vermeul
committed
Swen Vermeul
committed
"""
Swen Vermeul
committed
def __init__(self, openbis_obj, type, data=None, props=None, code=None, **kwargs):
    """Create an experiment of the given type.

    Args:
        openbis_obj: object used for all server calls.
        type: the experiment type, handed to both holders.
        data: optional data dict, applied with ``_set_data``.
        props: optional mapping of property values set on ``self.p``.
        code: optional code of the experiment.
        **kwargs: attribute values; every key must be listed in the
            ``attrs`` of the Experiment definitions.

    Raises:
        ValueError: if a keyword is not a known Experiment attribute.
    """
    # write through __dict__ because __setattr__ forwards to self.a
    self.__dict__['openbis'] = openbis_obj
    self.__dict__['type'] = type
    self.__dict__['p'] = PropertyHolder(openbis_obj, type)
    self.__dict__['a'] = AttrHolder(openbis_obj, 'Experiment', type)

    if data is not None:
        self._set_data(data)

    if props is not None:
        for key in props:
            setattr(self.p, key, props[key])

    if code is not None:
        self.code = code

    # kwargs is always a dict, so it needs no None check
    defs = _definitions('Experiment')
    for key in kwargs:
        if key in defs['attrs']:
            setattr(self, key, kwargs[key])
        else:
            raise ValueError("{attr} is not a known attribute for an Experiment".format(attr=key))
Swen Vermeul
committed
def _set_data(self, data):
# assign the attribute data to self.a by calling it
# (invoking the AttrHolder.__call__ function)
self.a(data)
self.__dict__['data'] = data
Swen Vermeul
committed
# put the properties in the self.p namespace (without checking them)
for key, value in data['properties'].items():
self.p.__dict__[key.lower()] = value
Swen Vermeul
committed
def __str__(self):
return self.data['code']
Swen Vermeul
committed
def __dir__(self):
# the list of possible methods/attributes displayed
# when invoking TAB-completition
return [
'props', 'space', 'project',
'project', 'tags', 'attachments', 'data',
'get_datasets()', 'get_samples()',
'set_tags()', 'add_tags()', 'del_tags()',
Swen Vermeul
committed
'add_attachment()', 'get_attachments()', 'download_attachments()',
'save()'
Swen Vermeul
committed
]
@property
def props(self):
    """The property holder stored under ``p`` in the instance dict."""
    state = self.__dict__
    return state['p']
@property
def type(self):
    """The type this experiment was created with."""
    # NOTE(review): the getter body was missing; this returns the value
    # that __init__ stores in the instance dict -- confirm against the
    # upstream file
    return self.__dict__['type']

@type.setter
def type(self, type_name):
    """Look up ``type_name`` and store the result on both holders."""
    experiment_type = self.openbis.get_experiment_type(type_name)
    self.p.__dict__['_type'] = experiment_type
    self.a.__dict__['_type'] = experiment_type
Swen Vermeul
committed
def __getattr__(self, name):
    """Forward unresolved attribute reads to the attribute holder."""
    holder = self.__dict__['a']
    return getattr(holder, name)
def __setattr__(self, name, value):
if name in ['set_properties', 'add_tags()', 'del_tags()', 'set_tags()']:
Swen Vermeul
committed
raise ValueError("These are methods which should not be overwritten")
setattr(self.__dict__['a'], name, value)
def _repr_html_(self):
html = self.a._repr_html_()
return html
def set_properties(self, properties):
    """Pass ``properties`` to ``openbis.update_experiment`` for this permId."""
    update = self.openbis.update_experiment
    update(self.permId, properties=properties)
Swen Vermeul
committed
def save(self):
    """Create or update the experiment on the server and reload its data.

    The current properties are sent along in both cases.

    Returns:
        The experiment itself after a creation, ``None`` after an update.
    """
    if self.is_new:
        request = self._new_attrs()
        props = self.p._all_props()
        request["params"][1][0]["properties"] = props
        resp = self.openbis._post_request(self.openbis.as_v3, request)
        print("Experiment successfully created.")
        new_exp_data = self.openbis.get_experiment(resp[0]['permId'], only_data=True)
        self._set_data(new_exp_data)
        return self
    else:
        request = self._up_attrs()
        props = self.p._all_props()
        request["params"][1][0]["properties"] = props
        self.openbis._post_request(self.openbis.as_v3, request)
        print("Experiment successfully updated.")
        # no response is kept in this branch (reading `resp` raised a
        # NameError), so reload by the permId we already have
        new_exp_data = self.openbis.get_experiment(self.permId, only_data=True)
        self._set_data(new_exp_data)
def delete(self, reason):
    """Delete this experiment through ``openbis.delete_entity``.

    Does nothing and returns ``None`` when the permId is ``None``.
    """
    if self.permId is not None:
        self.openbis.delete_entity('experiment', self.permId, reason)
    return None
if self.permId is None:
return None
return self.openbis.get_datasets(experiment=self.permId, **kwargs)
if self.permId is None:
return None
return self.openbis.get_project(experiment=self.permId, **kwargs)
Swen Vermeul
committed
if self.permId is None:
return None
return self.openbis.get_samples(experiment=self.permId, **kwargs)
Swen Vermeul
committed
get_objects = get_samples # Alias
Swen Vermeul
committed
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
3712
3713
def add_samples(self, *samples):
    """Assign the given samples to this experiment.

    Strings are resolved with ``openbis.get_sample``; anything else is
    taken to be a sample object. Each sample gets this experiment's
    identifier, is saved, and its ``_identifier`` is appended to the
    ``_samples`` list of the attribute holder.

    Raises:
        ValueError: if a sample already has an experiment, or if this
            experiment has not been saved yet.
    """
    for sample in samples:
        # anything that is not a string is assumed to be a sample object
        obj = self.openbis.get_sample(sample) if isinstance(sample, str) else sample

        # a sample can only belong to exactly one experiment
        if obj.experiment is not None:
            raise ValueError(
                "sample {} already belongs to experiment {}".format(
                    obj.code, obj.experiment
                )
            )
        if self.is_new:
            raise ValueError("You need to save this experiment first before you can assign any samples to it")

        # update the sample directly
        obj.experiment = self.identifier
        obj.save()
        self.a.__dict__['_samples'].append(obj._identifier)
add_objects = add_samples # Alias
Swen Vermeul
committed
def del_samples(self, samples):
    """Resolve the given samples and assign the result to ``self.samples``.

    Strings are resolved with ``openbis.get_sample``; anything else is
    taken to be a sample object and used as it is.
    """
    # NOTE(review): despite the name, the collected objects are assigned
    # to self.samples, as the code did before -- confirm this is intended
    if not isinstance(samples, list):
        samples = [samples]

    # the list was never initialised, and the else branch appended `obj`
    # (unset or left over from an earlier item) instead of the sample
    objects = []
    for sample in samples:
        if isinstance(sample, str):
            objects.append(self.openbis.get_sample(sample))
        else:
            # we assume we got an object
            objects.append(sample)
    self.samples = objects
del_objects = del_samples # Alias
class Attachment():
    """A local file together with the title and description to attach."""

    def __init__(self, filename, title=None, description=None):
        """Remember the file and its metadata.

        Raises:
            ValueError: if ``filename`` does not exist.
        """
        if not os.path.exists(filename):
            raise ValueError("File not found: {}".format(filename))
        self.fileName = filename
        self.title = title
        self.description = description

    def get_data_short(self):
        """Return file name, title and description without the content."""
        return {
            "fileName": self.fileName,
            "title": self.title,
            "description": self.description,
        }

    def get_data(self):
        """Return the attachment including its base64 encoded content.

        The file is read in binary mode. The dict also carries the
        ``@type`` of an attachment creation.
        """
        with open(self.fileName, 'rb') as att:
            content = att.read()
            contentb64 = base64.b64encode(content).decode()
        return {
            "fileName": self.fileName,
            "title": self.title,
            "description": self.description,
            "content": contentb64,
            "@type": "as.dto.attachment.create.AttachmentCreation",
        }
Swen Vermeul
committed
def __init__(self, openbis_obj, data=None, **kwargs):
    """Set up the project with a fresh ``AttrHolder``.

    ``data``, when given, is passed to the holder and kept as
    ``self.data``. Every keyword argument is set as an attribute.
    """
    state = self.__dict__
    state['openbis'] = openbis_obj
    state['a'] = AttrHolder(openbis_obj, 'Project')

    if data is not None:
        self.a(data)
        state['data'] = data

    if kwargs is not None:
        for key, value in kwargs.items():
            setattr(self, key, value)
def __dir__(self):
"""all the available methods and attributes that should be displayed
when using the autocompletion feature (TAB) in Jupyter
"""
return ['code', 'permId', 'identifier', 'description', 'space', 'registrator',
'registrationDate', 'modifier', 'modificationDate', 'add_attachment()',
'get_attachments()', 'download_attachments()',
'get_experiments()', 'get_samples()', 'get_datasets()',
Swen Vermeul
committed
'save()', 'delete()'
def get_samples(self, **kwargs):
    """Return ``openbis.get_samples`` restricted to this project's permId.

    Extra keyword arguments are passed through unchanged.
    """
    fetch = self.openbis.get_samples
    return fetch(project=self.permId, **kwargs)
get_objects = get_samples # Alias
def get_experiments(self):
    """Return ``openbis.get_experiments`` restricted to this project's permId."""
    fetch = self.openbis.get_experiments
    return fetch(project=self.permId)
def get_datasets(self):
    """Return ``openbis.get_datasets`` restricted to this project's permId."""
    fetch = self.openbis.get_datasets
    return fetch(project=self.permId)
def delete(self, reason):
    """Delete this project through ``openbis.delete_entity``."""
    remove = self.openbis.delete_entity
    remove('project', self.permId, reason)
if self.is_new:
request = self._new_attrs()
self.openbis._post_request(self.openbis.as_v3, request)
self.a.__dict__['_is_new'] = False
print("Project successfully created.")
request = self._up_attrs()
self.openbis._post_request(self.openbis.as_v3, request)
print("Project successfully updated.")
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
class SemanticAnnotation():
    """A semantic annotation that can be created, updated and deleted.

    The annotation refers to an entity type, a property type, or both,
    and carries predicate and descriptor ontology fields.
    """

    # ontology fields copied verbatim into create and update requests
    _ONTOLOGY_ATTRS = (
        'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId',
        'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId',
    )

    def __init__(self, openbis_obj, isNew=True, **kwargs):
        """Store the openBIS object and read every field from ``kwargs``.

        Fields that are not passed are set to ``None``.
        """
        self._openbis = openbis_obj
        self._isNew = isNew

        self.permId = kwargs.get('permId')
        self.entityType = kwargs.get('entityType')
        self.propertyType = kwargs.get('propertyType')
        for name in self._ONTOLOGY_ATTRS:
            setattr(self, name, kwargs.get(name))
        self.creationDate = kwargs.get('creationDate')

    def __dir__(self):
        return ['permId', 'entityType', 'propertyType', 'predicateOntologyId', 'predicateOntologyVersion', 'predicateAccessionId', 'descriptorOntologyId', 'descriptorOntologyVersion', 'descriptorAccessionId', 'creationDate', 'save()', 'delete()' ]

    def save(self):
        """Create the annotation if it is new, update it otherwise."""
        handler = self._create if self._isNew else self._update
        handler()

    def _create(self):
        """Send a ``createSemanticAnnotations`` request and mark as saved."""
        creation = {
            "@type": "as.dto.semanticannotation.create.SemanticAnnotationCreation"
        }

        entity_type_id = {
            "@type": "as.dto.entitytype.id.EntityTypePermId",
            "permId" : self.entityType,
            "entityKind" : "SAMPLE"
        }
        property_type_id = {
            "@type" : "as.dto.property.id.PropertyTypePermId",
            "permId" : self.propertyType
        }
        has_entity = self.entityType is not None
        has_property = self.propertyType is not None

        if has_entity and has_property:
            # both given: annotate the property assignment
            creation["propertyAssignmentId"] = {
                "@type": "as.dto.property.id.PropertyAssignmentPermId",
                "entityTypeId" : entity_type_id,
                "propertyTypeId" : property_type_id
            }
        elif has_entity:
            creation["entityTypeId"] = entity_type_id
        elif has_property:
            creation["propertyTypeId"] = property_type_id

        for attr in self._ONTOLOGY_ATTRS:
            creation[attr] = getattr(self, attr)

        request = {
            "method": "createSemanticAnnotations",
            "params": [
                self._openbis.token,
                [creation]
            ]
        }
        self._openbis._post_request(self._openbis.as_v3, request)
        self._isNew = False
        print("Semantic annotation successfully created.")

    def _update(self):
        """Send an ``updateSemanticAnnotations`` request for this permId.

        Every ontology field is sent as a modified field update value.
        """
        update = {
            "@type": "as.dto.semanticannotation.update.SemanticAnnotationUpdate",
            "semanticAnnotationId" : {
                "@type" : "as.dto.semanticannotation.id.SemanticAnnotationPermId",
                "permId" : self.permId
            }
        }
        update.update(
            (attr, {
                "@type" : "as.dto.common.update.FieldUpdateValue",
                "isModified" : True,
                "value" : getattr(self, attr)
            })
            for attr in self._ONTOLOGY_ATTRS
        )

        request = {
            "method": "updateSemanticAnnotations",
            "params": [
                self._openbis.token,
                [update]
            ]
        }
        self._openbis._post_request(self._openbis.as_v3, request)
        print("Semantic annotation successfully updated.")

    def delete(self, reason):
        """Delete the annotation through ``delete_entity`` and report it."""
        self._openbis.delete_entity('SemanticAnnotation', self.permId, reason, False)
        print("Semantic annotation successfully deleted.")