Newer
Older
ds = Things(
self,
'dataset',
datasets[
['permId', 'properties', 'type', 'experiment', 'sample', 'registrationDate', 'modificationDate']],
)
return ds
Swen Vermeul
committed
def get_experiment(self, expId, withAttachments=False):
""" Returns an experiment object for a given identifier (expId).
"""
fetchopts = {
Swen Vermeul
committed
"@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
"type": {
"@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
},
search_request = search_request_for_identifier(expId, 'experiment')
Swen Vermeul
committed
for option in ['tags', 'properties', 'attachments', 'project', 'samples']:
fetchopts[option] = fetch_option[option]
Swen Vermeul
committed
if withAttachments:
fetchopts['attachments'] = fetch_option['attachmentsWithContent']
"method": "getExperiments",
"params": [
self.token,
[search_request],
fetchopts
resp = self._post_request(self.as_v3, request)
if len(resp) == 0:
raise ValueError("No such experiment: %s" % expId)
return Experiment(self,
self.get_experiment_type(resp[expId]["type"]["code"]),
resp[expId]
)
def new_experiment(self, type, **kwargs):
""" Creates a new experiment of a given experiment type.
"""
return Experiment(self, self.get_experiment_type(type), None, **kwargs)
def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
params = {
"experimentId": {
"permId": experimentId,
"@type": "as.dto.experiment.id.ExperimentPermId"
},
"@type": "as.dto.experiment.update.ExperimentUpdate"
}
if properties is not None:
params["properties"] = properties
if tagIds is not None:
params["tagIds"] = tagIds
if attachments is not None:
params["attachments"] = attachments
request = {
"method": "updateExperiments",
"params": [
self.token,
[params]
]
}
self._post_request(self.as_v3, request)
def create_sample(self, space_ident, code, type,
project_ident=None, experiment_ident=None, properties=None, attachments=None, tags=None):
tagIds = _create_tagIds(tags)
typeId = _create_typeId(type)
projectId = _create_projectId(project_ident)
experimentId = _create_experimentId(experiment_ident)
if properties is None:
properties = {}
request = {
"method": "createSamples",
"params": [
self.token,
[
{
"properties": properties,
"code": code,
"typeId": typeId,
"projectId": projectId,
"experimentId": experimentId,
"tagIds": tagIds,
"attachments": attachments,
"@type": "as.dto.sample.create.SampleCreation",
}
]
],
}
resp = self._post_request(self.as_v3, request)
return self.get_sample(resp[0]['permId'])
Chandrasekhar Ramakrishnan
committed
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
def create_external_data_management_system(self, code, label, address_type, address):
"""Create an external DMS.
:param code: An openBIS code for the external DMS.
:param label: A human-readable label.
:param address_type: One of OPENBIS, URL, or FILE_SYSTEM
:param address: The address for accessing the external DMS. E.g., a URL.
:return:
"""
request = {
"method": "createExternalDataManagementSystems",
"params": [
self.token,
[
{
"code": code,
"label": label,
"addressType": address_type,
"address": address,
"@type": "as.dto.externaldms.create.ExternalDmsCreation",
}
]
],
}
resp = self._post_request(self.as_v3, request)
return self.get_external_data_management_system(resp[0]['permId'])
def update_sample(self, sampleId, space=None, project=None, experiment=None,
parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
params = {
"sampleId": {
"permId": sampleId,
"@type": "as.dto.sample.id.SamplePermId"
},
"@type": "as.dto.sample.update.SampleUpdate"
}
if space is not None:
params['spaceId'] = space
if project is not None:
params['projectId'] = project
if properties is not None:
params["properties"] = properties
if tagIds is not None:
params["tagIds"] = tagIds
if attachments is not None:
params["attachments"] = attachments
request = {
"method": "updateSamples",
"params": [
self.token,
[params]
]
}
self._post_request(self.as_v3, request)
def delete_entity(self, entity, permid, reason):
"""Deletes Spaces, Projects, Experiments, Samples and DataSets
"""
entity_type = "as.dto.{}.id.{}PermId".format(entity.lower(), entity.capitalize())
request = {
"method": "delete" + entity.capitalize() + 's',
"params": [
self.token,
[
{
"permId": permid,
"@type": entity_type
}
],
{
"reason": reason,
"@type": "as.dto.{}.delete.{}DeletionOptions".format(entity.lower(), entity.capitalize())
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
}
]
}
self._post_request(self.as_v3, request)
def get_deletions(self):
request = {
"method": "searchDeletions",
"params": [
self.token,
{},
{
"deletedObjects": {
"@type": "as.dto.deletion.fetchoptions.DeletedObjectFetchOptions"
}
}
]
}
resp = self._post_request(self.as_v3, request)
objects = resp['objects']
parse_jackson(objects)
new_objs = []
for value in objects:
del_objs = extract_deletion(value)
if len(del_objs) > 0:
new_objs.append(*del_objs)
return DataFrame(new_objs)
def new_project(self, space, code, description=None, **kwargs):
return Project(self, None, space=space, code=code, description=description, **kwargs)
def _gen_fetchoptions(self, options):
fo = {}
for option in options:
fo[option] = fetch_option[option]
return fo
def get_project(self, projectId):
options = ['space', 'registrator', 'modifier', 'attachments']
if is_identifier(projectId):
request = self._create_get_request(
'getProjects', 'project', projectId, options
)
resp = self._post_request(self.as_v3, request)
return Project(self, resp[projectId])
else:
search_criteria = _gen_search_criteria({
'project': 'Project',
'operator': 'AND',
'code': projectId
})
fo = self._gen_fetchoptions(options)
request = {
"method": "searchProjects",
"params": [self.token, search_criteria, fo]
}
resp = self._post_request(self.as_v3, request)
return Project(self, resp['objects'][0])
def get_projects(self, space=None):
""" Get a list of all available projects (DataFrame object).
"""
sub_criteria = []
if space:
sub_criteria.append(_subcriteria_for_code(space, 'space'))
criteria = {
"criteria": sub_criteria,
"@type": "as.dto.project.search.ProjectSearchCriteria",
"operator": "AND"
}
fetchopts = {"@type": "as.dto.project.fetchoptions.ProjectFetchOptions"}
for option in ['registrator', 'modifier', 'leader']:
request = {
"method": "searchProjects",
"params": [self.token,
criteria,
fetchopts,
],
}
resp = self._post_request(self.as_v3, request)
if resp is not None:
objects = resp['objects']
parse_jackson(objects)
projects = DataFrame(objects)
if len(projects) is 0:
raise ValueError("No projects found!")
projects['registrationDate'] = projects['registrationDate'].map(format_timestamp)
projects['modificationDate'] = projects['modificationDate'].map(format_timestamp)
projects['leader'] = projects['leader'].map(extract_person)
projects['registrator'] = projects['registrator'].map(extract_person)
projects['modifier'] = projects['modifier'].map(extract_person)
projects['permId'] = projects['permId'].map(extract_permid)
projects['identifier'] = projects['identifier'].map(extract_identifier)
pros = projects[['identifier', 'permId', 'leader', 'registrator', 'registrationDate',
'modifier', 'modificationDate']]
return Things(self, 'project', pros, 'identifier')
else:
raise ValueError("No projects found!")
def _create_get_request(self, method_name, entity, permids, options):
if not isinstance(permids, list):
permids = [permids]
type = "as.dto.{}.id.{}".format(entity.lower(), entity.capitalize())
search_params = []
for permid in permids:
# decide if we got a permId or an identifier
match = re.match('/', permid)
if match:
search_params.append(
{"identifier": permid, "@type": type + 'Identifier'}
else:
{"permId": permid, "@type": type + 'PermId'}
)
fo = {}
for option in options:
fo[option] = fetch_option[option]
request = {
"method": method_name,
"params": [
self.token,
search_params,
fo
],
}
return request
def get_terms(self, vocabulary=None):
""" Returns information about vocabulary, including its controlled vocabulary
search_request = {}
if vocabulary is not None:
search_request = _gen_search_criteria({
"vocabulary": "VocabularyTerm",
"criteria": [{
"vocabulary": "Vocabulary",
"code": vocabulary
}]
})
fetch_options = {
"vocabulary": {"@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions"},
"@type": "as.dto.vocabulary.fetchoptions.VocabularyTermFetchOptions"
}
request = {
"method": "searchVocabularyTerms",
"params": [self.token, search_request, fetch_options]
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
return Vocabulary(resp)
def get_tags(self):
""" Returns a DataFrame of all
"""
request = {
"method": "searchTags",
"params": [self.token, {}, {}]
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
objects = DataFrame(resp['objects'])
objects['registrationDate'] = objects['registrationDate'].map(format_timestamp)
return objects[['code', 'registrationDate']]
Swen Vermeul
committed
def get_sample_types(self, type=None):
""" Returns a list of all available sample types
"""
return self._get_types_of(
"searchSampleTypes",
"Sample",
type,
Swen Vermeul
committed
["generatedCodePrefix"]
)
def get_sample_type(self, type):
try:
return self._get_types_of(
"searchSampleTypes",
Swen Vermeul
committed
"Sample",
type,
["generatedCodePrefix"]
)
except Exception:
raise ValueError("no such sample type: {}".format(type))
def get_experiment_types(self, type=None):
""" Returns a list of all available experiment types
"""
return self._get_types_of(
"searchExperimentTypes",
"Experiment",
Swen Vermeul
committed
type
)
def get_experiment_type(self, type):
Swen Vermeul
committed
return self._get_types_of(
"searchExperimentTypes",
"Experiment",
Swen Vermeul
committed
type
)
except Exception:
raise ValueError("No such experiment type: {}".format(type))
def get_material_types(self, type=None):
""" Returns a list of all available material types
"""
return self._get_types_of("searchMaterialTypes", "Material", type)
def get_material_type(self, type):
try:
return self._get_types_of("searchMaterialTypes", "Material", type)
except Exception:
raise ValueError("No such material type: {}".format(type))
def get_dataset_types(self, type=None):
""" Returns a list (DataFrame object) of all currently available dataset types
"""
return self._get_types_of("searchDataSetTypes", "DataSet", type, ['kind'])
Swen Vermeul
committed
def get_dataset_type(self, type):
try:
return self._get_types_of("searchDataSetTypes", "DataSet", type, ['kind'])
except Exception:
raise ValueError("No such dataSet type: {}".format(type))
def _get_types_of(self, method_name, entity, type_name=None, additional_attributes=None):
""" Returns a list of all available types of an entity.
If the name of the entity-type is given, it returns a PropertyAssignments object
if additional_attributes is None:
additional_attributes = []
Swen Vermeul
committed
attributes = ['code', 'description'] + additional_attributes + ['modificationDate']
search_request = {}
fetch_options = {}
if type_name is not None:
search_request = _gen_search_criteria({
"operator": "AND",
})
fetch_options = {
Swen Vermeul
committed
"@type": "as.dto.{}.fetchoptions.{}TypeFetchOptions".format(
Swen Vermeul
committed
)
}
Swen Vermeul
committed
fetch_options['propertyAssignments'] = fetch_option['propertyAssignments']
attributes.append('propertyAssignments')
"params": [self.token, search_request, fetch_options],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if type_name is not None and len(resp['objects']) == 1:
return PropertyAssignments(self, resp['objects'][0])
if len(resp['objects']) >= 1:
types = DataFrame(resp['objects'])
Swen Vermeul
committed
types['modificationDate'] = types['modificationDate'].map(format_timestamp)
return Things(self, entity.lower() + '_type', types[attributes])
Swen Vermeul
committed
def is_session_active(self):
""" checks whether a session is still active. Returns true or false.
"""
Swen Vermeul
committed
return self.is_token_valid(self.token)
def is_token_valid(self, token=None):
Chandrasekhar Ramakrishnan
committed
"""Check if the connection to openBIS is valid.
This method is useful to check if a token is still valid or if it has timed out,
requiring the user to login again.
Chandrasekhar Ramakrishnan
committed
:return: Return True if the token is valid, False if it is not valid.
"""
if token is None:
token = self.token
if token is None:
return False
request = {
"method": "isSessionActive",
"params": [token],
resp = self._post_request(self.as_v1, request)
"""fetch a dataset and some metadata attached to it:
- properties
- sample
- parents
- children
- containers
- dataStore
- physicalData
- linkedData
:return: a DataSet object
"""
criteria = [{
"permId": permid,
"@type": "as.dto.dataset.id.DataSetPermId"
}]
fetchopts = {
"parents": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
"children": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
"containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
"type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"},
}
for option in ['tags', 'properties', 'dataStore', 'physicalData', 'linkedData',
'experiment', 'sample']:
fetchopts[option] = fetch_option[option]
request = {
"params": [self.token,
criteria,
fetchopts,
],
resp = self._post_request(self.as_v3, request)
raise ValueError('no such dataset found: ' + permid)
if resp is not None:
for permid in resp:
return DataSet(self, self.get_dataset_type(resp[permid]["type"]["code"]), resp[permid])
Swen Vermeul
committed
def get_sample(self, sample_ident, only_data=False, withAttachments=False):
Chandrasekhar Ramakrishnan
committed
"""Retrieve metadata for the sample.
Get metadata for the sample and any directly connected parents of the sample to allow access
to the same information visible in the ELN UI. The metadata will be on the file system.
:param sample_identifiers: A list of sample identifiers to retrieve.
"""
fetchopts = {"type": {"@type": "as.dto.sample.fetchoptions.SampleTypeFetchOptions"}}
Swen Vermeul
committed
search_request = search_request_for_identifier(sample_ident, 'sample')
Swen Vermeul
committed
for option in ['tags', 'properties', 'attachments', 'space', 'experiment', 'registrator', 'dataSets']:
fetchopts[option] = fetch_option[option]
if withAttachments:
fetchopts['attachments'] = fetch_option['attachmentsWithContent']
# fetchopts["parents"] = { "@type": "as.dto.sample.fetchoptions.SampleFetchOptions" }
# fetchopts["children"] = { "@type": "as.dto.sample.fetchoptions.SampleFetchOptions" }
sample_request = {
"method": "getSamples",
"params": [
self.token,
[search_request],
Swen Vermeul
committed
fetchopts
resp = self._post_request(self.as_v3, sample_request)
parse_jackson(resp)
raise ValueError('no such sample found: ' + sample_ident)
for sample_ident in resp:
if only_data:
return resp[sample_ident]
else:
return Sample(self, self.get_sample_type(resp[sample_ident]["type"]["code"]), resp[sample_ident])
Chandrasekhar Ramakrishnan
committed
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
def get_external_data_management_system(self, perm_id, only_data=False):
"""Retrieve metadata for the external data management system.
:param perm_id: A permId for an external DMS.
:param only_data: Return the result data as a hash-map, not an object.
"""
request = {
"method": "getExternalDataManagementSystems",
"params": [
self.token,
[{
"@type": "as.dto.externaldms.id.ExternalDmsPermId",
"permId": perm_id
}],
{},
],
}
resp = self._post_request(self.as_v3, request)
parse_jackson(resp)
if resp is None or len(resp) == 0:
raise ValueError('no such external DMS found: ' + perm_id)
else:
for ident in resp:
if only_data:
return resp[ident]
else:
return ExternalDMS(self, resp[ident])
def new_space(self, name, description=None):
request = {
"method": "createSpaces",
"params": [
self.token,
"code": name,
"description": description,
"@type": "as.dto.space.create.SpaceCreation"
resp = self._post_request(self.as_v3, request)
Swen Vermeul
committed
def new_analysis(self, name, description=None, sample=None, dss_code=None, result_files=None,
notebook_files=None, parents=None):
Swen Vermeul
committed
""" An analysis contains the Jupyter notebook file(s) and some result files.
Technically this method involves uploading files to the session workspace
and activating the dropbox aka dataset ingestion service "jupyter-uploader-api"
Swen Vermeul
committed
"""
if dss_code is None:
dss_code = self.get_datastores()['code'][0]
# if a sample identifier was given, use it as a string.
# if a sample object was given, take its identifier
Swen Vermeul
committed
sampleId = None
Swen Vermeul
committed
if isinstance(sample, str):
Swen Vermeul
committed
if (is_identifier(sample)):
sampleId = {
Swen Vermeul
committed
"identifier": sample,
"@type": "as.dto.sample.id.SampleIdentifier"
}
else:
sampleId = {
Swen Vermeul
committed
"permId": sample,
"@type": "as.dto.sample.id.SamplePermId"
}
Swen Vermeul
committed
else:
sampleId = {
Swen Vermeul
committed
"identifier": sample.identifier,
"@type": "as.dto.sample.id.SampleIdentifier"
}
parentIds = []
if parents is not None:
if not isinstance(parents, list):
parants = [parents]
for parent in parents:
parentIds.append(parent.permId)
Swen Vermeul
committed
folder = time.strftime('%Y-%m-%d_%H-%M-%S')
Swen Vermeul
committed
data_sets = []
if notebook_files is not None:
notebooks_folder = os.path.join(folder, 'notebook_files')
self.upload_files(
datastore_url=datastore_url,
Swen Vermeul
committed
files=notebook_files,
folder=notebooks_folder,
Swen Vermeul
committed
wait_until_finished=True
)
data_sets.append({
"dataSetType": "JUPYTER_NOTEBOOk",
Swen Vermeul
committed
"sessionWorkspaceFolder": notebooks_folder,
"fileNames": notebook_files,
"properties": {}
Swen Vermeul
committed
})
if result_files is not None:
results_folder = os.path.join(folder, 'result_files')
self.upload_files(
datastore_url=datastore_url,
Swen Vermeul
committed
files=result_files,
folder=results_folder,
wait_until_finished=True
)
data_sets.append({
"dataSetType": "JUPYTER_RESULT",
"sessionWorkspaceFolder": results_folder,
"fileNames": result_files,
"properties": {}
Swen Vermeul
committed
})
"method": "createReportFromAggregationService",
"params": [
self.token,
dss_code,
DROPBOX_PLUGIN,
{
"sample": {"identifier": sampleId['identifier']},
"sampleId": sampleId,
"parentIds": parentIds,
"containers": [{
"dataSetType": "JUPYTER_CONTAINER",
"properties": {
"NAME": name,
"DESCRIPTION": description
}
}],
"dataSets": data_sets,
}
],
resp = self._post_request(self.reg_v1, request)
try:
if resp['rows'][0][0]['value'] == 'OK':
return resp['rows'][0][1]['value']
except:
return resp
Swen Vermeul
committed
def new_sample(self, type, **kwargs):
""" Creates a new sample of a given sample type.
return Sample(self, self.get_sample_type(type), None, **kwargs)
def new_dataset(self, type, **kwargs):
""" Creates a new dataset of a given sample type.
"""
return DataSet(self, self.get_dataset_type(type.upper()), None, **kwargs)
def _get_dss_url(self, dss_code=None):
""" internal method to get the downloadURL of a datastore.
"""
Swen Vermeul
committed
dss = self.get_datastores()
if dss_code is None:
return dss['downloadUrl'][0]
else:
Swen Vermeul
committed
return dss[dss['code'] == dss_code]['downloadUrl'][0]
Swen Vermeul
committed
def upload_files(self, datastore_url=None, files=None, folder=None, wait_until_finished=False):
Swen Vermeul
committed
if datastore_url is None:
if files is None:
raise ValueError("Please provide a filename.")
Swen Vermeul
committed
if folder is None:
# create a unique foldername
folder = time.strftime('%Y-%m-%d_%H-%M-%S')
if isinstance(files, str):
files = [files]
self.files = files
self.startByte = 0
self.endByte = 0
# define a queue to handle the upload threads
queue = DataSetUploadQueue()
real_files = []
for filename in files:
if os.path.isdir(filename):
real_files.extend(
[os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn])
else:
real_files.append(os.path.join(filename))
# compose the upload-URL and put URL and filename in the upload queue
for filename in real_files:
file_in_wsp = os.path.join(folder, filename)
Swen Vermeul
committed
self.files_in_wsp.append(file_in_wsp)
upload_url = (
Swen Vermeul
committed
datastore_url + '/session_workspace_file_upload'
+ '?filename=' + os.path.join(folder, filename)
+ '&id=1'
+ '&startByte=0&endByte=0'
+ '&sessionID=' + self.token
)
queue.put([upload_url, filename, self.verify_certificates])
# wait until all files have uploaded
if wait_until_finished:
queue.join()
# return files with full path in session workspace
Swen Vermeul
committed
return self.files_in_wsp
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
def __init__(self, workers=20):
# maximum files to be uploaded at once
self.upload_queue = Queue()
# define number of threads and start them
for t in range(workers):
t = Thread(target=self.upload_file)
t.daemon = True
t.start()
def put(self, things):
""" expects a list [url, filename] which is put into the upload queue
"""
self.upload_queue.put(things)
def join(self):
""" needs to be called if you want to wait for all uploads to be finished
"""
self.upload_queue.join()
def upload_file(self):
while True:
# get the next item in the queue
upload_url, filename, verify_certificates = self.upload_queue.get()
filesize = os.path.getsize(filename)
# upload the file to our DSS session workspace
with open(filename, 'rb') as f:
resp = requests.post(upload_url, data=f, verify=verify_certificates)
resp.raise_for_status()
data = resp.json()
assert filesize == int(data['size'])
# Tell the queue that we are done
self.upload_queue.task_done()
Swen Vermeul
committed
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
def __init__(self, workers=20):
# maximum files to be downloaded at once
self.download_queue = Queue()
# define number of threads
for t in range(workers):
t = Thread(target=self.download_file)
t.daemon = True
t.start()
def put(self, things):
""" expects a list [url, filename] which is put into the download queue
"""
self.download_queue.put(things)
def join(self):
""" needs to be called if you want to wait for all downloads to be finished
"""
self.download_queue.join()
def download_file(self):
while True:
url, filename, file_size, verify_certificates = self.download_queue.get()
Swen Vermeul
committed
# create the necessary directory structure if they don't exist yet
os.makedirs(os.path.dirname(filename), exist_ok=True)
# request the file in streaming mode
r = requests.get(url, stream=True, verify=verify_certificates)
Swen Vermeul
committed
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
Swen Vermeul
committed
f.write(chunk)
assert os.path.getsize(filename) == int(file_size)
Swen Vermeul
committed
self.download_queue.task_done()
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
class OpenBisObject():
def __init__(self, openbis_obj, type, data=None, **kwargs):
self.__dict__['openbis'] = openbis_obj
self.__dict__['type'] = type
self.__dict__['p'] = PropertyHolder(openbis_obj, type)
self.__dict__['a'] = AttrHolder(openbis_obj, 'DataSet', type)
# existing OpenBIS object
if data is not None:
self._set_data(data)
if kwargs is not None:
for key in kwargs:
setattr(self, key, kwargs[key])
def __eq__(self, other):
return str(self) == str(other)
def __ne__(self, other):
return str(self) != str(other)
def _set_data(self, data):
# assign the attribute data to self.a by calling it
# (invoking the AttrHolder.__call__ function)
self.a(data)
self.__dict__['data'] = data
# put the properties in the self.p namespace (without checking them)
for key, value in data['properties'].items():
self.p.__dict__[key.lower()] = value
return self.openbis.get_project(self._project['identifier'])
except Exception:
pass
return self.openbis.get_experiment(self._experiment['identifier'])
except Exception:
pass
return self.openbis.get_sample(self._sample['identifier'])
except Exception:
pass
def __getattr__(self, name):
return getattr(self.__dict__['a'], name)
def __setattr__(self, name, value):
if name in ['set_properties', 'set_tags', 'add_tags']:
raise ValueError("These are methods which should not be overwritten")
setattr(self.__dict__['a'], name, value)
def _repr_html_(self):
"""Print all the assigned attributes (identifier, tags, etc.) in a nicely formatted table. See
AttributeHolder class.
"""
return self.a._repr_html_()
def __repr__(self):
"""same thing as _repr_html_() but for IPython
"""
return self.a.__repr__()
Swen Vermeul
committed
class PhysicalData():
def __init__(self, data=None):
if data is None:
data = []
self.data = data
self.attrs = ['speedHint', 'complete', 'shareId', 'size',
'fileFormatType', 'storageFormat', 'location', 'presentInArchive',
'storageConfirmation', 'locatorType', 'status']
Swen Vermeul
committed
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
def __dir__(self):
return self.attrs
def __getattr__(self, name):
if name in self.attrs:
if name in self.data:
return self.data[name]
else:
return ''
def _repr_html_(self):
html = """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
"""
for attr in self.attrs:
html += "<tr> <td>{}</td> <td>{}</td> </tr>".format(
attr, getattr(self, attr, '')
Swen Vermeul
committed
)
html += """
</tbody>
</table>
"""
return html
def __repr__(self):
headers = ['attribute', 'value']
lines = []
for attr in self.attrs: