Newer
Older
Adam Laskowski
committed
# Copyright ETH 2018 - 2023 Zürich, Scientific IT Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Swen Vermeul
committed
from pandas import DataFrame, Series
from tabulate import tabulate
from .definitions import (
openbis_definitions,
fetch_option,
get_method_for_entity,
get_type_for_entity,
)
from .utils import (
parse_jackson,
check_datatype,
split_identifier,
format_timestamp,
is_identifier,
is_permid,
nvl,
extract_person,
)
Swen Vermeul
committed
from .attachment import Attachment
import copy
import base64
import os
Swen Vermeul
committed
class AttrHolder:
"""General class for both samples and experiments that hold all common attributes, such as:
Swen Vermeul
committed
- space
- project
- experiment (sample)
- samples (experiment)
- dataset
- tags
"""
def __init__(self, openbis_obj, entity, type=None):
self.__dict__["_openbis"] = openbis_obj
self.__dict__["_entity"] = entity
Swen Vermeul
committed
if type is not None:
Swen Vermeul
committed
self.__dict__["_defs"] = openbis_definitions(entity)
# self.__dict__['_allowed_attrs'] = openbis_definitions(entity)['attrs']
# self.__dict__['_allowed_attrs_new'] = openbis_definitions(entity)['attrs_new']
# self.__dict__['_allowed_attrs_up'] = openbis_definitions(entity)['attrs_up']
self.__dict__["_identifier"] = None
self.__dict__["_is_new"] = True
self.__dict__["_tags"] = []
Swen Vermeul
committed
def __call__(self, data):
"""This internal method is invoked when an existing object is loaded.
Instead of invoking a special method we «call» the object with the data
self(data)
which automatically invokes this method.
Since the data comes from openBIS, we do not have to check it (hence the
self.__dict__ statements to prevent invoking the __setattr__ method)
Internally data is stored with an underscore, e.g.
Swen Vermeul
committed
'@type': 'as.dto.space.id.SpacePermId',
Swen Vermeul
committed
}
but when fetching the attribute without the underscore, we only return
the relevant data for the user:
sample.space # MATERIALS
Swen Vermeul
committed
"""
# entity is read from openBIS, so it is not new anymore
Swen Vermeul
committed
if attr in ["code", "permId", "identifier", "type"]:
Swen Vermeul
committed
# remove the @id attribute
if isinstance(self.__dict__["_" + attr], dict):
self.__dict__["_" + attr].pop("@id", None)
Swen Vermeul
committed
elif attr in ["vocabularyCode"]:
self.__dict__["_" + attr] = data.get("permId", {}).get(attr, None)
d = data.get(attr, None)
if d is not None:
Swen Vermeul
committed
elif attr in ["space"]:
d = data.get(attr, None)
if d is not None:
Swen Vermeul
committed
elif attr in ["sample", "experiment", "project", "container"]:
Swen Vermeul
committed
d = data.get(attr, None)
if d is not None:
d = d["identifier"]
self.__dict__["_" + attr] = d
Swen Vermeul
committed
elif attr in ["parents", "children", "samples", "components", "containers"]:
if data[attr] is not None:
for item in data[attr]:
try:
if "identifier" in item:
self.__dict__["_" + attr].append(item["identifier"])
elif "permId" in item:
self.__dict__["_" + attr].append(item["permId"])
except Exception:
# TODO: under certain circumstances, openBIS only delivers an integer
pass
Swen Vermeul
committed
elif attr in ["tags"]:
elif attr.endswith("Date"):
self.__dict__["_" + attr] = format_timestamp(data.get(attr))
elif attr in ["registrator", "modifier", "dataProducer", "owner"]:
self.__dict__["_" + attr] = extract_person(data.get(attr))
Swen Vermeul
committed
else:
Swen Vermeul
committed
def _new_attrs(self, method_name=None):
"""Returns the Python-equivalent JSON request when a new object is created.
It is used internally by the save() method of a newly created object.
"""
attr2ids = openbis_definitions("attr2ids")
new_obj = get_type_for_entity(self.entity, "create")
Swen Vermeul
committed
Swen Vermeul
committed
items = None
if attr == "type":
new_obj["typeId"] = self._type["permId"]
Swen Vermeul
committed
continue
# when creating a new dataset, the attribute «kind» is called «dataSetKind»
elif attr == "attachments":
attachments = getattr(self, "_new_attachments")
Swen Vermeul
committed
if attachments is None:
continue
atts_data = [attachment.get_data() for attachment in attachments]
items = atts_data
elif attr == "userIds":
if "_changed_users" not in self.__dict__:
Swen Vermeul
committed
continue
new_obj[attr] = []
for userId in self.__dict__["_changed_users"]:
if self.__dict__["_changed_users"][userId]["action"] == "Add":
new_obj[attr].append(
{"permId": userId, "@type": "as.dto.person.id.PersonPermId"}
)
Swen Vermeul
committed
else:
Swen Vermeul
committed
key = None
if attr in attr2ids:
# translate parents into parentIds, children into childIds etc.
key = attr2ids[attr]
else:
key = attr
new_obj[key] = items
# if method_name is not defined: guess the method name for creating a new entity
Swen Vermeul
committed
if method_name is None:
method_name = get_method_for_entity(self.entity, "create")
request = {"method": method_name, "params": [self.openbis.token, [new_obj]]}
Swen Vermeul
committed
return request
def _up_attrs(self, method_name=None, permId=None):
Swen Vermeul
committed
"""Returns the Python-equivalent JSON request when a new object is updated.
It is used internally by the save() method of an object to be updated.
"""
# defs = openbis_definitions(self._entity)
attr2ids = openbis_definitions("attr2ids")
Swen Vermeul
committed
up_obj = get_type_for_entity(self.entity, "update")
# for some weird reasons, the permId is called differently
# for every openBIS entity, but only when updating...
identifier_name = self._defs["identifier"]
if permId:
up_obj[identifier_name] = permId
else:
up_obj[identifier_name] = self._permId
Swen Vermeul
committed
# look at all attributes available for that entity
# that can be updated
Swen Vermeul
committed
items = None
Swen Vermeul
committed
# v3 API currently only supports adding attachments
attachments = self.__dict__.get("_new_attachments", None)
Swen Vermeul
committed
if attachments is None:
continue
atts_data = [attachment.get_data() for attachment in attachments]
up_obj["attachments"] = {
"actions": [
{
"items": atts_data,
"@type": "as.dto.common.update.ListUpdateActionAdd",
}
],
"@type": "as.dto.attachment.update.AttachmentListUpdateValue",
Swen Vermeul
committed
}
for tag in self.__dict__["_tags"]:
items.append(
{"permId": tag["permId"], "@type": "as.dto.tag.id.TagPermId"}
)
Swen Vermeul
committed
"actions": [
{
"items": items,
"@type": "as.dto.common.update.ListUpdateActionSet",
}
],
"@type": "as.dto.common.update.IdListUpdateValue",
Swen Vermeul
committed
}
# ListUpdateMapValues
metaData = self.__dict__["_" + attr]
if metaData:
items = [metaData]
data_type = "as.dto.common.update.ListUpdateActionSet"
elif metaData is not None and len(metaData) == 0:
# metaData needs to be set to {} in order to remove it.
items = ["custom_widget"]
data_type = "as.dto.common.update.ListUpdateActionRemove"
up_obj[attr] = {
"actions": [{"items": items, "@type": data_type}],
"@type": "as.dto.common.update.ListUpdateMapValues",
}
elif attr == "userIds":
Swen Vermeul
committed
actions = []
Swen Vermeul
committed
continue
for userId in self.__dict__["_changed_users"]:
actions.append(
{
"items": [
{
"permId": userId,
"@type": "as.dto.person.id.PersonPermId",
}
],
"@type": f'as.dto.common.update.ListUpdateAction{self.__dict__["_changed_users"][userId]["action"]}',
Swen Vermeul
committed
Swen Vermeul
committed
"actions": actions,
"@type": "as.dto.common.update.IdListUpdateValue",
Swen Vermeul
committed
}
elif (
attr
in "description label official ordinal autoGeneratedCode subcodeUnique listable showContainer showParents showParentMetadata disallowDeletion validationPlugin".split()
):
key = attr2ids.get(attr, attr)
up_obj[key] = {
Swen Vermeul
committed
"isModified": True,
Swen Vermeul
committed
}
Swen Vermeul
committed
# handle multivalue attributes (parents, children, tags etc.)
# we only cover the Set mechanism, which means we always update
Swen Vermeul
committed
# all items in a list
if "multi" in self._defs and attr in self._defs["multi"]:
items = self.__dict__.get("_" + attr, [])
Adam Laskowski
committed
if items is None:
# 'None' multivalue attributes should be omitted during update
continue
Swen Vermeul
committed
up_obj[attr2ids[attr]] = {
"actions": [
{
"items": items,
"@type": "as.dto.common.update.ListUpdateActionSet",
}
],
"@type": "as.dto.common.update.IdListUpdateValue",
Swen Vermeul
committed
}
else:
# handle single attributes (space, experiment, project, container, etc.)
Swen Vermeul
committed
if value is None:
pass
elif isinstance(value, bool):
# for boolean values where no type is needed
Swen Vermeul
committed
# value is {}: it means that we want this attribute to be
# deleted, not updated.
up_obj[attr2ids[attr]] = {
"@type": "as.dto.common.update.FieldUpdateValue",
"isModified": True,
}
elif "isModified" in value and value["isModified"] == True:
Swen Vermeul
committed
val = {}
Swen Vermeul
committed
if x in value:
val[x] = value[x]
up_obj[attr2ids[attr]] = {
"@type": "as.dto.common.update.FieldUpdateValue",
"isModified": True,
Swen Vermeul
committed
}
# update an existing entity
if method_name is None:
method_name = get_method_for_entity(self.entity, "update")
request = {"method": method_name, "params": [self.openbis.token, [up_obj]]}
Swen Vermeul
committed
return request
def __getattr__(self, name):
Swen Vermeul
committed
Values are returned in a sensible way, for example:
the identifiers of parents, children and components are returned as an
array of values, whereas attachments, users (of groups) and
roleAssignments are returned as an array of dictionaries.
"""
name_map = {
"group": "authorizationGroup",
"roles": "roleAssignments",
"permid": "permId",
"collection": "experiment",
"object": "sample",
Swen Vermeul
committed
}
if name in name_map:
name = name_map[name]
Swen Vermeul
committed
if int_name in self.__dict__:
Swen Vermeul
committed
attachments = []
for att in self._attachments:
attachments.append(
{
"fileName": att.get("fileName"),
"title": att.get("title"),
"description": att.get("description"),
"version": att.get("version"),
}
)
Swen Vermeul
committed
return attachments
Swen Vermeul
committed
users = []
for user in self._users:
users.append(
{
"firstName": user.get("firstName"),
"lastName": user.get("lastName"),
"email": user.get("email"),
"userId": user.get("userId"),
"space": user.get("space").get("code")
if user.get("space") is not None
else None,
}
)
Swen Vermeul
committed
return users
Swen Vermeul
committed
ras = []
for ra in self._roleAssignments:
ras.append(
{
"techId": ra.get("id").get("techId"),
"role": ra.get("role"),
"roleLevel": ra.get("roleLevel"),
"space": ra.get("space").get("code"),
"project": ra.get("role"),
}
)
Swen Vermeul
committed
return ras
Swen Vermeul
committed
# return a list of either identifiers, codes or
# permIds (whatever is available first)
elif isinstance(self.__dict__[int_name], list):
values = []
for item in self.__dict__[int_name]:
if "identifier" in item:
Swen Vermeul
committed
elif "code" in item:
Swen Vermeul
committed
elif "permId" in item:
Swen Vermeul
committed
else:
Swen Vermeul
committed
return values
# attribute contains a dictionary: same procedure as above.
elif isinstance(self.__dict__[int_name], dict):
if "identifier" in self.__dict__[int_name]:
Swen Vermeul
committed
elif "code" in self.__dict__[int_name]:
elif "name" in self.__dict__[int_name]:
elif "userId" in self.__dict__[int_name]:
Swen Vermeul
committed
elif "permId" in self.__dict__[int_name]:
Swen Vermeul
committed
elif "id" in self.__dict__[int_name]:
Swen Vermeul
committed
else:
return self.__dict__[int_name]
else:
return None
def __setattr__(self, name, value):
"""This method is always invoked whenever we assign an attribute to an
object, e.g.
new_sample.space = 'MATERIALS'
new_sample.parents = ['/MATERIALS/YEAST747']
"""
Swen Vermeul
committed
# experiment aka collection, sample aka object
name_map = {
"collection": "experiment",
"object": "sample",
"parent": "parents",
"child": "children",
Swen Vermeul
committed
}
if name in name_map:
name = name_map[name]
if self._is_new:
f'No such attribute: «{name}» for entity: {self.entity}. Allowed attributes are: {self._defs["attrs_new"]}'
Swen Vermeul
committed
else:
f'No such attribute: «{name}» for entity: {self.entity}. Allowed attributes are: {self._defs["attrs_up"]}'
Swen Vermeul
committed
if name in ["parents", "children", "components"]:
Swen Vermeul
committed
if not isinstance(value, list):
value = [value]
objs = []
for val in value:
if isinstance(val, str):
# fetch objects in openBIS, make sure they actually exists
obj = getattr(self._openbis, "get_" + self._entity.lower())(val)
Swen Vermeul
committed
objs.append(obj)
Swen Vermeul
committed
# we got an existing object
objs.append(val)
permids = []
for item in objs:
Swen Vermeul
committed
id = item._identifier
Swen Vermeul
committed
id = item._permId
else:
return
Swen Vermeul
committed
# remove any existing @id keys to prevent jackson parser errors
Swen Vermeul
committed
permids.append(id)
Swen Vermeul
committed
Swen Vermeul
committed
elif name in ["tags"]:
self.set_tags(value)
elif name in ["users"]:
self.set_users(value)
elif name in ["vocabulary"]:
if value is None or value == "":
self.__dict__["_vocabulary"] = None
"@type": "as.dto.vocabulary.id.VocabularyPermId",
elif name in ["validationPlugin"]:
if value is None or value == "":
self.__dict__["_validationPlugin"] = None
else:
"@type": "as.dto.plugin.id.PluginPermId",
elif name in ["materialType"]:
if value is None or value == "":
self.__dict__["_materialType"] = None
"@type": "as.dto.entitytype.id.EntityTypePermId",
"permId": value.upper(),
Swen Vermeul
committed
elif name in ["attachments"]:
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
self.add_attachment(**item)
else:
self.add_attachment(item)
else:
self.add_attachment(value)
elif name in ["space"]:
obj = None
if value is None:
Swen Vermeul
committed
return
if isinstance(value, str):
# fetch object in openBIS, make sure it actually exists
obj = getattr(self._openbis, "get_" + name)(value)
else:
obj = value
Swen Vermeul
committed
# mark attribute as modified, if it's an existing entity
if self.is_new:
pass
else:
Swen Vermeul
committed
elif name in ["sample", "experiment", "project"]:
obj = None
if isinstance(value, str):
# fetch object in openBIS, make sure it actually exists
obj = getattr(self._openbis, "get_" + name)(value)
elif value is None:
Swen Vermeul
committed
return
else:
obj = value
self.__dict__["_" + name] = obj.data["identifier"]
Swen Vermeul
committed
# mark attribute as modified, if it's an existing entity
if self.is_new:
pass
else:
Swen Vermeul
committed
elif name in ["identifier"]:
raise KeyError(f"you can not modify the {name}")
Swen Vermeul
committed
elif name == "code":
try:
if self._type["autoGeneratedCode"]:
raise KeyError(
f"This {self.entity}Type has auto-generated code. You cannot set a code"
Swen Vermeul
committed
except KeyError:
pass
except TypeError:
pass
Swen Vermeul
committed
elif name in ["userId", "description"]:
# values that are directly assigned
Swen Vermeul
committed
elif name in ["userIds"]:
self.add_users(value)
elif name in self._defs:
# enum: check whether value is a valid constant
if value and value not in self._defs[name]:
raise ValueError(
f"Allowed values for enum {name} are: {self._defs[name]}"
Swen Vermeul
committed
else:
Swen Vermeul
committed
def get_type(self):
    """Return the value stored in ``self._type``."""
    entity_type = self._type
    return entity_type
def _ident_for_whatever(self, whatever):
if isinstance(whatever, str):
# fetch parent in openBIS, we are given an identifier
obj = getattr(self._openbis, "get_" + self._entity.lower())(whatever)
Swen Vermeul
committed
else:
# we assume we got an object
obj = whatever
ident = None
Swen Vermeul
committed
ident = obj._identifier
Swen Vermeul
committed
ident = obj._permId
Swen Vermeul
committed
return ident
def get_container(self, **kwargs):
    """Look up ``self.container`` through the openBIS connection.

    The getter is selected by name on ``self._openbis``: ``"get_"`` followed
    by the lower-cased entity name. It is called with ``self.container`` and
    all keyword arguments are forwarded unchanged; its result is returned.
    """
    getter_name = "get_" + self._entity.lower()
    fetch = getattr(self._openbis, getter_name)
    return fetch(self.container, **kwargs)
def get_containers(self, **kwargs):
    """Look up ``self.containers`` through the openBIS connection.

    The getter is selected by name on ``self._openbis``: ``"get_"`` followed
    by the lower-cased entity name. It is called with ``self.containers``
    and all keyword arguments are forwarded unchanged; its result is
    returned as-is.
    """
    getter_name = "get_" + self._entity.lower()
    fetch = getattr(self._openbis, getter_name)
    return fetch(self.containers, **kwargs)
def set_containers(self, containers_to_set):
    """Replace the ``_containers`` list with the given containers.

    The list is emptied first; the actual insertion is delegated to
    ``add_containers``.
    """
    self.__dict__.update(_containers=[])
    self.add_containers(containers_to_set)
def add_containers(self, containers_to_add):
    """Append containers to ``_containers``, skipping entries already present.

    ``containers_to_add`` may be a single item or a list. Every item is
    resolved with ``_ident_for_whatever`` before it is compared and stored.
    """
    candidates = (
        containers_to_add
        if isinstance(containers_to_add, list)
        else [containers_to_add]
    )
    for candidate in candidates:
        resolved = self._ident_for_whatever(candidate)
        if resolved in self.__dict__["_containers"]:
            continue
        self.__dict__["_containers"].append(resolved)
def del_containers(self, containers_to_remove):
    """Remove containers from the ``_containers`` list.

    ``containers_to_remove`` may be a single item or a list. Every item is
    resolved with ``_ident_for_whatever``; a stored entry is dropped when
    both carry an ``identifier`` and the values are equal, or when both
    carry a ``permId`` and the values are equal.
    """
    if not isinstance(containers_to_remove, list):
        containers_to_remove = [containers_to_remove]

    def _same_entity(ident, item):
        # identifier is compared first, permId second
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ("identifier", "permId")
        )

    stored = self.__dict__["_containers"]
    for component in containers_to_remove:
        ident = self._ident_for_whatever(component)
        # Rebuild in place instead of popping while enumerating:
        # list.pop() takes no default argument (pop(i, None) raises
        # TypeError), and removing during iteration skips the element that
        # follows each removed one.
        stored[:] = [item for item in stored if not _same_entity(ident, item)]
def get_components(self, **kwargs):
    """Look up the contained Samples/DataSets (a.k.a. components).

    Samples and DataSets may contain other DataSets and Samples. The getter
    is selected by name on ``self._openbis``: ``"get_"`` followed by the
    lower-cased entity name. It is called with ``self.components`` and all
    keyword arguments are forwarded unchanged; its result is returned as-is.
    """
    getter_name = "get_" + self._entity.lower()
    fetch = getattr(self._openbis, getter_name)
    return fetch(self.components, **kwargs)
Swen Vermeul
committed
get_contained = get_components # Alias
def set_components(self, components_to_set):
    """Samples and DataSets may contain other DataSets and Samples. This function sets the
    contained Samples/DataSets (a.k.a. components)
    """
    # Start from an empty list so that "set" replaces rather than extends,
    # in line with set_containers / set_parents / set_children.
    self.__dict__["_components"] = []
    self.add_components(components_to_set)
Swen Vermeul
committed
set_contained = set_components # Alias
def add_components(self, components_to_add):
    """Samples and DataSets may contain other DataSets and Samples. This function adds
    additional Samples/DataSets to the current object.

    ``components_to_add`` may be a single item or a list. Every item is
    resolved with ``_ident_for_whatever`` and appended to ``_components``
    unless an equal entry is already stored.
    """
    if not isinstance(components_to_add, list):
        components_to_add = [components_to_add]
    for component in components_to_add:
        ident = self._ident_for_whatever(component)
        if ident not in self.__dict__["_components"]:
            self.__dict__["_components"].append(ident)
Swen Vermeul
committed
add_contained = add_components # Alias
def del_components(self, components_to_remove):
    """Samples and DataSets may contain other DataSets and Samples. This function removes
    additional Samples/DataSets from the current object.

    ``components_to_remove`` may be a single item or a list. Every item is
    resolved with ``_ident_for_whatever``; a stored entry is dropped when
    both carry an ``identifier`` and the values are equal, or when both
    carry a ``permId`` and the values are equal.
    """
    if not isinstance(components_to_remove, list):
        components_to_remove = [components_to_remove]

    def _same_entity(ident, item):
        # identifier is compared first, permId second
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ("identifier", "permId")
        )

    stored = self.__dict__["_components"]
    for component in components_to_remove:
        ident = self._ident_for_whatever(component)
        # Rebuild in place instead of popping while enumerating:
        # list.pop() takes no default argument (pop(i, None) raises
        # TypeError), and removing during iteration skips the element that
        # follows each removed one.
        stored[:] = [item for item in stored if not _same_entity(ident, item)]
Swen Vermeul
committed
del_contained = del_components # Alias
Swen Vermeul
committed
def get_parents(self, **kwargs):
    """Look up the current parents through the openBIS connection.

    The getter is selected by name on ``self._openbis``: ``"get_"`` followed
    by the lower-cased entity name. It is called with ``self.parents`` and
    all keyword arguments are forwarded unchanged; its result is returned
    as-is.
    """
    getter_name = "get_" + self._entity.lower()
    fetch = getattr(self._openbis, getter_name)
    return fetch(self.parents, **kwargs)
Swen Vermeul
committed
def set_parents(self, parents_to_set):
    """Replace the ``_parents`` list with the given parents.

    The list is emptied first; the actual insertion is delegated to
    ``add_parents``.
    """
    self.__dict__["_parents"] = []
    self.add_parents(parents_to_set)
def add_parents(self, parents_to_add):
    """Append parents to ``_parents``, skipping entries already present.

    ``parents_to_add`` may be a single item or a list. A ``_parents`` value
    of ``None`` is replaced by an empty list first. Every item is resolved
    with ``_ident_for_whatever`` before it is compared and stored.
    """
    if not isinstance(parents_to_add, list):
        parents_to_add = [parents_to_add]
    if self.__dict__["_parents"] is None:
        self.__dict__["_parents"] = []
    stored = self.__dict__["_parents"]
    for parent in parents_to_add:
        ident = self._ident_for_whatever(parent)
        if ident not in stored:
            stored.append(ident)
Swen Vermeul
committed
def del_parents(self, parents_to_remove):
    """Remove parents from the ``_parents`` list.

    ``parents_to_remove`` may be a single item or a list. A ``_parents``
    value of ``None`` is replaced by an empty list first. Every item is
    resolved with ``_ident_for_whatever``; a stored entry is dropped when
    both carry an ``identifier`` and the values are equal, or when both
    carry a ``permId`` and the values are equal.
    """
    if not isinstance(parents_to_remove, list):
        parents_to_remove = [parents_to_remove]
    if self.__dict__["_parents"] is None:
        self.__dict__["_parents"] = []

    def _same_entity(ident, item):
        # identifier is compared first, permId second
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ("identifier", "permId")
        )

    stored = self.__dict__["_parents"]
    for parent in parents_to_remove:
        ident = self._ident_for_whatever(parent)
        # Rebuild in place: popping while enumerating skips the element
        # that follows each removed one, so adjacent matches survived.
        stored[:] = [item for item in stored if not _same_entity(ident, item)]
Swen Vermeul
committed
Swen Vermeul
committed
def get_children(self, **kwargs):
    """Look up the current children through the openBIS connection.

    The getter is selected by name on ``self._openbis``: ``"get_"`` followed
    by the lower-cased entity name. It is called with ``self.children`` and
    all keyword arguments are forwarded unchanged; its result is returned
    as-is.
    """
    getter_name = "get_" + self._entity.lower()
    fetch = getattr(self._openbis, getter_name)
    return fetch(self.children, **kwargs)
Swen Vermeul
committed
Swen Vermeul
committed
def set_children(self, children_to_set):
    """Replace the ``_children`` list with the given children.

    The list is emptied first; the actual insertion is delegated to
    ``add_children``.
    """
    self.__dict__["_children"] = []
    self.add_children(children_to_set)
Swen Vermeul
committed
def add_children(self, children):
    """Add children to the ``_children`` list.

    ``children`` may be a single item or a list. A ``_children`` value of
    ``None`` is replaced by an empty list first. Every item is resolved with
    ``_ident_for_whatever`` and appended unless an equal entry is already
    stored, the same duplicate guard the other ``add_*`` methods apply.
    """
    if getattr(self, "_children") is None:
        self.__dict__["_children"] = []
    if not isinstance(children, list):
        children = [children]
    stored = self.__dict__["_children"]
    for child in children:
        ident = self._ident_for_whatever(child)
        if ident not in stored:
            stored.append(ident)
Swen Vermeul
committed
def del_children(self, children):
    """Remove children from the ``_children`` list.

    Does nothing when ``_children`` is ``None``. ``children`` may be a
    single item or a list. Every item is resolved with
    ``_ident_for_whatever``; a stored entry is dropped when both carry an
    ``identifier`` and the values are equal, or when both carry a ``permId``
    and the values are equal.
    """
    if getattr(self, "_children") is None:
        return
    if not isinstance(children, list):
        children = [children]

    def _same_entity(ident, item):
        # identifier is compared first, permId second
        return any(
            key in ident and key in item and ident[key] == item[key]
            for key in ("identifier", "permId")
        )

    stored = self.__dict__["_children"]
    for child in children:
        ident = self._ident_for_whatever(child)
        # Rebuild in place instead of popping while enumerating:
        # list.pop() takes no default argument (pop(i, None) raises
        # TypeError), and removing during iteration skips the element that
        # follows each removed one.
        stored[:] = [item for item in stored if not _same_entity(ident, item)]
Swen Vermeul
committed
@property
def tags(self):
    """List the ``code`` of every entry in ``_tags``; ``None`` if ``_tags`` is ``None``."""
    if getattr(self, "_tags") is None:
        return None
    return [entry["code"] for entry in self._tags]
Swen Vermeul
committed
if getattr(self, "_tags") is not None:
return self._openbis.get_tag([x["permId"] for x in self._tags])
Swen Vermeul
committed
def set_tags(self, tags):
Swen Vermeul
committed
Swen Vermeul
committed
if not isinstance(tags, list):
tags = [tags]
for tag in tags:
if isinstance(tag, str):
tag_obj = self._openbis.get_tag(tag)
tag_dict = {
"code": tag_obj.code,
"permId": tag_obj.permId,
}
else:
tag_dict = {
"code": tag["code"],
"permId": tag["permId"]["permId"],
if tag_dict not in self.__dict__["_tags"]:
self.__dict__["_tags"].append(tag_dict)
if not isinstance(tags, list):
tags = [tags]
for tag in tags:
for i, tag_dict in enumerate(self.__dict__["_tags"]):
if tag in tag_dict[i]["code"] or tag in tag_dict[i]["permId"]:
Swen Vermeul
committed
def set_users(self, userIds):
    """Append person-permId entries for the given user ids to ``_userIds``.

    ``userIds`` may be a single id or a list; ``None`` is a no-op. A
    ``_userIds`` value of ``None`` is replaced by an empty list first. Each
    id is stored as a dict with ``permId`` and the
    ``as.dto.person.id.PersonPermId`` ``@type``. Existing entries are kept.
    """
    if userIds is None:
        return
    if getattr(self, "_userIds") is None:
        self.__dict__["_userIds"] = []
    if not isinstance(userIds, list):
        userIds = [userIds]
    self.__dict__["_userIds"].extend(
        {"permId": userId, "@type": "as.dto.person.id.PersonPermId"}
        for userId in userIds
    )
Swen Vermeul
committed
def add_users(self, userIds):
    """Record an ``Add`` action for each given user id in ``_changed_users``.

    ``userIds`` may be a single id or a list; ``None`` is a no-op. A
    ``_changed_users`` value of ``None`` is replaced by an empty dict first.
    An earlier action recorded for the same id is overwritten.
    """
    if userIds is None:
        return
    if getattr(self, "_changed_users") is None:
        self.__dict__["_changed_users"] = {}
    if not isinstance(userIds, list):
        userIds = [userIds]
    self.__dict__["_changed_users"].update(
        (userId, {"action": "Add"}) for userId in userIds
    )
Swen Vermeul
committed
def del_users(self, userIds):
    """Record a ``Remove`` action for each given user id in ``_changed_users``.

    ``userIds`` may be a single id or a list; ``None`` is a no-op. A
    ``_changed_users`` value of ``None`` is replaced by an empty dict first.
    An earlier action recorded for the same id is overwritten.
    """
    if userIds is None:
        return
    if getattr(self, "_changed_users") is None:
        self.__dict__["_changed_users"] = {}
    if not isinstance(userIds, list):
        userIds = [userIds]
    self.__dict__["_changed_users"].update(
        (userId, {"action": "Remove"}) for userId in userIds
    )
del_members = del_users # Alias
Swen Vermeul
committed
def get_attachments(self):
Swen Vermeul
committed
return None
else:
return DataFrame(self._attachments)[
["fileName", "title", "description", "version"]
]
Swen Vermeul
committed
def add_attachment(self, fileName, title=None, description=None):
    """Create an ``Attachment`` for ``fileName`` and register it on this object.

    The result of ``att.get_data_short()`` is appended to ``_attachments``
    and the ``Attachment`` object itself to ``_new_attachments``. Either
    list is created first when its current value is ``None``.
    """
    att = Attachment(filename=fileName, title=title, description=description)

    if getattr(self, "_attachments") is None:
        self.__dict__["_attachments"] = []
    self._attachments.append(att.get_data_short())

    if getattr(self, "_new_attachments") is None:
        self.__dict__["_new_attachments"] = []
    self._new_attachments.append(att)
def download_attachments(self, destination_folder=None):
method = "get" + self.entity.lower().capitalize() + "s"
Swen Vermeul
committed
request = {
"method": method,
"params": [
self._openbis.token,
[self._permId],
{
**fetch_option[self.entity.lower()],
"attachments": fetch_option["attachmentsWithContent"],
},
Swen Vermeul
committed
}
resp = self._openbis._post_request(self._openbis.as_v3, request)
Swen Vermeul
committed
file_list = []
if destination_folder is None:
destination_folder = os.getcwd()
Swen Vermeul
committed
for attachment in attachments:
filename = (
pathlib.Path(destination_folder)
/ pathlib.Path(attachment["fileName"]).name
Swen Vermeul
committed
)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "wb") as att:
content = base64.b64decode(attachment["content"])
Swen Vermeul
committed
att.write(content)
file_list.append(str(filename))
Swen Vermeul
committed
return file_list
"""Return all attributes of an entity in a dict"""
for attr in self._defs["attrs"]:
if attr == "attachments":
continue
attrs[attr] = getattr(self, attr)
return attrs
Swen Vermeul
committed
def _repr_html_(self):
Swen Vermeul
committed
if val is None:
return string
return val
html = """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
"""
attrs = self._defs["attrs_new"] if self.is_new else self._defs["attrs"]
Swen Vermeul
committed
for attr in attrs: