Newer
Older
request = {
"method": "createRoleAssignments",
"params": [
self.token,
[
{
"role": role,
"userId": userId,
"authorizationGroupId": groupId,
"spaceId": spaceId,
"projectId": projectId,
"@type": "as.dto.roleassignment.create.RoleAssignmentCreation",
}
]
]
}
resp = self._post_request(self.as_v3, request)
return
def get_groups(self, **search_args):
    """Get openBIS AuthorizationGroups. Returns a «Things» object.

    Only the ``code`` keyword is turned into a search criterion; any other
    keyword argument is ignored.

    Usage::

        groups = e.get_groups()
        groups[0]             # select first group
        groups['GROUP_NAME']  # select group with this code
        for group in groups:
            ...               # a Group object
        groups.df             # get a DataFrame object of the group list
        print(groups)         # print a nice ASCII table (eg. in IPython)
        groups                # HTML table (in a Jupyter notebook)
    """
    criteria = []
    # unfortunately, there aren't many search possibilities yet...
    if 'code' in search_args:
        criteria.append(_criteria_for_code(search_args['code']))

    search_criteria = get_search_type_for_entity('authorizationGroup')
    search_criteria['criteria'] = criteria

    # Work on a copy so the shared fetch_option entry is not modified.
    fetchopts = dict(fetch_option['authorizationGroup'])
    for option in ['roleAssignments', 'registrator', 'users']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchAuthorizationGroups",
        "params": [
            self.token,
            search_criteria,  # was built but never sent in the original
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['permId', 'code', 'description', 'users', 'registrator',
             'registrationDate', 'modificationDate']
    objects = resp['objects']
    if len(objects) == 0:
        groups = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        groups = DataFrame(objects)
        groups['permId'] = groups['permId'].map(extract_permid)
        groups['registrator'] = groups['registrator'].map(extract_person)
        groups['users'] = groups['users'].map(extract_userId)
        groups['registrationDate'] = groups['registrationDate'].map(format_timestamp)
        groups['modificationDate'] = groups['modificationDate'].map(format_timestamp)
    return Things(self, entity='group', df=groups[attrs], identifier_name='permId')
def get_persons(self, **search_args):
    """Get openBIS users.

    The keyword arguments are passed to ``get_search_criteria('person', ...)``
    to build the search criteria. Returns a «Things» object whose DataFrame
    has the columns permId, userId, firstName, lastName, email, space,
    registrationDate and active.
    """
    search_criteria = get_search_criteria('person', **search_args)
    fetchopts = {option: fetch_option[option] for option in ['space']}
    request = {
        "method": "searchPersons",
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['permId', 'userId', 'firstName', 'lastName', 'email', 'space',
             'registrationDate', 'active']
    objects = resp['objects']
    if len(objects) == 0:
        persons = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        persons = DataFrame(objects)
        persons['permId'] = persons['permId'].map(extract_permid)
        persons['registrationDate'] = persons['registrationDate'].map(format_timestamp)
        persons['space'] = persons['space'].map(extract_nested_permid)
    return Things(
        self, entity='person', df=persons[attrs], identifier_name='permId'
    )

get_users = get_persons  # Alias
def get_person(self, userId, only_data=False):
    """Get a person (user).

    :param userId: value sent as the ``permId`` of a PersonPermId.
    :param only_data: if true, return the parsed response entry instead of
        a Person object.
    :raises ValueError: if the server response is empty.
    """
    ids = [{
        "@type": "as.dto.person.id.PersonPermId",
        "permId": userId
    }]
    fetchopts = {option: fetch_option[option] for option in ['space', 'project']}
    request = {
        "method": "getPersons",
        "params": [
            self.token,
            ids,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No person found!")

    # Only one id was requested, so the first entry is the answer.
    for permid in resp:
        person = resp[permid]
        parse_jackson(person)
        if only_data:
            return person
        return Person(self, data=person)

get_user = get_person  # Alias
def get_spaces(self, code=None):
    """Get a list of all available spaces (a «Things» object wrapping a
    DataFrame). To create a sample or a dataset, you need to specify in
    which space it should live.

    :param code: passed to ``_subcriteria_for_code(code, 'space')`` to build
        the search criteria.
    """
    search_criteria = _subcriteria_for_code(code, 'space')
    fetchopts = {}
    request = {
        "method": "searchSpaces",
        "params": [self.token,
                   search_criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['code', 'description', 'registrationDate', 'modificationDate']
    objects = resp['objects']
    if len(objects) == 0:
        spaces = DataFrame(columns=attrs)
    else:
        spaces = DataFrame(objects)
        spaces['registrationDate'] = spaces['registrationDate'].map(format_timestamp)
        spaces['modificationDate'] = spaces['modificationDate'].map(format_timestamp)
    return Things(self, 'space', spaces[attrs])
def get_space(self, code, only_data=False):
    """Returns a Space object for a given identifier.

    :param code: space code; it is converted to an upper-case string before
        being sent as a SpacePermId.
    :param only_data: if true, return the raw response entry instead of a
        Space object.
    :raises ValueError: if the server response is empty.
    """
    code = str(code).upper()
    fetchopts = {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"}
    for option in ['registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "getSpaces",
        "params": [
            self.token,
            [{
                "permId": code,
                "@type": "as.dto.space.id.SpacePermId"
            }],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No such space: %s" % code)

    # Only one id was requested, so the first entry is the answer.
    for permid in resp:
        if only_data:
            return resp[permid]
        return Space(self, data=resp[permid])
Chandrasekhar Ramakrishnan
committed
Swen Vermeul
committed
def get_samples(self, code=None, permId=None, space=None, project=None, experiment=None, type=None,
                withParents=None, withChildren=None, tags=None, props=None, **properties):
    """Get a list of all samples for a given space/project/experiment (or any combination).

    All given filters are combined with AND. Extra keyword arguments
    (``**properties``) are turned into property search criteria.

    :param withParents: one identifier or a list of identifiers; each adds
        a SampleParents criterion.
    :param withChildren: same as ``withParents`` for SampleChildren.
    :param props: iterable of property names; each becomes an additional
        upper-cased column of the result.
    :return: a «Things» object identified by 'identifier'.
    """
    sub_criteria = []
    if space:
        sub_criteria.append(_gen_search_criteria({
            "space": "Space",
            "operator": "AND",
            "code": space
        }))
    if project:
        # a project criterion is nested inside an experiment criterion
        exp_crit = _subcriteria_for_code(experiment, 'experiment')
        proj_crit = _subcriteria_for_code(project, 'project')
        exp_crit['criteria'] = [proj_crit]
        sub_criteria.append(exp_crit)
    if experiment:
        sub_criteria.append(_subcriteria_for_code(experiment, 'experiment'))
    for prop, value in properties.items():
        sub_criteria.append(_subcriteria_for_properties(prop, value))
    if type:
        sub_criteria.append(_subcriteria_for_code(type, 'sample_type'))
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if permId:
        sub_criteria.append(_common_search("as.dto.common.search.PermIdSearchCriteria", permId))
    if withParents:
        if not isinstance(withParents, list):
            withParents = [withParents]
        for parent in withParents:
            sub_criteria.append(
                _gen_search_criteria({
                    "sample": "SampleParents",
                    "identifier": parent
                })
            )
    if withChildren:
        if not isinstance(withChildren, list):
            withChildren = [withChildren]
        for child in withChildren:
            sub_criteria.append(
                _gen_search_criteria({
                    "sample": "SampleChildren",
                    "identifier": child
                })
            )

    criteria = {
        "criteria": sub_criteria,
        "@type": "as.dto.sample.search.SampleSearchCriteria",
        "operator": "AND"
    }

    # build the various fetch options (on a copy, so the shared
    # fetch_option entry is not modified)
    fetchopts = dict(fetch_option['sample'])
    for option in ['tags', 'properties', 'registrator', 'modifier', 'experiment']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchSamples",
        "params": [self.token,
                   criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['identifier', 'permId', 'experiment', 'sample_type',
             'registrator', 'registrationDate', 'modifier', 'modificationDate']
    prop_columns = [prop.upper() for prop in props] if props is not None else []
    attrs.extend(prop_columns)

    objects = resp['objects']
    if len(objects) == 0:
        # include the property columns so the selection below cannot fail
        samples = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        samples = DataFrame(objects)
        samples['registrationDate'] = samples['registrationDate'].map(format_timestamp)
        samples['modificationDate'] = samples['modificationDate'].map(format_timestamp)
        samples['registrator'] = samples['registrator'].map(extract_person)
        samples['modifier'] = samples['modifier'].map(extract_person)
        samples['identifier'] = samples['identifier'].map(extract_identifier)
        samples['permId'] = samples['permId'].map(extract_permid)
        samples['experiment'] = samples['experiment'].map(extract_nested_identifier)
        samples['sample_type'] = samples['type'].map(extract_nested_permid)
        for column in prop_columns:
            samples[column] = samples['properties'].map(
                lambda x, key=column: x.get(key, '')
            )
    return Things(self, 'sample', samples[attrs], 'identifier')

get_objects = get_samples  # Alias
Swen Vermeul
committed
def get_experiments(self, code=None, type=None, space=None, project=None, tags=None, is_finished=None, props=None, **properties):
    """Searches for all experiments which match the search criteria. Returns a
    «Things» object which can be used in many different situations.

    All given filters are combined with AND. Extra keyword arguments
    (``**properties``) are turned into property search criteria; ``props``
    lists property names that become additional upper-cased columns.

    Usage::

        experiments = get_experiments(project='PROJECT_NAME', props=['NAME','FINISHED_FLAG'])
        experiments[0]  # returns first experiment
        experiments['/MATERIALS/REAGENTS/ANTIBODY_COLLECTION']
        for experiment in experiments:
            # handle every experiment
            ...
        experiments.df      # returns DataFrame object of the experiment list
        print(experiments)  # prints a nice ASCII table
    """
    sub_criteria = []
    if space:
        sub_criteria.append(_subcriteria_for_code(space, 'space'))
    if project:
        sub_criteria.append(_subcriteria_for_code(project, 'project'))
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if type:
        sub_criteria.append(_subcriteria_for_type(type, 'Experiment'))
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if is_finished is not None:
        sub_criteria.append(_subcriteria_for_is_finished(is_finished))
    for prop, value in properties.items():
        sub_criteria.append(_subcriteria_for_properties(prop, value))

    search_criteria = get_search_type_for_entity('experiment')
    search_criteria['criteria'] = sub_criteria
    search_criteria['operator'] = 'AND'

    # Work on a copy so the shared fetch_option entry is not modified.
    fetchopts = dict(fetch_option['experiment'])
    for option in ['tags', 'properties', 'registrator', 'modifier', 'project']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchExperiments",
        "params": [
            self.token,
            search_criteria,
            fetchopts,
        ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['identifier', 'permId', 'project', 'type',
             'registrator', 'registrationDate', 'modifier', 'modificationDate']
    prop_columns = [prop.upper() for prop in props] if props is not None else []
    attrs.extend(prop_columns)

    objects = resp['objects']
    if len(objects) == 0:
        # include the property columns so the selection below cannot fail
        experiments = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        experiments = DataFrame(objects)
        experiments['registrationDate'] = experiments['registrationDate'].map(format_timestamp)
        experiments['modificationDate'] = experiments['modificationDate'].map(format_timestamp)
        experiments['project'] = experiments['project'].map(extract_code)
        experiments['registrator'] = experiments['registrator'].map(extract_person)
        experiments['modifier'] = experiments['modifier'].map(extract_person)
        experiments['identifier'] = experiments['identifier'].map(extract_identifier)
        experiments['permId'] = experiments['permId'].map(extract_permid)
        experiments['type'] = experiments['type'].map(extract_code)
        for column in prop_columns:
            experiments[column] = experiments['properties'].map(
                lambda x, key=column: x.get(key, '')
            )
    return Things(self, 'experiment', experiments[attrs], 'identifier')
def get_datasets(self,
                 code=None, type=None, withParents=None, withChildren=None, status=None,
                 sample=None, experiment=None, project=None, tags=None, props=None, **properties
                 ):
    """Search for datasets matching all given filters (combined with AND).

    Extra keyword arguments (``**properties``) are turned into property
    search criteria; ``props`` lists property names that become additional
    upper-cased columns of the result.

    :return: a «Things» object identified by 'permId'.
    """
    sub_criteria = []
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if type:
        sub_criteria.append(_subcriteria_for_type(type, 'DataSet'))
    if withParents:
        sub_criteria.append(_subcriteria_for_permid(withParents, 'DataSet', 'Parents'))
    if withChildren:
        sub_criteria.append(_subcriteria_for_permid(withChildren, 'DataSet', 'Children'))
    if sample:
        sub_criteria.append(_subcriteria_for_code(sample, 'Sample'))
    if experiment:
        sub_criteria.append(_subcriteria_for_code(experiment, 'Experiment'))
    if project:
        # a project criterion is nested inside an experiment criterion
        exp_crit = _subcriteria_for_code(experiment, 'Experiment')
        proj_crit = _subcriteria_for_code(project, 'Project')
        exp_crit['criteria'] = [proj_crit]
        sub_criteria.append(exp_crit)
    if tags:
        sub_criteria.append(_subcriteria_for_tags(tags))
    if status:
        sub_criteria.append(_subcriteria_for_status(status))
    for prop, value in properties.items():
        sub_criteria.append(_subcriteria_for_properties(prop, value))

    search_criteria = get_search_type_for_entity('dataset')
    search_criteria['criteria'] = sub_criteria
    search_criteria['operator'] = 'AND'

    fetchopts = {
        "containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
        "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"}
    }
    for option in ['tags', 'properties', 'sample', 'experiment', 'physicalData']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchDataSets",
        "params": [self.token,
                   search_criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['permId', 'properties', 'type', 'experiment', 'sample',
             'registrationDate', 'modificationDate', 'location']
    prop_columns = [prop.upper() for prop in props] if props is not None else []
    attrs.extend(prop_columns)

    objects = resp['objects']
    if len(objects) == 0:
        # include the property columns so the selection below cannot fail
        datasets = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        datasets = DataFrame(objects)
        datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
        datasets['modificationDate'] = datasets['modificationDate'].map(format_timestamp)
        datasets['experiment'] = datasets['experiment'].map(extract_nested_identifier)
        datasets['sample'] = datasets['sample'].map(extract_nested_identifier)
        datasets['type'] = datasets['type'].map(extract_code)
        # the dataset code is exposed as its permId
        datasets['permId'] = datasets['code']
        datasets['location'] = datasets['physicalData'].map(lambda x: x.get('location') if x else '')
        for column in prop_columns:
            datasets[column] = datasets['properties'].map(
                lambda x, key=column: x.get(key, '')
            )
    return Things(self, 'dataset', datasets[attrs], 'permId')
def get_experiment(self, expId, withAttachments=False, only_data=False):
    """Returns an experiment object for a given identifier (expId).

    :param withAttachments: if true, the 'attachmentsWithContent' fetch
        option is used for the attachments.
    :param only_data: if true, return the raw response entry instead of an
        Experiment object.
    :raises ValueError: if the server response is empty.
    """
    fetchopts = {
        "@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
        "type": {
            "@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
        },
    }
    search_request = search_request_for_identifier(expId, 'experiment')
    for option in ['tags', 'properties', 'attachments', 'project', 'samples']:
        fetchopts[option] = fetch_option[option]
    if withAttachments:
        fetchopts['attachments'] = fetch_option['attachmentsWithContent']

    request = {
        "method": "getExperiments",
        "params": [
            self.token,
            [search_request],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp) == 0:
        raise ValueError("No such experiment: %s" % expId)

    # Only one id was requested, so the first entry is the answer. Use the
    # key returned by the server rather than the caller-supplied expId.
    for key in resp:
        data = resp[key]
        if only_data:
            return data
        return Experiment(
            openbis_obj=self,
            type=self.get_experiment_type(data["type"]["code"]),
            data=data
        )
def new_experiment(self, type, code, project, props=None, **kwargs):
    """Creates a new experiment of a given experiment type.

    :param type: experiment type code, resolved via ``get_experiment_type``.
    :param code: code of the new experiment.
    :param project: project the experiment belongs to; forwarded to the
        Experiment constructor (the original accepted but dropped it).
    :param props: forwarded to the Experiment constructor.
    """
    return Experiment(
        openbis_obj=self,
        type=self.get_experiment_type(type),
        project=project,
        data=None,
        props=props,
        code=code,
        **kwargs
    )
def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
    """Send an ``updateExperiments`` request for one experiment.

    ``properties``, ``tagIds`` and ``attachments`` are only included in the
    update when they are not None. Nothing is returned.
    """
    update = {
        "experimentId": {
            "permId": experimentId,
            "@type": "as.dto.experiment.id.ExperimentPermId"
        },
        "@type": "as.dto.experiment.update.ExperimentUpdate"
    }
    optional_fields = (
        ("properties", properties),
        ("tagIds", tagIds),
        ("attachments", attachments),
    )
    for field, value in optional_fields:
        if value is not None:
            update[field] = value

    self._post_request(self.as_v3, {
        "method": "updateExperiments",
        "params": [self.token, [update]]
    })
Chandrasekhar Ramakrishnan
committed
def create_external_data_management_system(self, code, label, address, address_type='FILE_SYSTEM'):
    """Create an external DMS.

    :param code: An openBIS code for the external DMS.
    :param label: A human-readable label.
    :param address: The address for accessing the external DMS. E.g., a URL.
    :param address_type: One of OPENBIS, URL, or FILE_SYSTEM
    :return: the result of ``get_external_data_management_system`` for the
        permId of the first entry in the create response.
    """
    creation = {
        "code": code,
        "label": label,
        "addressType": address_type,
        "address": address,
        "@type": "as.dto.externaldms.create.ExternalDmsCreation",
    }
    request = {
        "method": "createExternalDataManagementSystems",
        "params": [
            self.token,
            [creation]
        ],
    }
    resp = self._post_request(self.as_v3, request)
    return self.get_external_data_management_system(resp[0]['permId'])
def update_sample(self, sampleId, space=None, project=None, experiment=None,
                  parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
    """Send an ``updateSamples`` request for one sample.

    ``space`` (as 'spaceId'), ``project`` (as 'projectId'), ``properties``,
    ``tagIds`` and ``attachments`` are included in the update when they are
    not None. ``experiment``, ``parents``, ``children`` and ``components``
    are accepted but currently not used. Nothing is returned.
    """
    update = {
        "sampleId": {
            "permId": sampleId,
            "@type": "as.dto.sample.id.SamplePermId"
        },
        "@type": "as.dto.sample.update.SampleUpdate"
    }
    optional_fields = (
        ("spaceId", space),
        ("projectId", project),
        ("properties", properties),
        ("tagIds", tagIds),
        ("attachments", attachments),
    )
    for field, value in optional_fields:
        if value is not None:
            update[field] = value

    self._post_request(self.as_v3, {
        "method": "updateSamples",
        "params": [self.token, [update]]
    })

update_object = update_sample  # Alias
def delete_entity(self, entity, id, reason, id_name='permId'):
    """Deletes Spaces, Projects, Experiments, Samples and DataSets.

    The request method and the ``@type`` strings are derived from
    ``entity`` (e.g. 'Sample' gives 'deleteSamples') and from ``id_name``
    with its first letter upper-cased. Nothing is returned.
    """
    package = "as.dto." + entity.lower()
    id_class = entity + id_name[0].upper() + id_name[1:]

    entity_id = {
        id_name: id,
        "@type": package + ".id." + id_class
    }
    deletion_options = {
        "reason": reason,
        "@type": package + ".delete." + entity + "DeletionOptions"
    }
    self._post_request(self.as_v3, {
        "method": "delete" + entity + "s",
        "params": [self.token, [entity_id], deletion_options]
    })
def get_deletions(self):
    """Search all deletions and return their deleted objects as a DataFrame.

    Every deletion in the response is passed to ``extract_deletion``; the
    entries it returns are collected into one flat list.
    """
    request = {
        "method": "searchDeletions",
        "params": [
            self.token,
            {},
            {
                "deletedObjects": {
                    "@type": "as.dto.deletion.fetchoptions.DeletedObjectFetchOptions"
                }
            }
        ]
    }
    resp = self._post_request(self.as_v3, request)
    objects = resp['objects']
    parse_jackson(objects)

    new_objs = []
    for value in objects:
        # extend, not append(*...): a deletion may yield several objects,
        # and list.append accepts exactly one argument.
        new_objs.extend(extract_deletion(value))
    return DataFrame(new_objs)
def new_project(self, space, code, description=None, **kwargs):
    """Return a new Project object built without server data (data is None)."""
    return Project(
        self,
        None,
        space=space,
        code=code,
        description=description,
        **kwargs
    )
def _gen_fetchoptions(self, options):
    """Map every name in ``options`` to its entry in ``fetch_option``."""
    return {name: fetch_option[name] for name in options}
def get_project(self, projectId, only_data=False):
    """Return a Project for an identifier, a permId or a project code.

    If ``projectId`` is recognised by ``is_identifier`` or ``is_permid`` the
    project is fetched directly via 'getProjects'; otherwise it is treated
    as a code and searched via 'searchProjects', taking the first hit.

    :param only_data: if true, return the raw response entry instead of a
        Project object.
    :raises ValueError: if the code search returns no objects.
    """
    options = ['space', 'registrator', 'modifier', 'attachments']
    if is_identifier(projectId) or is_permid(projectId):
        request = self._create_get_request(
            'getProjects', 'project', projectId, options
        )
        resp = self._post_request(self.as_v3, request)
        # NOTE(review): a missing project presumably surfaces here as a
        # lookup error on resp[projectId] — confirm against the server reply.
        if only_data:
            return resp[projectId]
        return Project(self, resp[projectId])

    search_criteria = _gen_search_criteria({
        'project': 'Project',
        'operator': 'AND',
        'code': projectId
    })
    fo = self._gen_fetchoptions(options)
    request = {
        "method": "searchProjects",
        "params": [self.token, search_criteria, fo]
    }
    resp = self._post_request(self.as_v3, request)
    if len(resp['objects']) == 0:
        raise ValueError("No such project: %s" % projectId)
    if only_data:
        return resp['objects'][0]
    return Project(self, resp['objects'][0])
def get_projects(self, space=None, code=None):
    """Get a list of all available projects (a «Things» object wrapping a
    DataFrame), optionally filtered by space and/or code (combined with AND).
    """
    sub_criteria = []
    if space:
        sub_criteria.append(_subcriteria_for_code(space, 'space'))
    if code:
        sub_criteria.append(_criteria_for_code(code))

    criteria = {
        "criteria": sub_criteria,
        "@type": "as.dto.project.search.ProjectSearchCriteria",
        "operator": "AND"
    }

    fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
    for option in ['registrator', 'modifier', 'leader']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchProjects",
        "params": [self.token,
                   criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['identifier', 'permId', 'leader', 'registrator', 'registrationDate',
             'modifier', 'modificationDate']
    objects = resp['objects']
    if len(objects) == 0:
        projects = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        projects = DataFrame(objects)
        projects['registrationDate'] = projects['registrationDate'].map(format_timestamp)
        projects['modificationDate'] = projects['modificationDate'].map(format_timestamp)
        projects['leader'] = projects['leader'].map(extract_person)
        projects['registrator'] = projects['registrator'].map(extract_person)
        projects['modifier'] = projects['modifier'].map(extract_person)
        projects['permId'] = projects['permId'].map(extract_permid)
        projects['identifier'] = projects['identifier'].map(extract_identifier)
    return Things(self, 'project', projects[attrs], 'identifier')
def _create_get_request(self, method_name, entity, permids, options):
    """Build the request dict for a ``get…`` call on one or more ids.

    :param method_name: value of the request's "method" field.
    :param entity: entity name used to derive the id ``@type``, e.g.
        'project' gives 'as.dto.project.id.Project…'.
    :param permids: a single id or a list of ids. An id starting with '/'
        is sent as an identifier, anything else as a permId.
    :param options: names looked up in ``fetch_option`` to build the fetch
        options.
    :return: the request dict (it is not sent by this method).
    """
    if not isinstance(permids, list):
        permids = [permids]

    type_prefix = "as.dto.{}.id.{}".format(entity.lower(), entity.capitalize())
    search_params = []
    for permid in permids:
        # decide if we got a permId or an identifier
        if permid.startswith('/'):
            search_params.append(
                {"identifier": permid, "@type": type_prefix + 'Identifier'}
            )
        else:
            search_params.append(
                {"permId": permid, "@type": type_prefix + 'PermId'}
            )

    fo = {option: fetch_option[option] for option in options}
    return {
        "method": method_name,
        "params": [
            self.token,
            search_params,
            fo
        ],
    }
def get_terms(self, vocabulary=None):
    """Returns information about existing vocabulary terms.
    If a vocabulary code is provided, it only returns the terms of that vocabulary.

    :return: a «Things» object identified by 'code', with 'vocabularyCode'
        as additional identifier.
    """
    search_request = {}
    if vocabulary is not None:
        search_request = _gen_search_criteria({
            "vocabulary": "VocabularyTerm",
            "criteria": [{
                "vocabulary": "Vocabulary",
                "code": vocabulary
            }]
        })

    fetchopts = fetch_option['vocabularyTerm']
    request = {
        "method": "searchVocabularyTerms",
        "params": [self.token, search_request, fetchopts]
    }
    resp = self._post_request(self.as_v3, request)

    attrs = 'code vocabularyCode label description registrationDate modificationDate official ordinal'.split()
    objects = resp['objects']
    if len(objects) == 0:
        terms = DataFrame(columns=attrs)
    else:
        parse_jackson(objects)
        terms = DataFrame(objects)
        # the vocabulary code is taken from the term's permId
        terms['vocabularyCode'] = terms['permId'].map(extract_attr('vocabularyCode'))
        terms['registrationDate'] = terms['registrationDate'].map(format_timestamp)
        terms['modificationDate'] = terms['modificationDate'].map(format_timestamp)
    return Things(self, 'term', terms[attrs],
                  identifier_name='code', additional_identifier='vocabularyCode')
def new_term(self, code, vocabularyCode, label=None, description=None):
    """Return a new VocabularyTerm object built without server data."""
    term_attributes = {
        "code": code,
        "vocabularyCode": vocabularyCode,
        "label": label,
        "description": description,
    }
    return VocabularyTerm(self, data=None, **term_attributes)
def get_term(self, code, vocabularyCode=None, only_data=False):
    """Fetch one vocabulary term by its code and vocabulary code.

    :param only_data: if true, return the parsed response entry instead of
        a VocabularyTerm object.
    :raises ValueError: if the response is None or empty.
    """
    entity_def = get_definition_for_entity('VocabularyTerm')
    term_id = {
        "code": code,
        "vocabularyCode": vocabularyCode,
        "@type": "as.dto.vocabulary.id.VocabularyTermPermId"
    }
    fetchopts = get_fetchoption_for_entity('VocabularyTerm')
    for opt in ['registrator']:
        fetchopts[opt] = get_fetchoption_for_entity(opt)

    resp = self._post_request(self.as_v3, {
        "method": 'getVocabularyTerms',
        "params": [self.token, [term_id], fetchopts],
    })
    if resp is None or len(resp) == 0:
        raise ValueError("no VocabularyTerm found with code='{}' and vocabularyCode='{}'".format(code, vocabularyCode))

    parse_jackson(resp)
    # Only one id was requested, so the first entry is the answer.
    for key in resp:
        term_data = resp[key]
        if only_data:
            return term_data
        return VocabularyTerm(self, term_data)
def get_any_entity(self, identifier, entity, only_data=False, method=None):
    """Fetch a single entity of an arbitrary kind.

    :param identifier: passed to ``search_request_for_identifier``.
    :param entity: entity name; also the name of the module-level class
        that is instantiated for the result.
    :param only_data: if true, return the parsed response entry instead of
        an object.
    :param method: request method name; defaults to 'get' + entity + 's'.
    :raises ValueError: if the response is None or empty.
    """
    entity_def = get_definition_for_entity(entity)

    # guess the method to fetch an entity
    method_name = method if method is not None else 'get' + entity + 's'

    search_request = search_request_for_identifier(identifier, entity)
    fetchopts = get_fetchoption_for_entity(entity)
    resp = self._post_request(self.as_v3, {
        "method": method_name,
        "params": [self.token, [search_request], fetchopts],
    })
    if resp is None or len(resp) == 0:
        raise ValueError('no {} found with identifier: {}'.format(entity, identifier))

    parse_jackson(resp)
    # Only one id was requested, so the first entry is the answer.
    for key in resp:
        entity_data = resp[key]
        if only_data:
            return entity_data
        # get a dynamic instance of that class
        return globals()[entity](self, entity_data)
def get_vocabularies(self):
    """Returns information about all vocabularies as a «Things» object
    identified by 'code'.
    """
    search_request = {}
    # Work on a copy so the shared fetch_option entry is not modified.
    fetchopts = dict(fetch_option['vocabulary'])
    for option in ['registrator']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchVocabularies",
        "params": [self.token, search_request, fetchopts]
    }
    resp = self._post_request(self.as_v3, request)

    attrs = 'code description managedInternally internalNameSpace chosenFromList urlTemplate registrator registrationDate modificationDate'.split()
    objects = resp['objects']
    if len(objects) == 0:
        vocs = DataFrame(columns=attrs)
    else:
        parse_jackson(resp)
        vocs = DataFrame(objects)
        vocs['registrationDate'] = vocs['registrationDate'].map(format_timestamp)
        vocs['modificationDate'] = vocs['modificationDate'].map(format_timestamp)
        vocs['registrator'] = vocs['registrator'].map(extract_person)
    return Things(self, 'vocabulary', vocs[attrs], 'code')
def get_vocabulary(self, code, only_data=False):
    """Fetch one vocabulary by code.

    Delegates to ``get_any_entity`` with the entity 'Vocabulary' and the
    request method 'getVocabularies'.
    """
    vocabulary = self.get_any_entity(
        code,
        'Vocabulary',
        only_data=only_data,
        method='getVocabularies',
    )
    return vocabulary
def new_tag(self, code, description=None):
    """Return a new Tag object with the given code and description."""
    tag_attributes = {"code": code, "description": description}
    return Tag(self, **tag_attributes)
def get_tags(self, code=None):
    """Returns all tags as a «Things» object identified by 'permId'.

    :param code: if given, only tags matching this code are searched.
    """
    search_criteria = get_search_type_for_entity('tag', 'AND')
    criteria = []
    # Work on a copy so the shared fetch_option entry is not modified.
    fetchopts = dict(fetch_option['tag'])
    for option in ['owner']:
        fetchopts[option] = fetch_option[option]
    if code:
        criteria.append(_criteria_for_code(code))
    search_criteria['criteria'] = criteria

    request = {
        "method": "searchTags",
        "params": [
            self.token,
            search_criteria,
            fetchopts
        ]
    }
    resp = self._post_request(self.as_v3, request)

    attrs = ['permId', 'code', 'description', 'owner', 'private', 'registrationDate']
    objects = resp['objects']
    if len(objects) == 0:
        tags = DataFrame(columns=attrs)
    else:
        parse_jackson(resp)
        tags = DataFrame(objects)
        tags['registrationDate'] = tags['registrationDate'].map(format_timestamp)
        tags['permId'] = tags['permId'].map(extract_permid)
        # show a missing description as an empty string
        tags['description'] = tags['description'].map(lambda x: '' if x is None else x)
        tags['owner'] = tags['owner'].map(extract_person)
    return Things(self, 'tag', tags[attrs], 'permId')
Swen Vermeul
committed
def get_tag(self, permId, only_data=False):
    """Returns a specific tag.

    :param permId: permId of the tag, sent as a TagPermId.
    :param only_data: if true, return the parsed response entry instead of
        a Tag object.
    :raises ValueError: if the response is None or empty.
    """
    fetchopts = {}
    request = {
        "method": "getTags",
        "params": [
            self.token,
            [{
                "permId": permId,
                "@type": "as.dto.tag.id.TagPermId"
            }],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is None or len(resp) == 0:
        raise ValueError('no such tag: ' + permId)

    parse_jackson(resp)
    # Only one id was requested, so the first entry is the answer. A
    # separate loop name keeps the permId argument intact.
    for key in resp:
        if only_data:
            return resp[key]
        return Tag(self, data=resp[key])