Newer
Older
raise ValueError("login to openBIS failed")
else:
self.token = result
Swen Vermeul
committed
if save_token:
self.save_token()
return self.token
def create_permId(self):
    """Have the server generate a new permId.

    Sends a ``createPermIdStrings`` request asking for exactly one permId
    to the V3 endpoint (``self.as_v3``).

    :return: the first element of the server response.
    :raises ValueError: if the server answered with ``None`` or an empty
        result.
    """
    # Request just 1 permId
    request = {
        "method": "createPermIdStrings",
        "params": [self.token, 1],
    }
    resp = self._post_request(self.as_v3, request)
    # Treat an empty answer like a missing one, so that callers always get
    # the documented ValueError instead of an IndexError from resp[0].
    if not resp:
        raise ValueError("Could not create permId")
    return resp[0]
Chandrasekhar Ramakrishnan
committed
def get_datastores(self):
    """ Get a list of all available datastores.

    Usually there is only one, but in some cases there might be multiple
    servers. If you upload a file, you need to specify the datastore you
    want to use.

    :return: a DataFrame restricted to the columns ``code``,
        ``downloadUrl`` and ``hostUrl``.
    :raises ValueError: if the server response is ``None``.
    """
    request = {
        "method": "listDataStores",
        "params": [self.token],
    }
    # listDataStores is sent to the V1 endpoint, not the V3 one.
    resp = self._post_request(self.as_v1, request)
    if resp is None:
        raise ValueError("No datastore found!")
    return DataFrame(resp)[['code', 'downloadUrl', 'hostUrl']]
def new_person(self, userId, space=None):
    """ Creates a new (not yet existing) openBIS person.

    :param userId: the userId of the person to create.
    :param space: passed through unchanged to ``Person``.
    :return: a new ``Person`` object.
    :raises ValueError: if a person with this userId already exists.
    """
    try:
        self.get_person(userId=userId)
    except ValueError:
        # get_person raises ValueError when nothing was found, which is
        # exactly the case in which a new person may be created. Other
        # errors are no longer silently interpreted as "does not exist".
        return Person(self, userId=userId, space=space)
    raise ValueError(
        "There already exists a user with userId={}".format(userId)
    )
def new_group(self, code, description=None, users=None):
    """ Creates a new openBIS authorization group object.

    :param code: code of the group.
    :param description: optional description, passed through to ``Group``.
    :param users: optional users, passed through to ``Group``.
    :return: a new ``Group`` object.
    """
    return Group(self, code=code, description=description, users=users)
def get_group(self, groupId, only_data=False):
    """ Get an openBIS AuthorizationGroup. Returns a Group object.

    :param groupId: permId of the authorization group.
    :param only_data: if true, return the parsed response entry itself
        instead of wrapping it in a ``Group``.
    :raises ValueError: if the response contains no group.
    """
    ids = [{
        "@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
        "permId": groupId
    }]

    fetchopts = {}
    for option in ['roleAssignments', 'users', 'registrator']:
        fetchopts[option] = fetch_option[option]
    # Work on a copy of the shared 'users' options: adding the nested
    # 'space' entry in place would silently change fetch_option['users']
    # for every later caller.
    fetchopts['users'] = dict(fetch_option['users'])
    fetchopts['users']['space'] = fetch_option['space']

    request = {
        "method": "getAuthorizationGroups",
        "params": [
            self.token,
            ids,
            fetchopts
        ]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No group found!")

    # Only one id was requested, so only the first entry is of interest.
    for permid in resp:
        group = resp[permid]
        parse_jackson(group)
        if only_data:
            return group
        return Group(self, data=group)
Swen Vermeul
committed
def get_role_assignments(self, **search_args):
    """ Get the assigned roles for a given group, person or space

    Recognised keyword arguments:

    - ``space``: handed to ``_subcriteria_for_code``
    - ``person``: a userId string or an object with a ``userId`` attribute
    - ``group``: a code string or an object with a ``code`` attribute
    - ``user``: accepted, but currently not turned into a criterion
    - ``role`` / ``roleLevel``: not yet implemented

    :return: a «Things» object (entity ``role_assignment``).
    :raises ValueError: for unknown or unimplemented search arguments, or
        if no role assignment matched.
    """
    search_criteria = get_search_type_for_entity('roleAssignment', 'AND')
    allowed_search_attrs = ['role', 'roleLevel', 'user', 'group', 'person', 'space']

    sub_crit = []
    for attr in search_args:
        if attr not in allowed_search_attrs:
            raise ValueError("unknown search argument {}".format(attr))

        value = search_args[attr]
        if attr == 'space':
            sub_crit.append(
                _subcriteria_for_code(value, 'space')
            )
        elif attr == 'person':
            userId = value if isinstance(value, str) else value.userId
            sub_crit.append(
                _subcriteria_for_userId(userId)
            )
        elif attr == 'group':
            groupId = value if isinstance(value, str) else value.code
            sub_crit.append(
                _subcriteria_for_permid(groupId, 'AuthorizationGroup')
            )
        elif attr == 'role':
            # TODO
            raise ValueError("not yet implemented")
        elif attr == 'roleLevel':
            # TODO
            raise ValueError("not yet implemented")
        # 'user' falls through: allowed, but no criterion is generated.

    search_criteria['criteria'] = sub_crit

    fetchopts = {}
    for option in ['roleAssignments', 'space', 'project', 'user', 'authorizationGroup', 'registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchRoleAssignments",
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("No assigned roles found!")

    objects = resp['objects']
    parse_jackson(objects)
    roles = DataFrame(objects)
    roles['techId'] = roles['id'].map(extract_id)
    roles['user'] = roles['user'].map(extract_userId)
    roles['group'] = roles['authorizationGroup'].map(extract_code)
    roles['space'] = roles['space'].map(extract_code)

    p = Things(
        self, entity='role_assignment',
        df=roles[['techId', 'role', 'roleLevel', 'user', 'group', 'space', 'project']],
        identifier_name='techId'
    )
    return p
Swen Vermeul
committed
def get_role_assignment(self, techId, only_data=False):
    """ Fetches one assigned role by its techId.

    :param techId: the techId of the role assignment.
    :param only_data: if true, return the parsed response entry itself
        instead of wrapping it in a ``RoleAssignment``.
    :raises ValueError: if the response contains no role assignment.
    """
    fetchopts = {}
    for option in ['roleAssignments', 'space', 'project', 'user', 'authorizationGroup', 'registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "getRoleAssignments",
        "params": [
            self.token,
            [{
                "techId": techId,
                "@type": "as.dto.roleassignment.id.RoleAssignmentTechId"
            }],
            fetchopts
        ]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No assigned role found for techId={}".format(techId))

    # Only one techId was requested, so only the first entry is of interest.
    for key in resp:
        data = resp[key]
        parse_jackson(data)
        if only_data:
            return data
        return RoleAssignment(self, data=data)
def assign_role(self, role, **args):
    """ general method to assign a role to either
        - a person
        - a group
    The scope is either
        - the whole instance
        - a space
        - a project

    :param role: the role to assign, sent to the server unchanged.
    :param args: any of ``person``, ``group``, ``space``, ``project``; each
        value is either a permId string or an object with a ``permId``
        attribute. Other keyword arguments are ignored.
    :return: None
    """
    id_types = {
        'person': "as.dto.person.id.PersonPermId",
        'group': "as.dto.authorizationgroup.id.AuthorizationGroupPermId",
        'space': "as.dto.space.id.SpacePermId",
        'project': "as.dto.project.id.ProjectPermId",
    }
    # Everything that is not given stays None in the creation object.
    ids = dict.fromkeys(id_types)
    for arg, value in args.items():
        if arg in id_types:
            permId = value if isinstance(value, str) else value.permId
            ids[arg] = {
                "permId": permId,
                "@type": id_types[arg]
            }

    request = {
        "method": "createRoleAssignments",
        "params": [
            self.token,
            [
                {
                    "role": role,
                    "userId": ids['person'],
                    "authorizationGroupId": ids['group'],
                    "spaceId": ids['space'],
                    "projectId": ids['project'],
                    "@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
                }
            ]
        ]
    }
    self._post_request(self.as_v3, request)
    return
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
def get_groups(self, **search_args):
    """ Get openBIS AuthorizationGroups. Returns a «Things» object.

    Usage::

        groups = e.get_groups()
        groups[0]             # select first group
        groups['GROUP_NAME']  # select group with this code
        for group in groups:
            ...               # a Group object
        groups.df             # get a DataFrame object of the group list
        print(groups)         # print a nice ASCII table (eg. in IPython)
        groups                # HTML table (in a Jupyter notebook)

    :raises ValueError: if no group was found.
    """
    # NOTE(review): the search arguments 'code', 'description' and
    # 'registrator' are accepted but not yet translated into criteria,
    # so every call currently searches without restrictions.
    criteria = []
    search_criteria = get_search_type_for_entity('authorizationGroup')
    search_criteria['criteria'] = criteria

    # Copy before extending, so the shared module-level fetch options for
    # 'authorizationGroup' are not modified by this call.
    fetchopts = dict(fetch_option['authorizationGroup'])
    for option in ['roleAssignments', 'registrator', 'users']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchAuthorizationGroups",
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("No groups found!")

    objects = resp['objects']
    parse_jackson(objects)
    groups = DataFrame(objects)
    groups['permId'] = groups['permId'].map(extract_permid)
    groups['registrator'] = groups['registrator'].map(extract_person)
    groups['users'] = groups['users'].map(extract_userId)
    groups['registrationDate'] = groups['registrationDate'].map(format_timestamp)
    groups['modificationDate'] = groups['modificationDate'].map(format_timestamp)

    p = Things(
        self, entity='group',
        df=groups[['permId', 'code', 'description', 'users', 'registrator', 'registrationDate', 'modificationDate']],
        identifier_name='permId'
    )
    return p
def get_persons(self, **search_args):
    """ Get openBIS users

    :param search_args: handed unchanged to ``get_search_criteria`` for
        the entity ``person``.
    :return: a «Things» object (entity ``person``, identified by permId).
    :raises ValueError: if no person was found.
    """
    search_criteria = get_search_criteria('person', **search_args)

    fetchopts = {}
    for option in ['space']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchPersons",
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("No persons found!")

    objects = resp['objects']
    parse_jackson(objects)
    persons = DataFrame(objects)
    persons['permId'] = persons['permId'].map(extract_permid)
    persons['registrationDate'] = persons['registrationDate'].map(format_timestamp)
    persons['space'] = persons['space'].map(extract_nested_permid)

    p = Things(
        self, entity='person',
        df=persons[['permId', 'userId', 'firstName', 'lastName', 'email', 'space', 'registrationDate', 'active']],
        identifier_name='permId'
    )
    return p
def get_person(self, userId, only_data=False):
    """ Get a person (user)

    :param userId: sent to the server as the person's permId.
    :param only_data: if true, return the parsed response entry itself
        instead of wrapping it in a ``Person``.
    :raises ValueError: if the response contains no person.
    """
    ids = [{
        "@type": "as.dto.person.id.PersonPermId",
        "permId": userId
    }]

    fetchopts = {}
    for option in ['space', 'project']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "getPersons",
        "params": [
            self.token,
            ids,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No person found!")

    # Only one id was requested, so only the first entry is of interest.
    for permid in resp:
        person = resp[permid]
        parse_jackson(person)
        if only_data:
            return person
        return Person(self, data=person)
def get_spaces(self, code=None):
    """ Get a list of all available spaces. To create a sample or a
    dataset, you need to specify in which space it should live.

    :param code: handed to ``_subcriteria_for_code`` to build the search
        criteria.
    :return: a «Things» object for the entity ``space`` with the columns
        code, description, registrationDate and modificationDate, or
        ``None`` if the server response is ``None``.
    """
    search_criteria = _subcriteria_for_code(code, 'space')
    fetchopts = {}
    request = {
        "method": "searchSpaces",
        "params": [self.token,
                   search_criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is None:
        return None

    spaces = DataFrame(resp['objects'])
    spaces['registrationDate'] = spaces['registrationDate'].map(format_timestamp)
    spaces['modificationDate'] = spaces['modificationDate'].map(format_timestamp)
    sp = Things(
        self,
        'space',
        spaces[['code', 'description', 'registrationDate', 'modificationDate']]
    )
    return sp
def get_space(self, code, only_data=False):
    """ Returns a Space object for a given identifier.

    :param code: code of the space; it is converted to an upper-case
        string before it is sent as permId.
    :param only_data: if true, return the raw response entry instead of
        wrapping it in a ``Space``.
    :raises ValueError: if the response contains no space.
    """
    code = str(code).upper()
    fetchopts = {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"}
    for option in ['registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "getSpaces",
        "params": [
            self.token,
            [{
                "permId": code,
                "@type": "as.dto.space.id.SpacePermId"
            }],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No such space: %s" % code)

    # Only one permId was requested, so only the first entry is of interest.
    for permid in resp:
        if only_data:
            return resp[permid]
        return Space(self, data=resp[permid])
Chandrasekhar Ramakrishnan
committed
def get_samples(self, code=None, permId=None, space=None, project=None, experiment=None, type=None,
                withParents=None, withChildren=None, tags=None, props=None, **properties):
    """ Get a list of all samples for a given space/project/experiment (or any combination)

    All given filters are combined with AND.

    :param withParents: one identifier or a list of identifiers of parents.
    :param withChildren: one identifier or a list of identifiers of children.
    :param props: optional list of property names; each one is added as
        an extra (upper-cased) column to the result.
    :param properties: every additional keyword argument is treated as a
        property name / value pair to search for.
    :return: a «Things» object (entity ``sample``, identified by identifier).
    :raises ValueError: if no sample was found.
    """
    sub_criteria = []
    if space:
        sub_criteria.append(_gen_search_criteria({
            "space": "Space",
            "operator": "AND",
            "code": space
        }))
    if project:
        # A project is matched through the experiment the sample belongs to.
        exp_crit = _subcriteria_for_code(experiment, 'experiment')
        proj_crit = _subcriteria_for_code(project, 'project')
        exp_crit['criteria'] = []
        exp_crit['criteria'].append(proj_crit)
        sub_criteria.append(exp_crit)
    if experiment:
        sub_criteria.append(_subcriteria_for_code(experiment, 'experiment'))
    for prop, value in properties.items():
        sub_criteria.append(_subcriteria_for_properties(prop, value))
    if type:
        sub_criteria.append(_subcriteria_for_code(type, 'sample_type'))
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if permId:
        sub_criteria.append(_common_search("as.dto.common.search.PermIdSearchCriteria", permId))
    if withParents:
        if not isinstance(withParents, list):
            withParents = [withParents]
        for parent in withParents:
            sub_criteria.append(
                _gen_search_criteria({
                    "sample": "SampleParents",
                    "identifier": parent
                })
            )
    if withChildren:
        if not isinstance(withChildren, list):
            withChildren = [withChildren]
        for child in withChildren:
            sub_criteria.append(
                _gen_search_criteria({
                    "sample": "SampleChildren",
                    "identifier": child
                })
            )

    criteria = {
        "criteria": sub_criteria,
        "@type": "as.dto.sample.search.SampleSearchCriteria",
        "operator": "AND"
    }

    # build the various fetch options; copy first so the shared
    # module-level entry for 'sample' is not extended as a side effect.
    fetchopts = dict(fetch_option['sample'])
    for option in ['tags', 'properties', 'registrator', 'modifier', 'experiment']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchSamples",
        "params": [self.token,
                   criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("no samples found!")

    objects = resp['objects']
    parse_jackson(objects)

    samples = DataFrame(objects)
    samples['registrationDate'] = samples['registrationDate'].map(format_timestamp)
    samples['modificationDate'] = samples['modificationDate'].map(format_timestamp)
    samples['registrator'] = samples['registrator'].map(extract_person)
    samples['modifier'] = samples['modifier'].map(extract_person)
    samples['identifier'] = samples['identifier'].map(extract_identifier)
    samples['permId'] = samples['permId'].map(extract_permid)
    samples['experiment'] = samples['experiment'].map(extract_nested_identifier)
    samples['sample_type'] = samples['type'].map(extract_nested_permid)

    attrs = ['identifier', 'permId', 'experiment', 'sample_type',
             'registrator', 'registrationDate', 'modifier', 'modificationDate']

    # props defaults to None, so it must not be iterated unconditionally.
    if props is not None:
        for prop in props:
            column = prop.upper()
            samples[column] = samples['properties'].map(lambda x, key=column: x.get(key, ''))
            attrs.append(column)

    ss = samples[attrs]
    return Things(self, 'sample', ss, 'identifier')
Swen Vermeul
committed
get_objects = get_samples  # Alias: get_objects is a second name for get_samples
def get_experiments(self, code=None, type=None, space=None, project=None, tags=None, is_finished=None, props=None, **properties):
    """ Searches for all experiment which match the search criteria. Returns a
    «Things» object which can be used in many different situations.

    Usage::

        experiments = get_experiments(project='PROJECT_NAME', props=['NAME','FINISHED_FLAG'])
        experiments[0]  # returns first experiment
        experiments['/MATERIALS/REAGENTS/ANTIBODY_COLLECTION']
        for experiment in experiments:
            # handle every experiment
            ...
        experiments.df      # returns DataFrame object of the experiment list
        print(experiments)  # prints a nice ASCII table

    All given filters are combined with AND. Additional keyword arguments
    are treated as property name / value pairs to search for.

    :raises ValueError: if no experiment was found.
    """
    sub_criteria = []
    if space:
        sub_criteria.append(_subcriteria_for_code(space, 'space'))
    if project:
        sub_criteria.append(_subcriteria_for_code(project, 'project'))
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if type:
        sub_criteria.append(_subcriteria_for_type(type, 'Experiment'))
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if is_finished is not None:
        sub_criteria.append(_subcriteria_for_is_finished(is_finished))
    for prop, value in properties.items():
        sub_criteria.append(_subcriteria_for_properties(prop, value))

    search_criteria = get_search_type_for_entity('experiment')
    search_criteria['criteria'] = sub_criteria
    search_criteria['operator'] = 'AND'

    # Copy before extending, so the shared module-level fetch options for
    # 'experiment' are not modified by this call.
    fetchopts = dict(fetch_option['experiment'])
    for option in ['tags', 'properties', 'registrator', 'modifier', 'project']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchExperiments",
        "params": [
            self.token,
            search_criteria,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("No experiments found!")

    objects = resp['objects']
    parse_jackson(objects)

    experiments = DataFrame(objects)
    experiments['registrationDate'] = experiments['registrationDate'].map(format_timestamp)
    experiments['modificationDate'] = experiments['modificationDate'].map(format_timestamp)
    experiments['project'] = experiments['project'].map(extract_code)
    experiments['registrator'] = experiments['registrator'].map(extract_person)
    experiments['modifier'] = experiments['modifier'].map(extract_person)
    experiments['identifier'] = experiments['identifier'].map(extract_identifier)
    experiments['permId'] = experiments['permId'].map(extract_permid)
    experiments['type'] = experiments['type'].map(extract_code)

    attrs = ['identifier', 'permId', 'project', 'type',
             'registrator', 'registrationDate', 'modifier', 'modificationDate']

    if props is not None:
        for prop in props:
            column = prop.upper()
            experiments[column] = experiments['properties'].map(lambda x, key=column: x.get(key, ''))
            attrs.append(column)

    exps = experiments[attrs]
    return Things(self, 'experiment', exps, 'identifier')
def get_datasets(self,
                 code=None, type=None, withParents=None, withChildren=None, status=None,
                 sample=None, experiment=None, project=None, tags=None, props=None, **properties
                 ):
    """ Searches for all datasets which match the search criteria.

    All given filters are combined with AND. Additional keyword arguments
    are treated as property name / value pairs to search for.

    :param props: optional list of property names; each one is added as
        an extra (upper-cased) column to the result.
    :return: a «Things» object (entity ``dataset``, identified by permId).
    :raises ValueError: if no dataset was found.
    """
    sub_criteria = []
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if type:
        sub_criteria.append(_subcriteria_for_type(type, 'DataSet'))
    if withParents:
        sub_criteria.append(_subcriteria_for_permid(withParents, 'DataSet', 'Parents'))
    if withChildren:
        sub_criteria.append(_subcriteria_for_permid(withChildren, 'DataSet', 'Children'))
    if sample:
        sub_criteria.append(_subcriteria_for_code(sample, 'Sample'))
    if experiment:
        sub_criteria.append(_subcriteria_for_code(experiment, 'Experiment'))
    if project:
        # A project is matched through the experiment the dataset belongs to.
        exp_crit = _subcriteria_for_code(experiment, 'Experiment')
        proj_crit = _subcriteria_for_code(project, 'Project')
        exp_crit['criteria'] = []
        exp_crit['criteria'].append(proj_crit)
        sub_criteria.append(exp_crit)
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if status:
        sub_criteria.append(_subcriteria_for_status(status))
    for prop, value in properties.items():
        sub_criteria.append(_subcriteria_for_properties(prop, value))

    search_criteria = get_search_type_for_entity('dataset')
    search_criteria['criteria'] = sub_criteria
    search_criteria['operator'] = 'AND'

    fetchopts = {
        "containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
        "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"}
    }
    for option in ['tags', 'properties', 'sample', 'experiment', 'physicalData']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchDataSets",
        "params": [self.token,
                   search_criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)
    objects = resp['objects']
    # Same contract as get_samples / get_experiments: an empty result is
    # reported explicitly instead of failing later with a KeyError on a
    # column of an empty DataFrame.
    if len(objects) == 0:
        raise ValueError("no datasets found!")
    parse_jackson(objects)

    datasets = DataFrame(objects)
    datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
    datasets['modificationDate'] = datasets['modificationDate'].map(format_timestamp)
    datasets['experiment'] = datasets['experiment'].map(extract_nested_identifier)
    datasets['sample'] = datasets['sample'].map(extract_nested_identifier)
    datasets['type'] = datasets['type'].map(extract_code)
    datasets['permId'] = datasets['code']
    datasets['location'] = datasets['physicalData'].map(lambda x: x.get('location') if x else '')

    attrs = ['permId', 'properties', 'type', 'experiment', 'sample', 'registrationDate', 'modificationDate',
             'location']

    if props is not None:
        for prop in props:
            column = prop.upper()
            datasets[column] = datasets['properties'].map(lambda x, key=column: x.get(key, ''))
            attrs.append(column)

    return Things(self, 'dataset', datasets[attrs], 'permId')
def get_experiment(self, expId, withAttachments=False, only_data=False):
    """ Returns an experiment object for a given identifier (expId).

    :param expId: handed to ``search_request_for_identifier`` to build the
        id object for the request.
    :param withAttachments: if true, the attachments are fetched with the
        ``attachmentsWithContent`` fetch options.
    :param only_data: if true, return the raw response entry instead of
        wrapping it in an ``Experiment``.
    :raises ValueError: if the response contains no experiment.
    """
    fetchopts = {
        "@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
        "type": {
            "@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
        },
    }

    search_request = search_request_for_identifier(expId, 'experiment')
    for option in ['tags', 'properties', 'attachments', 'project', 'samples']:
        fetchopts[option] = fetch_option[option]

    if withAttachments:
        fetchopts['attachments'] = fetch_option['attachmentsWithContent']

    request = {
        "method": "getExperiments",
        "params": [
            self.token,
            [search_request],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No such experiment: %s" % expId)

    # Only one id was requested, so only the first entry is of interest.
    for key in resp:
        data = resp[key]
        if only_data:
            return data
        # Look the type up through the key the server actually returned,
        # not through the caller's expId.
        return Experiment(
            openbis_obj=self,
            type=self.get_experiment_type(data["type"]["code"]),
            data=data
        )
def new_experiment(self, type, code, project, props=None, **kwargs):
    """ Creates a new experiment of a given experiment type.

    :param type: name of the experiment type; resolved through
        ``self.get_experiment_type``.
    :param code: code of the new experiment.
    :param project: the project the experiment is created in.
    :param props: optional properties, passed through to ``Experiment``.
    :param kwargs: passed through to ``Experiment``.
    :return: a new ``Experiment`` object.
    """
    # NOTE(review): `project` is a required argument but was not handed on
    # to Experiment; it is now forwarded as a keyword — confirm against the
    # Experiment constructor.
    return Experiment(
        openbis_obj=self,
        type=self.get_experiment_type(type),
        project=project,
        data=None,
        props=props,
        code=code,
        **kwargs
    )
def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
    """ Sends an ``updateExperiments`` request for one experiment.

    Only the arguments that are not ``None`` are included in the update.

    :param experimentId: permId of the experiment to update.
    :param properties: value for the ``properties`` entry of the update.
    :param tagIds: value for the ``tagIds`` entry of the update.
    :param attachments: value for the ``attachments`` entry of the update.
    """
    params = {
        "experimentId": {
            "permId": experimentId,
            "@type": "as.dto.experiment.id.ExperimentPermId"
        },
        "@type": "as.dto.experiment.update.ExperimentUpdate"
    }
    optional = (
        ("properties", properties),
        ("tagIds", tagIds),
        ("attachments", attachments),
    )
    for key, value in optional:
        if value is not None:
            params[key] = value

    request = {
        "method": "updateExperiments",
        "params": [
            self.token,
            [params]
        ]
    }
    self._post_request(self.as_v3, request)
def create_sample(self, space_ident, code, type,
                  project_ident=None, experiment_ident=None, properties=None, attachments=None, tags=None):
    """ Sends a ``createSamples`` request for one sample and returns the
    created sample, fetched again via ``self.get_sample``.

    :param space_ident: NOTE(review): accepted but not part of the
        request that is sent — confirm whether a spaceId should be set.
    :param code: code of the new sample.
    :param type: converted with ``_create_typeId``.
    :param project_ident: converted with ``_create_projectId``.
    :param experiment_ident: converted with ``_create_experimentId``.
    :param properties: properties of the sample; defaults to an empty dict.
    :param attachments: sent unchanged as ``attachments``.
    :param tags: converted with ``_create_tagIds``.
    """
    tagIds = _create_tagIds(tags)
    typeId = _create_typeId(type)
    projectId = _create_projectId(project_ident)
    experimentId = _create_experimentId(experiment_ident)

    if properties is None:
        properties = {}

    request = {
        "method": "createSamples",
        "params": [
            self.token,
            [
                {
                    "properties": properties,
                    "code": code,
                    "typeId": typeId,
                    "projectId": projectId,
                    "experimentId": experimentId,
                    "tagIds": tagIds,
                    "attachments": attachments,
                    "@type": "as.dto.sample.create.SampleCreation",
                }
            ]
        ],
    }
    resp = self._post_request(self.as_v3, request)
    return self.get_sample(resp[0]['permId'])
create_object = create_sample  # Alias: create_object is a second name for create_sample
Chandrasekhar Ramakrishnan
committed
def create_external_data_management_system(self, code, label, address, address_type='FILE_SYSTEM'):
    """Create an external DMS.

    :param code: An openBIS code for the external DMS.
    :param label: A human-readable label.
    :param address: The address for accessing the external DMS. E.g., a URL.
    :param address_type: One of OPENBIS, URL, or FILE_SYSTEM
    :return: the result of ``self.get_external_data_management_system``
        for the permId of the first entry in the server response.
    """
    creation = {
        "code": code,
        "label": label,
        "addressType": address_type,
        "address": address,
        "@type": "as.dto.externaldms.create.ExternalDmsCreation",
    }
    request = {
        "method": "createExternalDataManagementSystems",
        "params": [
            self.token,
            [creation]
        ],
    }
    resp = self._post_request(self.as_v3, request)
    return self.get_external_data_management_system(resp[0]['permId'])
def update_sample(self, sampleId, space=None, project=None, experiment=None,
                  parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
    """ Sends an ``updateSamples`` request for one sample.

    Only the arguments that are not ``None`` are included in the update.

    :param sampleId: permId of the sample to update.
    :param space: sent as ``spaceId``.
    :param project: sent as ``projectId``.
    :param properties: sent as ``properties``.
    :param tagIds: sent as ``tagIds``.
    :param attachments: sent as ``attachments``.

    NOTE(review): ``experiment``, ``parents``, ``children`` and
    ``components`` are accepted but currently not part of the request.
    """
    params = {
        "sampleId": {
            "permId": sampleId,
            "@type": "as.dto.sample.id.SamplePermId"
        },
        "@type": "as.dto.sample.update.SampleUpdate"
    }
    optional = (
        ("spaceId", space),
        ("projectId", project),
        ("properties", properties),
        ("tagIds", tagIds),
        ("attachments", attachments),
    )
    for key, value in optional:
        if value is not None:
            params[key] = value

    request = {
        "method": "updateSamples",
        "params": [
            self.token,
            [params]
        ]
    }
    self._post_request(self.as_v3, request)
update_object = update_sample  # Alias: update_object is a second name for update_sample
def delete_entity(self, entity, id, reason, id_name='PermId'):
    """Deletes Spaces, Projects, Experiments, Samples and DataSets

    The request method and the ``@type`` strings are derived from
    ``entity``; e.g. ``entity='Sample'`` results in ``deleteSamples`` and
    ``as.dto.sample.id.SamplePermId``.

    :param entity: name of the entity, e.g. ``Sample``.
    :param id: the id of the object to delete.
    :param reason: the reason for the deletion, sent with the deletion
        options.
    :param id_name: kind of id; used (with its first letter upper-cased)
        for the ``@type`` and, as given, as the key of the id value.
    """
    package = entity.lower()
    entity_type = "as.dto.{}.id.{}{}{}".format(
        package, entity,
        id_name[0].upper(), id_name[1:]
    )
    deletion_options = {
        "reason": reason,
        "@type": "as.dto.{}.delete.{}DeletionOptions".format(package, entity)
    }
    request = {
        "method": "delete{}s".format(entity),
        "params": [
            self.token,
            [
                {
                    id_name: id,
                    "@type": entity_type
                }
            ],
            deletion_options
        ]
    }
    self._post_request(self.as_v3, request)
def get_deletions(self):
    """ Sends a ``searchDeletions`` request (with empty search criteria)
    and returns all deleted objects found in the response.

    :return: a DataFrame built from everything ``extract_deletion``
        yields for the entries of the response.
    """
    request = {
        "method": "searchDeletions",
        "params": [
            self.token,
            {},
            {
                "deletedObjects": {
                    "@type": "as.dto.deletion.fetchoptions.DeletedObjectFetchOptions"
                }
            }
        ]
    }
    resp = self._post_request(self.as_v3, request)
    objects = resp['objects']
    parse_jackson(objects)

    new_objs = []
    for value in objects:
        # extend() instead of append(*...): a deletion may contain more
        # than one deleted object, and list.append accepts exactly one
        # argument. An empty result simply adds nothing.
        new_objs.extend(extract_deletion(value))

    return DataFrame(new_objs)
def new_project(self, space, code, description=None, **kwargs):
    """ Creates a new ``Project`` object (without existing data).

    :param space: the space of the project, passed through to ``Project``.
    :param code: the code of the project.
    :param description: optional description.
    :param kwargs: passed through to ``Project``.
    """
    return Project(self, None, space=space, code=code, description=description, **kwargs)
def _gen_fetchoptions(self, options):
fo = {}
for option in options:
fo[option] = fetch_option[option]
return fo
def get_project(self, projectId, only_data=False):
    """ Returns a Project for a given identifier, permId or code.

    If ``projectId`` is recognised by ``is_identifier`` or ``is_permid``,
    the project is fetched directly (``getProjects``); otherwise it is
    treated as a project code and searched for (``searchProjects``), in
    which case the first hit is used.

    :param only_data: if true, return the raw response entry instead of
        wrapping it in a ``Project``.
    :raises ValueError: if the search by code found nothing.
    """
    options = ['space', 'registrator', 'modifier', 'attachments']

    if is_identifier(projectId) or is_permid(projectId):
        request = self._create_get_request(
            'getProjects', 'project', projectId, options
        )
        resp = self._post_request(self.as_v3, request)
        if only_data:
            return resp[projectId]
        return Project(self, resp[projectId])

    search_criteria = _gen_search_criteria({
        'project': 'Project',
        'operator': 'AND',
        'code': projectId
    })
    fo = self._gen_fetchoptions(options)
    request = {
        "method": "searchProjects",
        "params": [self.token, search_criteria, fo]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("No such project: %s" % projectId)
    if only_data:
        return resp['objects'][0]
    return Project(self, resp['objects'][0])
def get_projects(self, space=None, code=None):
""" Get a list of all available projects (DataFrame object).
"""
sub_criteria = []
if space:
sub_criteria.append(_subcriteria_for_code(space, 'space'))
if code:
sub_criteria.append(_criteria_for_code(code))
criteria = {
"criteria": sub_criteria,
"@type": "as.dto.project.search.ProjectSearchCriteria",
"operator": "AND"
}
fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
for option in ['registrator', 'modifier', 'leader']:
request = {
"method": "searchProjects",
"params": [self.token,
criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
objects = resp['objects']
if len(objects) == 0:
raise ValueError("No projects found!")