Newer
Older
self.openbis._post_request(self.openbis.as_v3, request)
print("DataSet successfully updated.")
class AttrHolder():
"""General class for both samples and experiments that holds all common attributes, such as:
- space
- experiment (sample)
- samples (experiment)
- parents (sample, dataset)
- children (sample, dataset)
- tags
"""
def __init__(self, openbis_obj, entity, type=None):
self.__dict__['_openbis'] = openbis_obj
self.__dict__['_entity'] = entity
if type is not None:
self.__dict__['_allowed_attrs'] = _definitions(entity)['attrs']
self.__dict__['_identifier'] = None
self.__dict__['_is_new'] = True
def __call__(self, data):
"""This internal method is invoked when an existing object is loaded.
Instead of invoking a special method we «call» the object with the data
self(data)
which automatically invokes this method.
Since the data comes from openBIS, we do not have to check it (hence the
self.__dict__ statements to prevent invoking the __setattr__ method)
Internally data is stored with an underscore, e.g.
sample._space --> { '@id': 4,
'@type': 'as.dto.space.id.SpacePermId',
'permId': 'MATERIALS' }
but when fetching the attribute without the underscore, we only return
the relevant data for the user:
sample.space --> 'MATERIALS'
"""
self.__dict__['_is_new'] = False
Swen Vermeul
committed
for attr in self._allowed_attrs:
if attr in ["code", "permId", "identifier",
"type", "container", "components"]:
self.__dict__['_' + attr] = data.get(attr, None)
Swen Vermeul
committed
d = data.get(attr, None)
Swen Vermeul
committed
if d is not None:
d = d['permId']
self.__dict__['_' + attr] = d
Swen Vermeul
committed
elif attr in ["sample", "experiment", "project"]:
d = data.get(attr, None)
Swen Vermeul
committed
if d is not None:
d = d['identifier']
self.__dict__['_' + attr] = d
Swen Vermeul
committed
elif attr in ["parents", "children", "samples"]:
self.__dict__['_' + attr] = []
Swen Vermeul
committed
for item in data[attr]:
self.__dict__['_' + attr].append(item['identifier'])
self.__dict__['_' + attr].append(item['permId'])
Swen Vermeul
committed
Swen Vermeul
committed
for item in data[attr]:
Swen Vermeul
committed
"code": item['code'],
"@type": "as.dto.tag.id.TagCode"
})
self.__dict__['_tags'] = tags
self.__dict__['_prev_tags'] = copy.deepcopy(tags)
self.__dict__['_' + attr] = data.get(attr, None)
def _new_attrs(self, method_name=None):
    """Build the request (as a Python dict) used to create this entity.

    It is used internally by the save() method of a newly created object.

    :param method_name: name of the API method to call. When None, it is
        derived from the entity name as "create<Entity>s".
    :return: dict with the keys "method" and "params"; "params" holds the
        session token and a one-element list with the creation object.
    """
    entity = self.entity
    defs = _definitions(entity)
    attr2ids = _definitions('attr2ids')

    creation = {
        "@type": "as.dto.{}.create.{}Creation".format(entity.lower(), entity)
    }

    for attr in defs['attrs_new']:
        if attr == 'type':
            creation['typeId'] = self._type['permId']
            continue

        if attr == 'attachments':
            new_attachments = getattr(self, '_new_attachments')
            if new_attachments is None:
                continue
            value = [att.get_data() for att in new_attachments]
        else:
            value = getattr(self, '_' + attr)
            # multi-value attributes (parents, children, tags, ...) are
            # sent as an empty list rather than None
            if value is None and attr in defs['multi']:
                value = []

        # translate parents into parentIds, children into childIds etc.
        creation[attr2ids[attr] if attr in attr2ids else attr] = value

    if method_name is None:
        method_name = "create{}s".format(entity)

    return {
        "method": method_name,
        "params": [
            self.openbis.token,
            [creation]
        ]
    }
Swen Vermeul
committed
def _up_attrs(self, method_name=None):
Swen Vermeul
committed
"""Returns the Python-equivalent JSON request when a new object is updated.
It is used internally by the save() method of an object to be updated.
"""
attr2ids = _definitions('attr2ids')
"@type": "as.dto.{}.update.{}Update".format(self.entity.lower(), self.entity),
defs["identifier"]: self._permId
}
# look at all attributes available for that entity
# that can be updated
for attr in defs['attrs_up']:
items = None
if attr == 'attachments':
# v3 API currently only supports adding attachments
attachments = self.__dict__.get('_new_attachments', None)
if attachments is None:
continue
atts_data = [attachment.get_data() for attachment in attachments]
Swen Vermeul
committed
up_obj['attachments'] = {
"actions": [{
"items": atts_data,
"@type": "as.dto.common.update.ListUpdateActionAdd"
}],
"@type": "as.dto.attachment.update.AttachmentListUpdateValue"
}
elif attr == 'tags':
# look which tags have been added or removed and update them
if getattr(self, '_prev_tags') is None:
self.__dict__['_prev_tags'] = []
actions = []
for tagId in self._prev_tags:
if tagId not in self._tags:
actions.append({
"items": [tagId],
"@type": "as.dto.common.update.ListUpdateActionRemove"
})
for tagId in self._tags:
if tagId not in self._prev_tags:
actions.append({
"items": [tagId],
"@type": "as.dto.common.update.ListUpdateActionAdd"
})
"@type": "as.dto.common.update.IdListUpdateValue",
"actions": actions
}
elif '_' + attr in self.__dict__:
# handle multivalue attributes (parents, children, tags etc.)
# we only cover the Set mechanism, which means we always update
# all items in a list
if attr in defs['multi']:
items = self.__dict__.get('_' + attr, [])
if items == None:
items = []
up_obj[attr2ids[attr]] = {
"actions": [
{
"items": items,
"@type": "as.dto.common.update.ListUpdateActionSet",
}
],
"@type": "as.dto.common.update.IdListUpdateValue"
}
else:
# handle single attributes (space, experiment, project, container, etc.)
value = self.__dict__.get('_' + attr, {})
Swen Vermeul
committed
elif len(value) == 0:
# value is {}: it means that we want this attribute to be
# deleted, not updated.
up_obj[attr2ids[attr]] = {
"@type": "as.dto.common.update.FieldUpdateValue",
"isModified": True,
}
elif 'isModified' in value and value['isModified'] == True:
val = {}
for x in ['identifier','permId','@type']:
if x in value:
val[x] = value[x]
"@type": "as.dto.common.update.FieldUpdateValue",
Swen Vermeul
committed
"isModified": True,
"value": val
# update an existing entity
if method_name is None:
method_name = "update{}s".format(self.entity)
request = {
"method": method_name,
"params": [
self.openbis.token,
[up_obj]
return request
def __getattr__(self, name):
""" handles all attribute requests dynamically. Values are returned in a sensible way,
for example the identifiers of parents, children and components are returned
as an array of values.
"""
int_name = '_' + name
if int_name in self.__dict__:
if int_name in ['_attachments']:
return [
"fileName": x['fileName'],
"title": x['title'],
"description": x['description']
} for x in self._attachments
]
if int_name in ['_registrator', '_modifier', '_dataProducer']:
return self.__dict__[int_name].get('userId', None)
Swen Vermeul
committed
elif int_name in ['_registrationDate', '_modificationDate', '_accessDate', '_dataProductionDate']:
return format_timestamp(self.__dict__[int_name])
# if the attribute contains a list,
# return a list of either identifiers, codes or
# permIds (whatever is available first)
elif isinstance(self.__dict__[int_name], list):
values = []
for item in self.__dict__[int_name]:
if "identifier" in item:
values.append(item['identifier'])
elif "code" in item:
values.append(item['code'])
elif "permId" in item:
values.append(item['permId'])
else:
pass
return values
# attribute contains a dictionary: same procedure as above.
elif isinstance(self.__dict__[int_name], dict):
if "identifier" in self.__dict__[int_name]:
return self.__dict__[int_name]['identifier']
elif "code" in self.__dict__[int_name]:
return self.__dict__[int_name]['code']
elif "permId" in self.__dict__[int_name]:
return self.__dict__[int_name]['permId']
else:
return self.__dict__[int_name]
else:
return None
def __setattr__(self, name, value):
"""This method is always invoked whenever we assign an attribute to an
object, e.g.
new_sample.space = 'MATERIALS'
new_sample.parents = ['/MATERIALS/YEAST747']
"""
if name in ["parents", "parent", "children", "child", "components"]:
if name == "parent":
name = "parents"
if name == "child":
name = "children"
if not isinstance(value, list):
value = [value]
objs = []
for val in value:
if isinstance(val, str):
# fetch objects in openBIS, make sure they actually exists
obj = getattr(self._openbis, 'get_' + self._entity.lower())(val)
objs.append(obj)
elif getattr(val, '_permId'):
# we got an existing object
objs.append(val)
permids = []
for item in objs:
permid = item._permId
# remove any existing @id keys to prevent jackson parser errors
permids.append(permid)
Swen Vermeul
committed
# setting self._parents = [{
# '@type': 'as.dto.sample.id.SampleIdentifier',
# 'identifier': '/SPACE_NAME/SAMPLE_NAME'
# }]
self.__dict__['_' + name] = permids
elif name in ["tags"]:
self.set_tags(value)
elif name in ["attachments"]:
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
self.add_attachment(**item)
else:
self.add_attachment(item)
else:
self.add_attachment(value)
Swen Vermeul
committed
elif name in ["space"]:
if value is None:
self.__dict__['_'+name] = None
return
# fetch object in openBIS, make sure it actually exists
obj = getattr(self._openbis, "get_" + name)(value)
self.__dict__['_' + name] = obj.data['permId']
# mark attribute as modified, if it's an existing entity
self.__dict__['_' + name]['isModified'] = True
Swen Vermeul
committed
elif name in ["sample", "experiment", "project"]:
obj = None
if isinstance(value, str):
# fetch object in openBIS, make sure it actually exists
obj = getattr(self._openbis, "get_" + name)(value)
elif value is None:
self.__dict__['_'+name] = {}
return
else:
obj = value
self.__dict__['_' + name] = obj.data['identifier']
# mark attribute as modified, if it's an existing entity
if self.is_new:
pass
else:
self.__dict__['_' + name]['isModified'] = True
elif name in ["identifier"]:
raise KeyError("you can not modify the {}".format(name))
elif name == "code":
if self._type['autoGeneratedCode']:
raise KeyError("This {}Type has auto-generated code. You cannot set a code".format(self.entity))
elif name in [ "description", "userId" ]:
self.__dict__['_'+name] = value
else:
raise KeyError("no such attribute: {}".format(name))
def get_type(self):
Swen Vermeul
committed
def _ident_for_whatever(self, whatever):
    """Return the identifier (or permId) of an entity, without its '@id' key.

    :param whatever: either a string, which is looked up in openBIS via
        get_<entity>(), or an already fetched entity object.
    :return: the object's ``_identifier`` or, if that is empty, its
        ``_permId``. Dict values are returned as a copy with the '@id' key
        removed, so the source object itself is left untouched.
    :raises ValueError: if the object has neither an identifier nor a permId.
    """
    if isinstance(whatever, str):
        # fetch the object in openBIS, we are given an identifier
        obj = getattr(self._openbis, 'get_' + self._entity.lower())(whatever)
    else:
        # we assume we got an object
        obj = whatever

    ident = getattr(obj, '_identifier', None) or getattr(obj, '_permId', None)
    if not ident:
        raise ValueError(
            "object has neither an identifier nor a permId: {!r}".format(whatever)
        )

    if isinstance(ident, dict):
        # '@id' keys are dropped (a comment elsewhere in this file says they
        # cause jackson parser errors); build a copy instead of popping the
        # key out of the fetched object's own data
        ident = {key: val for key, val in ident.items() if key != '@id'}
    return ident
Swen Vermeul
committed
identifier = self.identifier
if identifier is None:
identifier = self.permId
if identifier is None:
# TODO: if this is a new object, return the list of the parents which have been assigned
pass
else:
return getattr(self._openbis, 'get_' + self._entity.lower() + 's')(withChildren=identifier, **kwargs)
Swen Vermeul
committed
3433
3434
3435
3436
3437
3438
3439
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
def add_parents(self, parents):
    """Append one or several parents to this entity's list of parents.

    :param parents: a single parent or a list of parents. Each entry is
        resolved with self._ident_for_whatever() before it is stored.
    """
    if getattr(self, '_parents') is None:
        self.__dict__['_parents'] = []

    candidates = parents if isinstance(parents, list) else [parents]
    for ident in map(self._ident_for_whatever, candidates):
        self.__dict__['_parents'].append(ident)
def del_parents(self, parents):
    """Remove one or several parents from this entity's list of parents.

    :param parents: a single parent or a list of parents. Each entry is
        resolved with self._ident_for_whatever(); every stored parent with
        the same 'identifier' or the same 'permId' is removed.
    """
    if getattr(self, '_parents') is None:
        return
    if not isinstance(parents, list):
        parents = [parents]

    def _same(ident, item):
        # two entries match if they share an equal identifier or permId
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ('identifier', 'permId')
        )

    for parent in parents:
        ident = self._ident_for_whatever(parent)
        stored = self.__dict__['_parents']
        # Rebuild the list instead of pop()-ing while iterating: popping
        # inside the loop skips the element following each removed one.
        # Slice assignment keeps the same list object.
        stored[:] = [item for item in stored if not _same(ident, item)]
Swen Vermeul
committed
identifier = self.identifier
if identifier is None:
identifier = self.permId
if identifier is None:
# TODO: if this is a new object, return the list of the children which have been assigned
pass
else:
# e.g. self._openbis.get_samples(withParents=self.identifier)
return getattr(self._openbis, 'get_' + self._entity.lower() + 's')(withParents=identifier, **kwargs)
Swen Vermeul
committed
3466
3467
3468
3469
3470
3471
3472
3473
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
def add_children(self, children):
    """Append one or several children to this entity's list of children.

    :param children: a single child or a list of children. Each entry is
        resolved with self._ident_for_whatever() before it is stored.
    """
    if getattr(self, '_children') is None:
        self.__dict__['_children'] = []

    candidates = children if isinstance(children, list) else [children]
    for ident in map(self._ident_for_whatever, candidates):
        self.__dict__['_children'].append(ident)
def del_children(self, children):
    """Remove one or several children from this entity's list of children.

    :param children: a single child or a list of children. Each entry is
        resolved with self._ident_for_whatever(); every stored child with
        the same 'identifier' or the same 'permId' is removed.
    """
    if getattr(self, '_children') is None:
        return
    if not isinstance(children, list):
        children = [children]

    def _same(ident, item):
        # two entries match if they share an equal identifier or permId
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ('identifier', 'permId')
        )

    for child in children:
        ident = self._ident_for_whatever(child)
        stored = self.__dict__['_children']
        # Rebuild the list instead of pop()-ing while iterating: popping
        # inside the loop skips the element following each removed one.
        # Slice assignment keeps the same list object.
        stored[:] = [item for item in stored if not _same(ident, item)]
@property
def tags(self):
    """Codes of all tags stored in ``_tags``; None when ``_tags`` is None."""
    tag_ids = getattr(self, '_tags')
    if tag_ids is None:
        return None
    return [tag_id['code'] for tag_id in tag_ids]
def set_tags(self, tags):
    """Replace the current tags with the given ones.

    :param tags: passed unchanged to _create_tagIds(), which turns it into
        the list of tag ids to keep.
    """
    if getattr(self, '_tags') is None:
        self.__dict__['_tags'] = []

    tagIds = _create_tagIds(tags)
    current = self.__dict__['_tags']

    # Drop tags that are not in the new tags list. The list is rebuilt
    # rather than remove()-d from while iterating, which would skip the
    # element after every removal and leave stale tags behind.
    current[:] = [tagId for tagId in current if tagId in tagIds]

    # add all new tags that are not in the list yet
    for tagId in tagIds:
        if tagId not in current:
            current.append(tagId)
def add_tags(self, tags):
    """Add the given tags to ``_tags``, skipping those already listed.

    :param tags: passed unchanged to _create_tagIds().
    """
    if getattr(self, '_tags') is None:
        self.__dict__['_tags'] = []

    for tag_id in _create_tagIds(tags):
        if tag_id in self.__dict__['_tags']:
            continue
        self.__dict__['_tags'].append(tag_id)
def del_tags(self, tags):
    """Remove the given tags from ``_tags``, ignoring those not listed.

    :param tags: passed unchanged to _create_tagIds().
    """
    if getattr(self, '_tags') is None:
        self.__dict__['_tags'] = []

    for tag_id in _create_tagIds(tags):
        if tag_id not in self.__dict__['_tags']:
            continue
        self.__dict__['_tags'].remove(tag_id)
def get_attachments(self):
    """Return the attachments as a DataFrame, or None if there are none.

    :return: DataFrame with the columns fileName, title, description and
        version, built from ``_attachments``; None when ``_attachments``
        is None.
    """
    if getattr(self, '_attachments') is None:
        return None
    columns = ['fileName', 'title', 'description', 'version']
    return DataFrame(self._attachments)[columns]
def add_attachment(self, fileName, title=None, description=None):
    """Register a new attachment for this entity.

    The short form of the attachment (get_data_short()) is appended to
    ``_attachments``; the Attachment object itself is appended to
    ``_new_attachments``.

    :param fileName: passed to Attachment as ``filename``
    :param title: optional title of the attachment
    :param description: optional description of the attachment
    """
    attachment = Attachment(filename=fileName, title=title, description=description)

    if getattr(self, '_attachments') is None:
        self.__dict__['_attachments'] = []
    self._attachments.append(attachment.get_data_short())

    if getattr(self, '_new_attachments') is None:
        self.__dict__['_new_attachments'] = []
    self._new_attachments.append(attachment)
def download_attachments(self):
    """Download all attachments of this entity, including their content.

    The entity is requested again with the 'attachmentsWithContent' fetch
    option. Every attachment is written to
    ``<hostname>/<permId>/<fileName>`` relative to the current working
    directory; missing directories are created.

    :return: list of the paths of all files that were written (empty if the
        entity has no attachments).
    """
    method = 'get' + self.entity + 's'
    entity = self.entity.lower()
    request = {
        "method": method,
        "params": [
            self._openbis.token,
            [self._permId],
            dict(
                attachments=fetch_option['attachmentsWithContent'],
                **fetch_option[entity]
            )
        ]
    }
    resp = self._openbis._post_request(self._openbis.as_v3, request)
    # an entity without attachments may deliver None here
    attachments = resp[self.permId]['attachments'] or []

    file_list = []
    for attachment in attachments:
        # NOTE(review): fileName comes from the server and is used in the
        # local path as-is -- confirm it cannot contain '..' components
        filename = os.path.join(
            self._openbis.hostname,
            self.permId,
            attachment['fileName']
        )
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'wb') as att:
            att.write(base64.b64decode(attachment['content']))
        # previously file_list was created but never filled nor returned
        file_list.append(filename)
    return file_list
def _repr_html_(self):
    """Return an HTML table (attribute / value) of this object's attributes.

    One row is emitted per entry of ``_allowed_attrs`` (None values are
    shown as an empty string). The attachments are listed in a final row
    with their file names separated by ``<br/>``.
    """
    def nvl(val, string=''):
        return string if val is None else val

    parts = ["""
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
"""]

    for attr in self._allowed_attrs:
        if attr != 'attachments':
            parts.append("<tr> <td>{}</td> <td>{}</td> </tr>".format(
                attr, nvl(getattr(self, attr, ''), '')
            ))

    if getattr(self, '_attachments') is not None:
        parts.append("<tr><td>attachments</td><td>")
        parts.append("<br/>".join(att['fileName'] for att in self._attachments))
        parts.append("</td></tr>")

    parts.append("""
</tbody>
</table>
""")
    return "".join(parts)
def __repr__(self):
    """Return a plain-text table (attribute / value) of this object's attributes.

    One row is produced per entry of ``_allowed_attrs``, except for
    'attachments'; the table is rendered by tabulate().
    """
    # NOTE(review): nvl is looked up at module scope here (the nvl defined
    # inside _repr_html_ is local to that method) -- confirm it exists there
    rows = [
        [attr, nvl(getattr(self, attr, ''))]
        for attr in self._allowed_attrs
        if attr != 'attachments'
    ]
    return tabulate(rows, headers=['attribute', 'value'])
class Sample():
""" A Sample is one of the most commonly used objects in openBIS.
"""
Swen Vermeul
committed
def __init__(self, openbis_obj, type, data=None, props=None, **kwargs):
    """Set up a sample with its property holder ('p') and attribute holder ('a').

    :param openbis_obj: the openBIS connection object, stored as 'openbis'
    :param type: the sample type, handed to PropertyHolder and AttrHolder
    :param data: optional data of an existing sample, applied via _set_data()
    :param props: optional mapping of property names to values, set on self.p
    :param kwargs: further attributes, assigned with setattr() on this object
    """
    state = self.__dict__
    state['openbis'] = openbis_obj
    state['type'] = type
    state['p'] = PropertyHolder(openbis_obj, type)
    state['a'] = AttrHolder(openbis_obj, 'Sample', type)

    if data is not None:
        self._set_data(data)

    if props is not None:
        for prop_name in props:
            setattr(self.p, prop_name, props[prop_name])

    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)
def _set_data(self, data):
    """Load the data of an existing sample into this object.

    :param data: dict describing the sample; it must contain a 'properties'
        mapping. It is passed to the attribute holder (by calling self.a)
        and stored as ``data``.
    """
    # calling the attribute holder assigns the attribute data
    self.a(data)
    self.__dict__['data'] = data

    # properties go into the self.p namespace unchecked, under lower-case names
    for prop_name, prop_value in data['properties'].items():
        self.p.__dict__[prop_name.lower()] = prop_value
Swen Vermeul
committed
def __dir__(self):
'props', 'get_parents()', 'get_children()',
'add_parents()', 'add_children()', 'del_parents()', 'del_children()',
'space', 'project', 'experiment', 'tags',
'set_tags()', 'add_tags()', 'del_tags()',
Swen Vermeul
committed
'add_attachment()', 'get_attachments()', 'download_attachments()',
'save()', 'delete()'
Swen Vermeul
committed
@property
def props(self):
    """The property holder stored under 'p' in this object's __dict__."""
    return vars(self)['p']
Swen Vermeul
committed
@property
def type(self):
@type.setter
def type(self, type_name):
sample_type = self.openbis.get_sample_type(type_name)
self.p.__dict__['_type'] = sample_type
self.a.__dict__['_type'] = sample_type
def __getattr__(self, name):
    """Delegate the lookup of unknown attributes to the attribute holder 'a'."""
    attr_holder = self.__dict__['a']
    return getattr(attr_holder, name)
def __setattr__(self, name, value):
    """Forward attribute assignments to the attribute holder 'a'.

    :raises ValueError: if name is one of the protected method names
        set_properties, set_tags or add_tags.
    """
    protected = ('set_properties', 'set_tags', 'add_tags')
    if name in protected:
        raise ValueError("These are methods which should not be overwritten")

    attr_holder = self.__dict__['a']
    setattr(attr_holder, name, value)
def _repr_html_(self):
    """Return the HTML representation produced by the attribute holder."""
    attr_holder = self.a
    return attr_holder._repr_html_()
def __repr__(self):
    """Return the text representation produced by the attribute holder."""
    attr_holder = self.a
    return attr_holder.__repr__()
def set_properties(self, properties):
    """Update the properties of this sample in openBIS.

    :param properties: passed as ``properties`` to openbis.update_sample()
        together with this sample's permId.
    """
    perm_id = self.permId
    self.openbis.update_sample(perm_id, properties=properties)
def save(self):
props = self.p._all_props()
Swen Vermeul
committed
if self.is_new:
request["params"][1][0]["properties"] = props
resp = self.openbis._post_request(self.openbis.as_v3, request)
print("Sample successfully created.")
new_sample_data = self.openbis.get_sample(resp[0]['permId'], only_data=True)
self._set_data(new_sample_data)
return self
else:
request["params"][1][0]["properties"] = props
self.openbis._post_request(self.openbis.as_v3, request)
print("Sample successfully updated.")
Swen Vermeul
committed
new_sample_data = self.openbis.get_sample(self.permId, only_data=True)
self._set_data(new_sample_data)
self.openbis.delete_entity('sample', self.permId, reason)
def get_datasets(self, **kwargs):
    """Return the result of openbis.get_datasets() for this sample.

    :param kwargs: additional arguments forwarded to openbis.get_datasets()
    """
    perm_id = self.permId
    return self.openbis.get_datasets(sample=perm_id, **kwargs)
def get_projects(self, **kwargs):
    """Return the result of openbis.get_project() filtered by this sample.

    :param kwargs: additional arguments forwarded to openbis.get_project()
    """
    # NOTE(review): this calls the singular get_project(), not
    # get_projects() -- confirm that is the intended method
    sample_filter = [self.permId]
    return self.openbis.get_project(withSamples=sample_filter, **kwargs)
Swen Vermeul
committed
def get_experiment(self):
Swen Vermeul
committed
return self.openbis.get_experiment(self._experiment['identifier'])
except Exception:
pass
@property
def experiment(self):
return self.openbis.get_experiment(self._experiment['identifier'])
except Exception:
pass
Swen Vermeul
committed
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
class Person(OpenBisObject):
    """ managing openBIS persons
    """

    def __init__(self, openbis_obj, data=None, **kwargs):
        """Set up a person with its attribute holder ('a').

        :param openbis_obj: the openBIS connection object, stored as 'openbis'
        :param data: optional data of an existing person
        :param kwargs: further attributes, assigned with setattr()
        """
        self.__dict__['openbis'] = openbis_obj
        self.__dict__['a'] = AttrHolder(openbis_obj, 'Person')

        if data is not None:
            self.a(data)
            self.__dict__['data'] = data

        for key, value in kwargs.items():
            setattr(self, key, value)

    def __dir__(self):
        """all the available methods and attributes that should be displayed
        when using the autocompletion feature (TAB) in Jupyter
        """
        return [
            'permId', 'userId', 'firstName', 'lastName', 'email',
            'registrator', 'registrationDate', 'roleAssignments', 'space'
        ]

    def __str__(self):
        return "{} {}".format(self.get('firstName'), self.get('lastName'))

    def delete(self, reason):
        """Always raises ValueError: persons cannot be deleted."""
        raise ValueError("Persons cannot be deleted")

    @staticmethod
    def _use_home_space_id(request):
        """Rename 'spaceId' to 'homeSpaceId' in the first object of a request.

        For new and updated persons the parameter is unfortunately called
        homeSpaceId; spaceId throws no error but makes no change either.
        """
        obj = request['params'][1][0]
        if "spaceId" in obj:
            obj['homeSpaceId'] = obj.pop('spaceId')

    def save(self):
        """Create the person in openBIS if it is new, otherwise update it.

        After the request the person is fetched again and its data reloaded.

        :return: self when a new person was created, None after an update.
        """
        if self.is_new:
            request = self._new_attrs()
            self._use_home_space_id(request)
            resp = self.openbis._post_request(self.openbis.as_v3, request)
            print("Person successfully created.")
            new_person_data = self.openbis.get_person(resp[0]['permId'], only_data=True)
            self._set_data(new_person_data)
            return self

        else:
            request = self._up_attrs()
            self._use_home_space_id(request)
            # A leftover `return json.dumps(request)` used to sit here; it
            # returned before the request was posted, so updates were never
            # sent to openBIS.
            self.openbis._post_request(self.openbis.as_v3, request)
            print("Person successfully updated.")
            new_person_data = self.openbis.get_person(self.permId, only_data=True)
            self._set_data(new_person_data)
class Group(OpenBisObject):
""" Managing openBIS authorization groups
"""
def __init__(self, openbis_obj, data=None, **kwargs):
    """Set up an authorization group with its attribute holder ('a').

    :param openbis_obj: the openBIS connection object, stored as 'openbis'
    :param data: optional data of an existing group
    :param kwargs: further attributes, assigned with setattr()
    """
    state = self.__dict__
    state['openbis'] = openbis_obj
    state['a'] = AttrHolder(openbis_obj, 'AuthorizationGroup')

    if data is not None:
        self.a(data)
        state['data'] = data

    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)
def __dir__(self):
    """Names offered by the autocompletion feature (TAB) for a group."""
    user_methods = ['get_users()', 'add_users()', 'del_users()']
    role_methods = ['get_roles()', 'add_roles()', 'del_roles()']
    return user_methods + role_methods
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
def _repr_html_(self):
    """Return the HTML representation of this group.

    The first table lists all entries of ``_allowed_attrs`` except 'users'
    and 'roleAssignments', followed by a row with the user ids of
    ``_users``. If ``_roleAssignments`` is set, a second table lists role,
    role level and space code of every role assignment; a role assignment
    without a space gets an empty space cell.
    """
    def nvl(val, string=''):
        if val is None:
            return string
        return val

    html = """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
"""

    for attr in self._allowed_attrs:
        if attr in ['users', 'roleAssignments']:
            continue
        html += "<tr> <td>{}</td> <td>{}</td> </tr>".format(
            attr, nvl(getattr(self, attr, ''), '')
        )

    if getattr(self, '_users') is not None:
        html += "<tr><td>Users</td><td>"
        html += ", ".join(att['userId'] for att in self._users)
        html += "</td></tr>"

    html += """
</tbody>
</table>
"""

    if getattr(self, '_roleAssignments') is not None:
        html += """
<br/>
<b>Role Assignments</b>
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>Role</th>
<th>Role Level</th>
<th>Space</th>
</tr>
</thead>
<tbody>
"""
        for roleAssignment in self._roleAssignments:
            # a role assignment may carry no space; calling .get('code')
            # on None would raise AttributeError
            space = roleAssignment.get('space')
            space_code = '' if space is None else space.get('code')
            html += "<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(
                roleAssignment.get('role'),
                roleAssignment.get('roleLevel'),
                space_code,
            )
        html += """
</tbody>
</table>
"""
    return html
def save(self):
    """Create the group in openBIS if it is new, otherwise update it.

    After the request the group is fetched again and its data reloaded.

    :return: self when a new group was created, None after an update.
    """
    if self.is_new:
        request = self._new_attrs()
        resp = self.openbis._post_request(self.openbis.as_v3, request)
        print("Group successfully created.")
        # re-fetch group from openBIS (this used to call get_person(),
        # which looks the new permId up among persons instead of groups)
        new_data = self.openbis.get_group(resp[0]['permId'], only_data=True)
        self._set_data(new_data)
        return self

    else:
        request = self._up_attrs()
        self.openbis._post_request(self.openbis.as_v3, request)
        print("Group successfully updated.")
        # re-fetch group from openBIS
        new_data = self.openbis.get_group(self.permId, only_data=True)
        self._set_data(new_data)
""" managing openBIS spaces
"""
def __init__(self, openbis_obj, data=None, **kwargs):
    """Set up a space with its attribute holder ('a').

    :param openbis_obj: the openBIS connection object, stored as 'openbis'
    :param data: optional data of an existing space
    :param kwargs: further attributes, assigned with setattr()
    """
    state = self.__dict__
    state['openbis'] = openbis_obj
    state['a'] = AttrHolder(openbis_obj, 'Space' )

    if data is not None:
        self.a(data)
        state['data'] = data

    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)
def __dir__(self):
    """Names of the attributes and methods that should be offered by the
    autocompletion feature (TAB) in Jupyter.
    """
    attributes = [
        'code', 'permId', 'description', 'registrator', 'registrationDate', 'modificationDate',
    ]
    methods = [
        'get_projects()',
        "new_project(code='', description='', attachments)",
        'get_samples()',
        'delete()',
    ]
    return attributes + methods
def __str__(self):
    """Return the code of this space, or an empty string if it has none.

    __str__ must return a str: returning None (no 'code' in the data, or
    no data loaded yet) makes str() and print() raise TypeError.
    """
    data = getattr(self, 'data', None) or {}
    code = data.get('code', None)
    return '' if code is None else str(code)
def get_samples(self, **kwargs):
    """Return the result of openbis.get_samples() for this space.

    :param kwargs: additional arguments forwarded to openbis.get_samples()
    """
    space_code = self.code
    return self.openbis.get_samples(space=space_code, **kwargs)
get_objects = get_samples #Alias
def get_sample(self, sample_code):
    """Fetch a single sample via openbis.get_sample().

    :param sample_code: an identifier, a permId or a plain sample code. A
        plain code is expanded to '/<space code>/<sample code>'.
    """
    if not (is_identifier(sample_code) or is_permid(sample_code)):
        # we assume we just got the code
        sample_code = '/{}/{}'.format(self.code,sample_code)
    return self.openbis.get_sample(sample_code)
def get_projects(self, **kwargs):
    """Return the result of openbis.get_projects() for this space.

    :param kwargs: additional arguments forwarded to openbis.get_projects()
    """
    space_code = self.code
    return self.openbis.get_projects(space=space_code, **kwargs)
def new_project(self, code, description=None, **kwargs):
    """Return the result of openbis.new_project() for this space.

    :param code: code of the new project
    :param description: optional description of the new project
    :param kwargs: additional arguments forwarded to openbis.new_project()
    """
    space_code = self.code
    return self.openbis.new_project(space_code, code, description, **kwargs)
def new_sample(self, **kwargs):
    """Return the result of openbis.new_sample() with this space preset.

    :param kwargs: additional arguments forwarded to openbis.new_sample()
    """
    openbis = self.openbis
    return openbis.new_sample(space=self, **kwargs)
def delete(self, reason):
    """Delete this space via openbis.delete_entity() and print a confirmation.

    :param reason: passed to openbis.delete_entity() as the deletion reason
    """
    perm_id = self.permId
    self.openbis.delete_entity('Space', perm_id, reason)
    print("Space {} has been sucessfully deleted.".format(self.permId))
def save(self):
    """Create the space in openBIS if it is new, otherwise update it.

    After the request the space is fetched again and its data reloaded.

    :return: self when a new space was created, None after an update.
    """
    openbis = self.openbis

    if not self.is_new:
        openbis._post_request(openbis.as_v3, self._up_attrs())
        print("Space successfully updated.")
        refreshed = openbis.get_space(self.permId, only_data=True)
        self._set_data(refreshed)
        return None

    response = openbis._post_request(openbis.as_v3, self._new_attrs())
    print("Space successfully created.")
    refreshed = openbis.get_space(response[0]['permId'], only_data=True)
    self._set_data(refreshed)
    return self
Chandrasekhar Ramakrishnan
committed
class ExternalDMS():
""" managing openBIS external data management systems
"""
def __init__(self, openbis_obj, data=None, **kwargs):
    """Set up an external data management system object.

    :param openbis_obj: the openBIS connection object, stored as 'openbis'
    :param data: optional data dict, stored as 'data' when given
    :param kwargs: further attributes, assigned with setattr()
    """
    state = self.__dict__
    state['openbis'] = openbis_obj
    if data is not None:
        state['data'] = data

    for attr_name, attr_value in kwargs.items():
        setattr(self, attr_name, attr_value)
def __getattr__(self, name):
    """Look up unknown attributes in the 'data' dict of this object.

    :return: data[name], or None if the data has no such key.
    :raises AttributeError: if no data was loaded. (A KeyError, as raised
        before, breaks hasattr() and getattr() with a default, which only
        catch AttributeError.)
    """
    data = self.__dict__.get('data')
    if data is None:
        raise AttributeError(name)
    return data.get(name)
def __dir__(self):
"""all the available methods and attributes that should be displayed
when using the autocompletion feature (TAB) in Jupyter