Newer
Older
def get_role_assignments(self, start_with=None, count=None, **search_args):
""" Get the assigned roles for a given group, person or space
"""
search_criteria = get_search_type_for_entity('roleAssignment', 'AND')
Swen Vermeul
committed
allowed_search_attrs = ['role', 'roleLevel', 'user', 'group', 'person', 'space']
sub_crit = []
for attr in search_args:
if attr in allowed_search_attrs:
if attr == 'space':
sub_crit.append(
_subcriteria_for_code(search_args[attr], 'space')
)
elif attr in ['user','person']:
userId = ''
if isinstance(search_args[attr], str):
userId = search_args[attr]
else:
userId = search_args[attr].userId
sub_crit.append(
_subcriteria_for_userId(userId)
)
elif attr == 'group':
groupId = ''
if isinstance(search_args[attr], str):
groupId = search_args[attr]
else:
groupId = search_args[attr].code
_subcriteria_for_permid(groupId, 'AuthorizationGroup')
)
elif attr == 'role':
# TODO
raise ValueError("not yet implemented")
elif attr == 'roleLevel':
# TODO
raise ValueError("not yet implemented")
else:
pass
else:
Swen Vermeul
committed
raise ValueError("unknown search argument {}".format(attr))
search_criteria['criteria'] = sub_crit
fetchopts = fetch_option['roleAssignments']
fetchopts['from'] = start_with
fetchopts['count'] = count
for option in ['space', 'project', 'user', 'authorizationGroup','registrator']:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchRoleAssignments",
"params": [
self.token,
search_criteria,
fetchopts
]
}
Swen Vermeul
committed
attrs=['techId', 'role', 'roleLevel', 'user', 'group', 'space', 'project']
resp = self._post_request(self.as_v3, request)
if len(resp['objects']) == 0:
Swen Vermeul
committed
roles = DataFrame(columns=attrs)
Swen Vermeul
committed
else:
objects = resp['objects']
parse_jackson(objects)
roles = DataFrame(objects)
roles['techId'] = roles['id'].map(extract_id)
roles['user'] = roles['user'].map(extract_userId)
roles['group'] = roles['authorizationGroup'].map(extract_code)
roles['space'] = roles['space'].map(extract_code)
roles['project'] = roles['project'].map(extract_code)
return Things(
openbis_obj = self,
entity='role_assignment',
Swen Vermeul
committed
df=roles[attrs],
identifier_name='techId',
start_with = start_with,
count = count,
totalCount = resp.get('totalCount'),
Swen Vermeul
committed
def get_role_assignment(self, techId, only_data=False):
""" Fetches one assigned role by its techId.
"""
fetchopts = {
"@type": "as.dto.roleassignment.fetchoptions.RoleAssignmentFetchOptions"
}
for option in ['roleAssignments', 'space', 'project', 'user', 'authorizationGroup','registrator']:
fetchopts[option] = fetch_option[option]
request = {
"method": "getRoleAssignments",
"params": [
self.token,
[{
"@type": "as.dto.roleassignment.id.RoleAssignmentTechId"
}],
fetchopts
]
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No assigned role found for techId={}".format(techId))
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
for id in resp:
data = resp[id]
parse_jackson(data)
if only_data:
return data
else:
return RoleAssignment(self, data=data)
def assign_role(self, role, **args):
""" general method to assign a role to either
- a person
- a group
The scope is either
- the whole instance
- a space
- a project
"""
userId = None
groupId = None
spaceId = None
projectId = None
for arg in args:
if arg in ['person', 'group', 'space', 'project']:
permId = args[arg] if isinstance(args[arg],str) else args[arg].permId
if arg == 'person':
userId = {
"permId": permId,
"@type": "as.dto.person.id.PersonPermId"
}
elif arg == 'group':
groupId = {
"permId": permId,
"@type": "as.dto.authorizationgroup.id.AuthorizationGroupPermId"
}
elif arg == 'space':
spaceId = {
"permId": permId,
"@type": "as.dto.space.id.SpacePermId"
}
elif arg == 'project':
projectId = {
"permId": permId,
"@type": "as.dto.project.id.ProjectPermId"
}
request = {
"method": "createRoleAssignments",
"params": [
[
{
"role": role,
"userId": userId,
"authorizationGroupId": groupId,
"spaceId": spaceId,
"projectId": projectId,
"@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
}
]
]
}
resp = self._post_request(self.as_v3, request)
return
def get_groups(self, start_with=None, count=None, **search_args):
""" Get openBIS AuthorizationGroups. Returns a «Things» object.
Usage::
groups = e.get.groups()
groups[0] # select first group
groups['GROUP_NAME'] # select group with this code
for group in groups:
... # a Group object
groups.df # get a DataFrame object of the group list
print(groups) # print a nice ASCII table (eg. in IPython)
groups # HTML table (in a Jupyter notebook)
"""
criteria = []
for search_arg in ['code']:
# unfortunately, there aren't many search possibilities yet...
if search_arg in search_args:
if search_arg == 'code':
criteria.append(_criteria_for_code(search_args[search_arg]))
search_criteria = get_search_type_for_entity('authorizationGroup')
search_criteria['criteria'] = criteria
fetchopts = fetch_option['authorizationGroup']
fetchopts['from'] = start_with
fetchopts['count'] = count
for option in ['roleAssignments', 'registrator', 'users']:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchAuthorizationGroups",
"params": [
self.token,
fetchopts
],
}
resp = self._post_request(self.as_v3, request)
attrs = ['permId', 'code', 'description', 'users', 'registrator', 'registrationDate', 'modificationDate']
if len(resp['objects']) == 0:
groups = DataFrame(columns=attrs)
else:
objects = resp['objects']
parse_jackson(objects)
groups = DataFrame(objects)
groups['permId'] = groups['permId'].map(extract_permid)
groups['registrator'] = groups['registrator'].map(extract_person)
groups['users'] = groups['users'].map(extract_userId)
groups['registrationDate'] = groups['registrationDate'].map(format_timestamp)
groups['modificationDate'] = groups['modificationDate'].map(format_timestamp)
return Things(
openbis_obj = self,
entity='group',
df=groups[attrs],
identifier_name='permId',
start_with = start_with,
count = count,
totalCount = resp.get('totalCount'),
def get_persons(self, start_with=None, count=None, **search_args):
""" Get openBIS users
"""
search_criteria = get_search_criteria('person', **search_args)
fetchopts = fetch_option['person']
fetchopts['from'] = start_with
fetchopts['count'] = count
for option in ['space']:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchPersons",
"params": [
self.token,
search_criteria,
fetchopts
],
}
resp = self._post_request(self.as_v3, request)
attrs = ['permId', 'userId', 'firstName', 'lastName', 'email', 'space', 'registrationDate', 'active']
if len(resp['objects']) == 0:
persons = DataFrame(columns=attrs)
else:
objects = resp['objects']
parse_jackson(objects)
persons = DataFrame(resp['objects'])
persons['permId'] = persons['permId'].map(extract_permid)
persons['registrationDate'] = persons['registrationDate'].map(format_timestamp)
persons['space'] = persons['space'].map(extract_nested_permid)
return Things(
openbis_obj = self,
entity='person',
df=persons[attrs],
identifier_name='permId',
start_with = start_with,
count = count,
totalCount = resp.get('totalCount'),
get_users = get_persons # Alias
def get_person(self, userId, only_data=False):
""" Get a person (user)
"""
ids = [{
"@type": "as.dto.person.id.PersonPermId",
"permId": userId
}]
fetchopts = {
"@type": "as.dto.person.fetchoptions.PersonFetchOptions"
}
Swen Vermeul
committed
for option in ['space', 'project']:
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
fetchopts[option] = fetch_option[option]
request = {
"method": "getPersons",
"params": [
self.token,
ids,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No person found!")
for permid in resp:
person = resp[permid]
parse_jackson(person)
if only_data:
return person
else:
return Person(self, data=person)
get_user = get_person # Alias
def get_spaces(self, code=None, start_with=None, count=None):
""" Get a list of all available spaces (DataFrame object). To create a sample or a
dataset, you need to specify in which space it should live.
"""
search_criteria = _subcriteria_for_code(code, 'space')
fetchopts = fetch_option['space']
fetchopts['from'] = start_with
fetchopts['count'] = count
"params": [self.token,
search_criteria,
fetchopts,
}
resp = self._post_request(self.as_v3, request)
attrs = ['code', 'description', 'registrationDate', 'modificationDate']
if len(resp['objects']) == 0:
spaces = DataFrame(columns=attrs)
else:
spaces['registrationDate'] = spaces['registrationDate'].map(format_timestamp)
spaces['modificationDate'] = spaces['modificationDate'].map(format_timestamp)
return Things(
openbis_obj = self,
entity = 'space',
df = spaces[attrs],
start_with = start_with,
count = count,
totalCount = resp.get('totalCount'),
def get_space(self, code, only_data=False):
""" Returns a Space object for a given identifier.
code = str(code).upper()
fetchopts = {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"}
for option in ['registrator']:
fetchopts[option] = fetch_option[option]
"method": "getSpaces",
"params": [
self.token,
[{
"permId": code,
"@type": "as.dto.space.id.SpacePermId"
}],
fetchopts
resp = self._post_request(self.as_v3, request)
raise ValueError("No such space: %s" % code)
for permid in resp:
if only_data:
return resp[permid]
else:
return Space(self, data=resp[permid])
Chandrasekhar Ramakrishnan
committed
Swen Vermeul
committed
self, identifier=None, code=None, permId=None,
space=None, project=None, experiment=None, type=None,
start_with=None, count=None,
withParents=None, withChildren=None, tags=None, props=None, **properties
):
""" Get a list of all samples for a given space/project/experiment (or any combination)
"""
# v3 API does not offer a search for identifiers. We need to do a combined search instead:
# space && code or
# space && project && code
if identifier:
identifier = identifier.lstrip('/')
elements = identifier.split('/')
if len(elements) == 2:
space = elements[0]
code = elements[1]
elif len(elements) == 3:
space = elements[0]
code = elements[2]
else:
raise ValueError("{} is not a valid sample identifier.".format(identifier))
sub_criteria.append(_subcriteria_for_code(space, 'space'))
proj_crit = _subcriteria_for_code(project, 'project')
sub_criteria.append(proj_crit)
sub_criteria.append(_subcriteria_for_code(experiment, 'experiment'))
if properties is not None:
for prop in properties:
sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
if type:
sub_criteria.append(_subcriteria_for_code(type, 'sample_type'))
if tags:
sub_criteria.append(_subcriteria_for_tags(tags))
if code:
sub_criteria.append(_criteria_for_code(code))
if permId:
sub_criteria.append(_common_search("as.dto.common.search.PermIdSearchCriteria", permId))
if withParents:
if not isinstance(withParents, list):
withParents = [withParents]
for parent in withParents:
sub_criteria.append(
_gen_search_criteria({
"sample": "SampleParents",
"identifier": parent
})
)
if withChildren:
if not isinstance(withChildren, list):
withChildren = [withChildren]
for child in withChildren:
sub_criteria.append(
_gen_search_criteria({
"sample": "SampleChildren",
"identifier": child
})
)
criteria = {
"criteria": sub_criteria,
"@type": "as.dto.sample.search.SampleSearchCriteria",
"operator": "AND"
}
# build the various fetch options
fetchopts = fetch_option['sample']
fetchopts['from'] = start_with
fetchopts['count'] = count
for option in ['tags', 'properties', 'registrator', 'modifier', 'experiment']:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchSamples",
"params": [self.token,
criteria,
fetchopts,
],
Swen Vermeul
committed
resp = self._post_request(self.as_v3, request)
return self._sample_list_for_response(
response=resp['objects'],
props=props,
start_with=start_with,
count=count,
totalCount=resp['totalCount'],
Swen Vermeul
committed
get_objects = get_samples # Alias
Swen Vermeul
committed
self, code=None, permId=None, type=None, space=None, project=None,
start_with=None, count=None,
tags=None, is_finished=None, props=None, **properties
):
""" Searches for all experiment which match the search criteria. Returns a
«Things» object which can be used in many different situations.
Usage::
experiments = get_experiments(project='PROJECT_NAME', props=['NAME','FINISHED_FLAG'])
experiments[0] # returns first experiment
experiments['/MATERIALS/REAGENTS/ANTIBODY_COLLECTION']
for experiment in experiment:
# handle every experiment
...
experiments.df # returns DataFrame object of the experiment list
print(experiments) # prints a nice ASCII table
"""
sub_criteria = []
if space:
sub_criteria.append(_subcriteria_for_code(space, 'space'))
sub_criteria.append(_subcriteria_for_code(project, 'project'))
sub_criteria.append(_criteria_for_code(code))
if permId:
sub_criteria.append(_common_search("as.dto.common.search.PermIdSearchCriteria", permId))
if type:
sub_criteria.append(_subcriteria_for_type(type, 'Experiment'))
if tags:
sub_criteria.append(_subcriteria_for_tags(tags))
if is_finished is not None:
sub_criteria.append(_subcriteria_for_is_finished(is_finished))
if properties is not None:
for prop in properties:
sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
search_criteria = get_search_type_for_entity('experiment')
search_criteria['criteria'] = sub_criteria
search_criteria['operator'] = 'AND'
fetchopts = fetch_option['experiment']
fetchopts['from'] = start_with
fetchopts['count'] = count
for option in ['tags', 'properties', 'registrator', 'modifier', 'project']:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchExperiments",
"params": [
self.token,
search_criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
attrs = ['identifier', 'permId', 'project', 'type',
Swen Vermeul
committed
'registrator', 'registrationDate', 'modifier', 'modificationDate']
if len(resp['objects']) == 0:
experiments = DataFrame(columns=attrs)
else:
objects = resp['objects']
parse_jackson(objects)
experiments = DataFrame(objects)
experiments['registrationDate'] = experiments['registrationDate'].map(format_timestamp)
experiments['modificationDate'] = experiments['modificationDate'].map(format_timestamp)
experiments['project'] = experiments['project'].map(extract_code)
experiments['registrator'] = experiments['registrator'].map(extract_person)
experiments['modifier'] = experiments['modifier'].map(extract_person)
experiments['identifier'] = experiments['identifier'].map(extract_identifier)
experiments['permId'] = experiments['permId'].map(extract_permid)
experiments['type'] = experiments['type'].map(extract_code)
Swen Vermeul
committed
if props is not None:
for prop in props:
experiments[prop.upper()] = experiments['properties'].map(lambda x: x.get(prop.upper(), ''))
Swen Vermeul
committed
attrs.append(prop.upper())
return Things(
openbis_obj = self,
entity = 'experiment',
df = experiments[attrs],
identifier_name ='identifier',
start_with = start_with,
count = count,
totalCount = resp.get('totalCount'),
get_collections = get_experiments # Alias
self, code=None, type=None, withParents=None, withChildren=None,
Swen Vermeul
committed
start_with=None, count=None, kind=None,
status=None, sample=None, experiment=None, project=None,
tags=None, props=None, **properties
):
sub_criteria = []
if code:
sub_criteria.append(_criteria_for_code(code))
if type:
sub_criteria.append(_subcriteria_for_type(type, 'DataSet'))
if withParents:
sub_criteria.append(_subcriteria_for_permid(withParents, 'DataSet', 'Parents'))
if withChildren:
sub_criteria.append(_subcriteria_for_permid(withChildren, 'DataSet', 'Children'))
if sample:
sub_criteria.append(_subcriteria_for_code(sample, 'Sample'))
if experiment:
sub_criteria.append(_subcriteria_for_code(experiment, 'Experiment'))
if project:
exp_crit = _subcriteria_for_code(experiment, 'Experiment')
proj_crit = _subcriteria_for_code(project, 'Project')
exp_crit['criteria'] = []
exp_crit['criteria'].append(proj_crit)
sub_criteria.append(exp_crit)
if tags:
sub_criteria.append(_subcriteria_for_tags(tags))
if status:
sub_criteria.append(_subcriteria_for_status(status))
if properties is not None:
for prop in properties:
sub_criteria.append(_subcriteria_for_properties(prop, properties[prop]))
Swen Vermeul
committed
search_criteria = get_search_type_for_entity('dataset')
search_criteria['criteria'] = sub_criteria
search_criteria['operator'] = 'AND'
fetchopts = {
Swen Vermeul
committed
"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions",
"containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
"type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"}
}
fetchopts['from'] = start_with
fetchopts['count'] = count
Swen Vermeul
committed
if kind:
kind = kind.upper()
if kind not in ['PHYSICAL_DATA', 'CONTAINER', 'LINK']:
raise ValueError("unknown dataSet kind: {}. It should be one of the following: PHYSICAL_DATA, CONTAINER or LINK".format(kind))
fetchopts['kind'] = kind
raise NotImplementedError('you cannot search for dataSet kinds yet')
for option in ['tags', 'properties', 'sample', 'experiment', 'physicalData']:
fetchopts[option] = fetch_option[option]
request = {
"method": "searchDataSets",
"params": [self.token,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
return self._dataset_list_for_response(
response=resp['objects'],
props=props,
start_with=start_with,
count=count,
totalCount=resp['totalCount'],
Swen Vermeul
committed
def get_experiment(self, expId, withAttachments=False, only_data=False):
""" Returns an experiment object for a given identifier (expId).
"""
fetchopts = {
Swen Vermeul
committed
"@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
"type": {
"@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
},
Swen Vermeul
committed
search_request = _type_for_id(expId, 'experiment')
for option in ['tags', 'properties', 'attachments', 'project', 'samples', 'registrator', 'modifier']:
fetchopts[option] = fetch_option[option]
Swen Vermeul
committed
if withAttachments:
fetchopts['attachments'] = fetch_option['attachmentsWithContent']
"method": "getExperiments",
"params": [
self.token,
[search_request],
fetchopts
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No such experiment: %s" % expId)
parse_jackson(resp)
for id in resp:
if only_data:
return resp[id]
else:
return Experiment(
openbis_obj = self,
type = self.get_experiment_type(resp[expId]["type"]["code"]),
data = resp[id]
)
get_collection = get_experiment # Alias
def new_experiment(self, type, code, project, props=None, **kwargs):
""" Creates a new experiment of a given experiment type.
"""
openbis_obj = self,
type = self.get_experiment_type(type),
new_collection = new_experiment # Alias
def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
params = {
"experimentId": {
"permId": experimentId,
"@type": "as.dto.experiment.id.ExperimentPermId"
},
"@type": "as.dto.experiment.update.ExperimentUpdate"
}
if properties is not None:
params["properties"] = properties
if tagIds is not None:
params["tagIds"] = tagIds
if attachments is not None:
params["attachments"] = attachments
request = {
"method": "updateExperiments",
"params": [
self.token,
[params]
]
}
self._post_request(self.as_v3, request)
update_collection = update_experiment # Alias
Chandrasekhar Ramakrishnan
committed
def create_external_data_management_system(self, code, label, address, address_type='FILE_SYSTEM'):
Chandrasekhar Ramakrishnan
committed
"""Create an external DMS.
:param code: An openBIS code for the external DMS.
:param label: A human-readable label.
:param address: The address for accessing the external DMS. E.g., a URL.
Chandrasekhar Ramakrishnan
committed
:param address_type: One of OPENBIS, URL, or FILE_SYSTEM
Chandrasekhar Ramakrishnan
committed
:return:
"""
request = {
"method": "createExternalDataManagementSystems",
"params": [
self.token,
[
{
"code": code,
"label": label,
"addressType": address_type,
"address": address,
"@type": "as.dto.externaldms.create.ExternalDmsCreation",
}
]
],
}
resp = self._post_request(self.as_v3, request)
return self.get_external_data_management_system(resp[0]['permId'])
def update_sample(self, sampleId, space=None, project=None, experiment=None,
parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
params = {
"sampleId": {
"permId": sampleId,
"@type": "as.dto.sample.id.SamplePermId"
},
"@type": "as.dto.sample.update.SampleUpdate"
}
if space is not None:
params['spaceId'] = space
if project is not None:
params['projectId'] = project
if properties is not None:
params["properties"] = properties
if tagIds is not None:
params["tagIds"] = tagIds
if attachments is not None:
params["attachments"] = attachments
request = {
"method": "updateSamples",
"params": [
self.token,
[params]
]
}
self._post_request(self.as_v3, request)
update_object = update_sample # Alias
def delete_entity(self, entity, id, reason, id_name='permId'):
"""Deletes Spaces, Projects, Experiments, Samples and DataSets
"""
entity_type = "as.dto.{}.id.{}{}{}".format(
id_name[0].upper(), id_name[1:]
)
request = {
"method": "delete{}s".format(entity),
"params": [
self.token,
[
{
id_name: id,
"@type": entity_type
}
],
{
"reason": reason,
"@type": "as.dto.{}.delete.{}DeletionOptions".format(
entity.lower(), entity)
}
]
}
resp = self._post_request(self.as_v3, request)
def delete_openbis_entity(self, entity, objectId, reason='No reason given'):
method = get_method_for_entity(entity, 'delete')
delete_options = get_type_for_entity(entity, 'delete')
delete_options['reason'] = reason
request = {
"method": method,
"params": [
self.token,
[ objectId ],
delete_options
]
}
resp = self._post_request(self.as_v3, request)
return
def get_deletions(self, start_with=None, count=None):
search_criteria = {
"@type": "as.dto.deletion.search.DeletionSearchCriteria"
}
fetchopts = fetch_option['deletion']
fetchoptsDeleted = fetch_option['deletedObjects']
fetchoptsDeleted['from'] = start_with
fetchoptsDeleted['count'] = count
fetchopts['deletedObjects'] = fetchoptsDeleted
request = {
"method": "searchDeletions",
"params": [
self.token,
]
}
resp = self._post_request(self.as_v3, request)
objects = resp['objects']
parse_jackson(objects)
new_objs = []
for value in objects:
del_objs = extract_deletion(value)
if len(del_objs) > 0:
new_objs.append(*del_objs)
return DataFrame(new_objs)
def new_project(self, space, code, description=None, **kwargs):
return Project(self, None, space=space, code=code, description=description, **kwargs)
def _gen_fetchoptions(self, options, foType):
fo = {
"@type": foType
}
for option in options:
fo[option] = fetch_option[option]
return fo
def get_project(self, projectId, only_data=False):
options = ['space', 'registrator', 'modifier', 'attachments']
if is_identifier(projectId) or is_permid(projectId):
request = self._create_get_request(
'getProjects', 'project', projectId, options,
"as.dto.project.fetchoptions.ProjectFetchOptions"
)
resp = self._post_request(self.as_v3, request)
if only_data:
return resp[projectId]
return Project(self, resp[projectId])
else:
search_criteria = _gen_search_criteria({
'project': 'Project',
'operator': 'AND',
'code': projectId
})
fo = self._gen_fetchoptions(options, foType="as.dto.project.fetchoptions.ProjectFetchOptions")
request = {
"method": "searchProjects",
"params": [self.token, search_criteria, fo]
}
resp = self._post_request(self.as_v3, request)
Swen Vermeul
committed
if len(resp['objects']) == 0:
raise ValueError("No such project: %s" % projectId)
if only_data:
return resp['objects'][0]
return Project(self, resp['objects'][0])
def get_projects(
self, space=None, code=None,
start_with=None, count=None,
):
""" Get a list of all available projects (DataFrame object).
"""
sub_criteria = []
if space:
sub_criteria.append(_subcriteria_for_code(space, 'space'))
if code:
sub_criteria.append(_criteria_for_code(code))
criteria = {
"criteria": sub_criteria,
"@type": "as.dto.project.search.ProjectSearchCriteria",
"operator": "AND"
}
fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
fetchopts['from'] = start_with
fetchopts['count'] = count
for option in ['registrator', 'modifier', 'leader']:
request = {
"method": "searchProjects",
"params": [self.token,
criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
attrs = ['identifier', 'permId', 'leader', 'registrator', 'registrationDate', 'modifier', 'modificationDate']
if len(resp['objects']) == 0:
projects = DataFrame(columns=attrs)
else:
objects = resp['objects']
parse_jackson(objects)
projects = DataFrame(objects)
projects['registrationDate'] = projects['registrationDate'].map(format_timestamp)
projects['modificationDate'] = projects['modificationDate'].map(format_timestamp)
projects['leader'] = projects['leader'].map(extract_person)
projects['registrator'] = projects['registrator'].map(extract_person)
projects['modifier'] = projects['modifier'].map(extract_person)
projects['permId'] = projects['permId'].map(extract_permid)
projects['identifier'] = projects['identifier'].map(extract_identifier)
return Things(
openbis_obj = self,
entity = 'project',
df = projects[attrs],
identifier_name = 'identifier',
start_with = start_with,
count = count,
totalCount = resp.get('totalCount'),
def _create_get_request(self, method_name, entity, permids, options, foType):
if not isinstance(permids, list):
permids = [permids]
type = "as.dto.{}.id.{}".format(entity.lower(), entity.capitalize())
search_params = []
for permid in permids:
# decide if we got a permId or an identifier
match = re.match('/', permid)
if match:
search_params.append(
{"identifier": permid, "@type": type + 'Identifier'}