Newer
Older
"type": { "@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions" },
"@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions"
}
request = {
"method": "searchExperiments",
"params": [ self.token,
criteria,
options,
],
}
resp = self._post_request(self.as_v3, request)
if len(resp['objects']) == 0:
raise ValueError("No experiments found!")
objects = resp['objects']
parse_jackson(objects)
experiments = DataFrame(objects)
experiments['registrationDate']= experiments['registrationDate'].map(format_timestamp)
experiments['modificationDate']= experiments['modificationDate'].map(format_timestamp)
experiments['project']= experiments['project'].map(extract_code)
experiments['registrator'] = experiments['registrator'].map(extract_person)
experiments['modifier'] = experiments['modifier'].map(extract_person)
experiments['identifier'] = experiments['identifier'].map(extract_identifier)
experiments['type'] = experiments['type'].map(extract_code)
exps = experiments[['code', 'identifier', 'project', 'type', 'registrator',
'registrationDate', 'modifier', 'modificationDate']]
return Things(self, 'experiment', exps, 'identifier')
Swen Vermeul
committed
def get_datasets(self,
                 code=None, type=None,
                 withParents=None, withChildren=None,
                 withSamples=None, withExperiments=None
                 ):
    """Search for datasets and return the matches as a ``Things`` object.

    Every truthy argument contributes one sub-criterion; all sub-criteria
    are combined with the AND operator.

    :param code: dataset code, handed to ``_criteria_for_code``
    :param type: dataset type, handed to ``_subcriteria_for_type``
    :param withParents: handed to ``_subcriteria_for_permid`` ('Parents')
    :param withChildren: handed to ``_subcriteria_for_permid`` ('Children')
    :param withSamples: handed to ``_subcriteria_for_permid`` ('Sample')
    :param withExperiments: handed to ``_subcriteria_for_permid`` ('Experiment')
    :return: ``Things`` wrapping a DataFrame with the columns code,
        properties, type, sample, registrationDate and modificationDate
    :raises ValueError: if the search yields no dataset
    """
    sub_criteria = []
    if code:
        sub_criteria.append(_criteria_for_code(code))
    if type:
        sub_criteria.append(_subcriteria_for_type(type, 'DataSet'))
    if withParents:
        sub_criteria.append(_subcriteria_for_permid(withParents, 'DataSet', 'Parents'))
    if withChildren:
        sub_criteria.append(_subcriteria_for_permid(withChildren, 'DataSet', 'Children'))
    if withSamples:
        sub_criteria.append(_subcriteria_for_permid(withSamples, 'Sample'))
    if withExperiments:
        sub_criteria.append(_subcriteria_for_permid(withExperiments, 'Experiment'))

    criteria = {
        "criteria": sub_criteria,
        "@type": "as.dto.dataset.search.DataSetSearchCriteria",
        "operator": "AND"
    }
    fetchopts = {
        # "parents": { "@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions" },
        # "children": { "@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions" },
        "containers": {"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions"},
        "type": {"@type": "as.dto.dataset.fetchoptions.DataSetTypeFetchOptions"}
    }
    for option in ['tags', 'properties', 'sample']:
        fetchopts[option] = fetch_option[option]

    request = {
        "method": "searchDataSets",
        "params": [self.token,
                   criteria,
                   fetchopts,
                   ],
    }
    resp = self._post_request(self.as_v3, request)
    objects = resp['objects']
    if len(objects) == 0:
        raise ValueError("no datasets found!")

    parse_jackson(objects)
    datasets = DataFrame(objects)
    # make the raw server values readable before handing them out
    datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
    datasets['modificationDate'] = datasets['modificationDate'].map(format_timestamp)
    datasets['sample'] = datasets['sample'].map(extract_nested_identifier)
    datasets['type'] = datasets['type'].map(extract_code)
    return Things(
        self,
        'dataset',
        datasets[['code', 'properties', 'type', 'sample', 'registrationDate', 'modificationDate']]
    )
Swen Vermeul
committed
def get_experiment(self, expId, withAttachments=False):
    """Return an Experiment object for a given identifier (expId).

    :param expId: value passed to ``search_request_for_identifier`` and used
        as the key into the server response
    :param withAttachments: if true, request the attachments with their
        content instead of the plain attachment fetch options
    :return: an ``Experiment`` built from the response entry and its type
    :raises ValueError: if the server response is empty
    """
    fetchopts = {
        "@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions",
        "type": {
            "@type": "as.dto.experiment.fetchoptions.ExperimentTypeFetchOptions",
        },
    }
    search_request = search_request_for_identifier(expId, 'experiment')
    for option in ['tags', 'properties', 'attachments', 'project', 'samples']:
        fetchopts[option] = fetch_option[option]
    if withAttachments:
        fetchopts['attachments'] = fetch_option['attachmentsWithContent']

    request = {
        "method": "getExperiments",
        "params": [
            self.token,
            [search_request],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, request)
    # an empty or missing response both mean "not found"
    if not resp:
        raise ValueError("No such experiment: %s" % expId)
    return Experiment(self,
                      self.get_experiment_type(resp[expId]["type"]["code"]),
                      resp[expId]
                      )
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
def new_experiment(self, project_ident, code, type, properties=None, attachments=None, tags=None):
    """Create an experiment on the server and return it.

    :param project_ident: project the experiment belongs to, converted by
        ``_create_projectId``
    :param code: code of the new experiment
    :param type: experiment type, converted by ``_create_typeId``
    :param properties: property dict; defaults to an empty dict
    :param attachments: sent to the server unchanged
    :param tags: converted by ``_create_tagIds``
    :return: the result of ``get_experiment`` for the permId of the first
        entry in the server response
    """
    tagIds = _create_tagIds(tags)
    typeId = _create_typeId(type)
    projectId = _create_projectId(project_ident)
    if properties is None:
        properties = {}

    request = {
        "method": "createExperiments",
        "params": [
            self.token,
            [
                {
                    "properties": properties,
                    "code": code,
                    "typeId": typeId,
                    "projectId": projectId,
                    "tagIds": tagIds,
                    "attachments": attachments,
                    "@type": "as.dto.experiment.create.ExperimentCreation",
                }
            ]
        ],
    }
    resp = self._post_request(self.as_v3, request)
    return self.get_experiment(resp[0]['permId'])
def update_experiment(self, experimentId, properties=None, tagIds=None, attachments=None):
    """Send an ``updateExperiments`` request for a single experiment.

    Only the optional arguments that are not ``None`` are included in the
    update; nothing is returned.

    :param experimentId: permId of the experiment to update
    :param properties: new properties, sent unchanged
    :param tagIds: new tag ids, sent unchanged
    :param attachments: new attachments, sent unchanged
    """
    update = {
        "experimentId": {
            "permId": experimentId,
            "@type": "as.dto.experiment.id.ExperimentPermId"
        },
        "@type": "as.dto.experiment.update.ExperimentUpdate"
    }
    optional_fields = (
        ("properties", properties),
        ("tagIds", tagIds),
        ("attachments", attachments),
    )
    for field_name, field_value in optional_fields:
        if field_value is not None:
            update[field_name] = field_value

    self._post_request(
        self.as_v3,
        {"method": "updateExperiments", "params": [self.token, [update]]},
    )
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
def create_sample(self, space_ident, code, type,
                  project_ident=None, experiment_ident=None, properties=None, attachments=None, tags=None):
    """Create a sample on the server and return it.

    :param space_ident: code of the space the sample is created in; sent as
        a ``SpacePermId`` (the same form ``new_project`` uses)
    :param code: code of the new sample
    :param type: sample type, converted by ``_create_typeId``
    :param project_ident: converted by ``_create_projectId``
    :param experiment_ident: converted by ``_create_experimentId``
    :param properties: property dict; defaults to an empty dict
    :param attachments: sent to the server unchanged
    :param tags: converted by ``_create_tagIds``
    :return: the result of ``get_sample`` for the permId of the first entry
        in the server response
    """
    tagIds = _create_tagIds(tags)
    typeId = _create_typeId(type)
    projectId = _create_projectId(project_ident)
    experimentId = _create_experimentId(experiment_ident)
    if properties is None:
        properties = {}

    # the space argument used to be accepted but never transmitted
    spaceId = None
    if space_ident is not None:
        spaceId = {
            "permId": space_ident,
            "@type": "as.dto.space.id.SpacePermId"
        }

    request = {
        "method": "createSamples",
        "params": [
            self.token,
            [
                {
                    "properties": properties,
                    "code": code,
                    "typeId": typeId,
                    "spaceId": spaceId,
                    "projectId": projectId,
                    "experimentId": experimentId,
                    "tagIds": tagIds,
                    "attachments": attachments,
                    "@type": "as.dto.sample.create.SampleCreation",
                }
            ]
        ],
    }
    resp = self._post_request(self.as_v3, request)
    return self.get_sample(resp[0]['permId'])
def update_sample(self, sampleId, space=None, project=None, experiment=None,
                  parents=None, children=None, components=None, properties=None, tagIds=None, attachments=None):
    """Send an ``updateSamples`` request for a single sample.

    Only ``space``, ``project``, ``properties``, ``tagIds`` and
    ``attachments`` are transmitted, and only when they are not ``None``.
    ``experiment``, ``parents``, ``children`` and ``components`` are accepted
    but currently not used by this method. Nothing is returned.

    :param sampleId: permId of the sample to update
    """
    update = {
        "sampleId": {
            "permId": sampleId,
            "@type": "as.dto.sample.id.SamplePermId"
        },
        "@type": "as.dto.sample.update.SampleUpdate"
    }
    optional_fields = (
        ('spaceId', space),
        ('projectId', project),
        ("properties", properties),
        ("tagIds", tagIds),
        ("attachments", attachments),
    )
    for field_name, field_value in optional_fields:
        if field_value is not None:
            update[field_name] = field_value

    self._post_request(
        self.as_v3,
        {"method": "updateSamples", "params": [self.token, [update]]},
    )
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
def delete_entity(self, what, permid, reason):
    """Deletes Spaces, Projects, Experiments, Samples and DataSets

    :param what: entity kind, e.g. 'space', 'project', 'experiment',
        'sample' or 'dataset' (case-insensitive)
    :param permid: permId of the entity to delete
    :param reason: reason recorded in the deletion options
    """
    # str.capitalize() lower-cases everything after the first letter, which
    # turns "DataSet" into "Dataset"; the API spells it with a capital S.
    camel_case_names = {'dataset': 'DataSet'}
    package = what.lower()
    entity = camel_case_names.get(package, what.capitalize())

    request = {
        "method": "delete" + entity + 's',
        "params": [
            self.token,
            [
                {
                    "permId": permid,
                    "@type": "as.dto.{}.id.{}PermId".format(package, entity)
                }
            ],
            {
                "reason": reason,
                "@type": "as.dto.{}.delete.{}DeletionOptions".format(package, entity)
            }
        ]
    }
    self._post_request(self.as_v3, request)
def get_deletions(self):
    """Return a DataFrame of the deleted objects reported by ``searchDeletions``.

    Every entry of the response is passed through ``extract_deletion`` and
    all objects it yields are collected into a single DataFrame.
    """
    request = {
        "method": "searchDeletions",
        "params": [
            self.token,
            {},
            {
                "deletedObjects": {
                    "@type": "as.dto.deletion.fetchoptions.DeletedObjectFetchOptions"
                }
            }
        ]
    }
    resp = self._post_request(self.as_v3, request)
    objects = resp['objects']
    parse_jackson(objects)

    new_objs = []
    for value in objects:
        # extend(): one deletion may contain several deleted objects, and
        # append(*del_objs) only works when there is exactly one of them
        new_objs.extend(extract_deletion(value))
    return DataFrame(new_objs)
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
def new_project(self, space_code, code, description, leaderId):
    """Send a ``createProjects`` request for one project.

    :param space_code: sent as the permId of the project's space
    :param code: code of the new project
    :param description: project description
    :param leaderId: sent to the server unchanged
    :return: the raw response of the request
    """
    creation = {
        "code": code,
        "spaceId": {
            "permId": space_code,
            "@type": "as.dto.space.id.SpacePermId"
        },
        "@type": "as.dto.project.create.ProjectCreation",
        "description": description,
        "leaderId": leaderId,
        "attachments": None
    }
    return self._post_request(
        self.as_v3,
        {"method": "createProjects", "params": [self.token, [creation]]},
    )
def get_project(self, projectId):
    """Fetch a project, including its attachments, and return the raw response.

    :param projectId: forwarded to ``_create_get_request``
    """
    return self._post_request(
        self.as_v3,
        self._create_get_request('getProjects', 'project', projectId, ['attachments']),
    )
def get_projects(self, space=None):
    """ Get a list of all available projects (DataFrame object).

    :param space: restrict the search to this space; when ``None`` the
        value of ``self.default_space`` is used, and if that is falsy too
        no space criterion is sent
    :return: ``Things`` wrapping a DataFrame with the columns code, space,
        registrator, registrationDate, modifier, modificationDate, permid
        and identifier
    :raises ValueError: if the server returns nothing or no projects
    """
    if space is None:
        space = self.default_space

    sub_criteria = []
    if space:
        sub_criteria.append(_subcriteria_for_code(space, 'space'))
    criteria = {
        "criteria": sub_criteria,
        "@type": "as.dto.project.search.ProjectSearchCriteria",
        "operator": "AND"
    }
    options = {
        "registrator": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"},
        "modifier": {"@type": "as.dto.person.fetchoptions.PersonFetchOptions"},
        "experiments": {"@type": "as.dto.experiment.fetchoptions.ExperimentFetchOptions", },
        "space": {"@type": "as.dto.space.fetchoptions.SpaceFetchOptions"},
        "@type": "as.dto.project.fetchoptions.ProjectFetchOptions"
    }
    request = {
        "method": "searchProjects",
        "params": [self.token,
                   criteria,
                   options,
                   ],
    }
    resp = self._post_request(self.as_v3, request)
    if resp is None:
        raise ValueError("No projects found!")

    objects = resp['objects']
    parse_jackson(objects)
    projects = DataFrame(objects)
    # compare by value: "is 0" tests object identity, not equality
    if len(projects) == 0:
        raise ValueError("No projects found!")

    projects['registrationDate'] = projects['registrationDate'].map(format_timestamp)
    projects['modificationDate'] = projects['modificationDate'].map(format_timestamp)
    projects['registrator'] = projects['registrator'].map(extract_person)
    projects['modifier'] = projects['modifier'].map(extract_person)
    projects['permid'] = projects['permId'].map(extract_permid)
    projects['identifier'] = projects['identifier'].map(extract_identifier)
    projects['space'] = projects['space'].map(extract_code)
    pros = projects[['code', 'space', 'registrator', 'registrationDate',
                     'modifier', 'modificationDate', 'permid', 'identifier']]
    return Things(self, 'project', pros, 'identifier')
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
def _create_get_request(self, method_name, entity_type, permids, options):
    """Build the request dict for one of the ``get...`` API methods.

    :param method_name: value of the request's "method" entry
    :param entity_type: entity name used to build the id "@type" strings
    :param permids: a single id or a list of ids; ids starting with '/' are
        sent as identifiers, all others as permIds
    :param options: names looked up in ``fetch_option`` to build the fetch
        options
    :return: the request dict
    """
    id_list = permids if isinstance(permids, list) else [permids]
    type_prefix = "as.dto.{}.id.{}".format(entity_type.lower(), entity_type.capitalize())

    search_params = []
    for entity_id in id_list:
        # decide if we got a permId or an identifier
        if re.match('/', entity_id):
            entry = {"identifier": entity_id, "@type": type_prefix + 'Identifier'}
        else:
            entry = {"permId": entity_id, "@type": type_prefix + 'PermId'}
        search_params.append(entry)

    fetch_options = {name: fetch_option[name] for name in options}

    return {
        "method": method_name,
        "params": [
            self.token,
            search_params,
            fetch_options
        ],
    }
def get_terms(self, vocabulary=None):
    """ Returns information about vocabulary, including its controlled vocabulary

    :param vocabulary: code of the vocabulary whose terms are searched;
        when ``None`` an empty search request is sent
    :return: a ``Vocabulary`` object built from the server response
    """
    search_request = {}
    if vocabulary is not None:
        search_request = _gen_search_request({
            "vocabulary": "VocabularyTerm",
            "criteria": [{
                "vocabulary": "Vocabulary",
                "code": vocabulary
            }]
        })
    fetch_options = {
        "vocabulary": {"@type": "as.dto.vocabulary.fetchoptions.VocabularyFetchOptions"},
        "@type": "as.dto.vocabulary.fetchoptions.VocabularyTermFetchOptions"
    }
    request = {
        "method": "searchVocabularyTerms",
        "params": [self.token, search_request, fetch_options]
    }
    resp = self._post_request(self.as_v3, request)
    parse_jackson(resp)
    return Vocabulary(resp)
def get_tags(self):
    """ Returns a DataFrame of all tags found by ``searchTags``, with the
    columns ``code`` and ``registrationDate``.
    """
    search_tags = {
        "method": "searchTags",
        "params": [self.token, {}, {}]
    }
    resp = self._post_request(self.as_v3, search_tags)
    parse_jackson(resp)
    tags = DataFrame(resp['objects'])
    tags['registrationDate'] = tags['registrationDate'].map(format_timestamp)
    return tags[['code', 'registrationDate']]
Swen Vermeul
committed
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
def get_sample_types(self, type=None):
    """ Returns a list of all available sample types

    Delegates to ``_get_types_of`` and additionally asks for the
    ``generatedCodePrefix`` attribute.
    """
    extra_attributes = ["generatedCodePrefix"]
    return self._get_types_of("searchSampleTypes", "Sample", type, extra_attributes)
def get_sample_type(self, type):
    """Look up a single sample type via ``_get_types_of``.

    :raises ValueError: if the lookup raises any exception
    """
    try:
        found = self._get_types_of("searchSampleTypes", "Sample", type, ["generatedCodePrefix"])
    except Exception:
        raise ValueError("no such sample type: {}".format(type))
    return found
def get_experiment_types(self, type=None):
    """ Returns a list of all available experiment types

    Delegates to ``_get_types_of`` without additional attributes.
    """
    return self._get_types_of("searchExperimentTypes", "Experiment", type)
def get_experiment_type(self, type):
    """Look up a single experiment type via ``_get_types_of``.

    :raises ValueError: if the lookup raises any exception
    """
    try:
        found = self._get_types_of("searchExperimentTypes", "Experiment", type)
    except Exception:
        raise ValueError("No such experiment type: {}".format(type))
    return found
def get_material_types(self, type=None):
    """ Returns a list of all available material types

    Delegates to ``_get_types_of`` without additional attributes.
    """
    material_types = self._get_types_of("searchMaterialTypes", "Material", type)
    return material_types
def get_material_type(self, type):
    """Look up a single material type via ``_get_types_of``.

    :raises ValueError: if the lookup raises any exception
    """
    try:
        found = self._get_types_of("searchMaterialTypes", "Material", type)
    except Exception:
        raise ValueError("No such material type: {}".format(type))
    return found
def get_dataset_types(self, type=None):
    """ Returns a list (DataFrame object) of all currently available dataset types

    Delegates to ``_get_types_of`` and additionally asks for the ``kind``
    attribute.
    """
    extra_attributes = ['kind']
    return self._get_types_of("searchDataSetTypes", "DataSet", type, extra_attributes)
def get_dataset_type(self, type):
    """Look up a single dataset type via ``_get_types_of``.

    :raises ValueError: if the lookup raises any exception
    """
    try:
        found = self._get_types_of("searchDataSetTypes", "DataSet", type, ['kind'])
    except Exception:
        raise ValueError("No such dataSet type: {}".format(type))
    return found
def _get_types_of(self, method_name, entity_type, type=None, additional_attributes=None):
    """ Returns the available types of one entity kind, or a single type.

    :param method_name: API search method, e.g. "searchSampleTypes"
    :param entity_type: entity name, e.g. "Sample" or "DataSet"; used to
        build the search request and the fetch-options "@type"
    :param type: code of one specific type; ``None`` searches all types
    :param additional_attributes: extra column names to include in the
        returned table, placed between 'description' and 'modificationDate'
    :return: ``PropertyAssignments`` when ``type`` is given and exactly one
        type matches; otherwise a ``Things`` table when at least one type
        was returned; ``None`` when the response contains no objects
    """
    if additional_attributes is None:
        additional_attributes = []
    attributes = ['code', 'description', *additional_attributes, 'modificationDate']

    search_request = {}
    fetch_options = {}
    if type is not None:
        search_request = _gen_search_request({
            entity_type.lower(): entity_type + "Type",
            "operator": "AND",
            "code": type
        })
        fetch_options = {
            "@type": "as.dto.{}.fetchoptions.{}TypeFetchOptions".format(
                entity_type.lower(), entity_type
            )
        }
        # NOTE(review): the original nesting of the next two statements was
        # lost; they are assumed to apply only to single-type lookups -
        # confirm against version control.
        fetch_options['propertyAssignments'] = fetch_option['propertyAssignments']
        attributes.append('propertyAssignments')

    # NOTE(review): the opening of this literal was missing from the file;
    # "method": method_name is reconstructed from the parameter name and
    # from how the sibling methods build their requests - confirm.
    request = {
        "method": method_name,
        "params": [self.token, search_request, fetch_options],
    }
    resp = self._post_request(self.as_v3, request)
    parse_jackson(resp)

    if type is not None and len(resp['objects']) == 1:
        return PropertyAssignments(self, resp['objects'][0])
    if len(resp['objects']) >= 1:
        types = DataFrame(resp['objects'])
        types['modificationDate'] = types['modificationDate'].map(format_timestamp)
        return Things(self, entity_type.lower() + '_type', types[attributes])
    return None
Swen Vermeul
committed
Swen Vermeul
committed
def is_session_active(self):
    """ checks whether a session is still active. Returns true or false.

    Shorthand for ``is_token_valid`` called with this object's own token.
    """
    return self.is_token_valid(self.token)
def is_token_valid(self, token=None):
    """Check if the connection to openBIS is valid.

    This method is useful to check if a token is still valid or if it has timed out,
    requiring the user to login again.

    :param token: the session token to check; defaults to ``self.token``
    :return: Return True if the token is valid, False if it is not valid.
    """
    if token is None:
        token = self.token
    if token is None:
        # nothing to ask the server about
        return False

    request = {
        "method": "isSessionActive",
        "params": [token],
    }
    # NOTE(review): the end of this method was missing from the file; handing
    # back the server's answer matches the documented contract - confirm.
    resp = self._post_request(self.as_v1, request)
    return resp
"""fetch a dataset and some metadata attached to it:
- properties
- sample
- parents
- children
- containers
- dataStore
- physicalData
- linkedData
:return: a DataSet object
"""
criteria = [{
"permId": permid,
"@type": "as.dto.dataset.id.DataSetPermId"
}]
fetchopts = {
"parents": { "@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions" },
"children": { "@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions" },
"containers": { "@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions" },
"@type": "as.dto.dataset.fetchoptions.DataSetFetchOptions",
}
for option in ['tags', 'properties', 'dataStore', 'physicalData', 'linkedData',
'experiment', 'sample']:
fetchopts[option] = fetch_option[option]
request = {
"params": [ self.token,
criteria,
fetchopts,
resp = self._post_request(self.as_v3, request)
if resp is not None:
for permid in resp:
return DataSet(self, resp[permid])
Chandrasekhar Ramakrishnan
committed
Swen Vermeul
committed
def get_sample(self, sample_ident, only_data=False, withAttachments=False):
    """Retrieve metadata for the sample.

    Get metadata for the sample and any directly connected parents of the sample to allow access
    to the same information visible in the ELN UI. The metadata will be on the file system.

    :param sample_ident: value passed to ``search_request_for_identifier``
        to locate the sample
    :param only_data: if true, return the raw response entry instead of a
        ``Sample`` object
    :param withAttachments: if true, request the attachments with their
        content
    :return: the first entry of the server response, raw or wrapped in a
        ``Sample``
    :raises ValueError: if the server returns no sample
    """
    fetchopts = {"type": {"@type": "as.dto.sample.fetchoptions.SampleTypeFetchOptions"}}
    search_request = search_request_for_identifier(sample_ident, 'sample')
    for option in ['tags', 'properties', 'attachments', 'space', 'experiment', 'registrator', 'dataSets']:
        fetchopts[option] = fetch_option[option]
    fetchopts["parents"] = {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"}
    fetchopts["children"] = {"@type": "as.dto.sample.fetchoptions.SampleFetchOptions"}
    if withAttachments:
        fetchopts['attachments'] = fetch_option['attachmentsWithContent']

    sample_request = {
        "method": "getSamples",
        "params": [
            self.token,
            [search_request],
            fetchopts
        ],
    }
    resp = self._post_request(self.as_v3, sample_request)
    # covers both a missing and an empty response
    if not resp:
        raise ValueError('no such sample found: {}'.format(sample_ident))
    parse_jackson(resp)

    # only one sample was requested, so the first entry is the answer
    data = resp[next(iter(resp))]
    if only_data:
        return data
    return Sample(self, self.get_sample_type(data["type"]["code"]), data)
def new_space(self, name, description=None):
    """ Creates a new space in the openBIS instance. Returns a list of all spaces

    :param name: code of the new space
    :param description: optional description of the space
    :return: the result of ``get_spaces(refresh=True)``
    """
    creation = {
        "@id": 0,
        "code": name,
        "description": description,
        "@type": "as.dto.space.create.SpaceCreation"
    }
    self._post_request(
        self.as_v3,
        {"method": "createSpaces", "params": [self.token, [creation]]},
    )
    return self.get_spaces(refresh=True)
Swen Vermeul
committed
def new_analysis(self, name, description=None, sample=None, dss_code=None, result_files=None,
                 notebook_files=None, parents=None):
    """ An analysis contains the Jupyter notebook file(s) and some result files.
    Technically this method involves uploading files to the session workspace
    and activating the dropbox aka dataset ingestion service "jupyter-uploader-api"

    :param name: stored in the NAME property of the container dataset
    :param description: stored in its DESCRIPTION property
    :param sample: a sample identifier or permId (str), or an object with
        an ``identifier`` attribute; ``None`` sends no sample id
    :param dss_code: datastore code; defaults to the first datastore
        returned by ``get_datastores``
    :param result_files: files uploaded as a JUPYTER_RESULT dataset
    :param notebook_files: files uploaded as a notebook dataset
    :param parents: one object or a list of objects with a ``permId``
        attribute
    :return: the second cell of the first response row when the first cell
        is 'OK'; the raw response when it does not have that shape;
        ``None`` when the first cell is not 'OK'
    """
    if dss_code is None:
        dss_code = self.get_datastores()['code'][0]

    # if a sample identifier was given, use it as a string.
    # if a sample object was given, take its identifier
    sampleId = None
    if isinstance(sample, str):
        if is_identifier(sample):
            sampleId = {
                "identifier": sample,
                "@type": "as.dto.sample.id.SampleIdentifier"
            }
        else:
            sampleId = {
                "permId": sample,
                "@type": "as.dto.sample.id.SamplePermId"
            }
    elif sample is not None:
        sampleId = {
            "identifier": sample.identifier,
            "@type": "as.dto.sample.id.SampleIdentifier"
        }

    parentIds = []
    if parents is not None:
        if not isinstance(parents, list):
            # a single parent is wrapped so the loop below can iterate it
            parents = [parents]
        parentIds = [parent.permId for parent in parents]

    # NOTE(review): the statement defining datastore_url was missing from
    # the file; resolving it through _get_dss_url is an assumption - confirm.
    datastore_url = self._get_dss_url(dss_code)
    folder = time.strftime('%Y-%m-%d_%H-%M-%S')

    data_sets = []
    if notebook_files is not None:
        notebooks_folder = os.path.join(folder, 'notebook_files')
        self.upload_files(
            datastore_url=datastore_url,
            files=notebook_files,
            folder=notebooks_folder,
            wait_until_finished=True
        )
        data_sets.append({
            "dataSetType": "JUPYTER_NOTEBOOk",
            "sessionWorkspaceFolder": notebooks_folder,
            "fileNames": notebook_files,
            "properties": {}
        })
    if result_files is not None:
        results_folder = os.path.join(folder, 'result_files')
        self.upload_files(
            datastore_url=datastore_url,
            files=result_files,
            folder=results_folder,
            wait_until_finished=True
        )
        data_sets.append({
            "dataSetType": "JUPYTER_RESULT",
            "sessionWorkspaceFolder": results_folder,
            "fileNames": result_files,
            "properties": {}
        })

    # NOTE(review): the brackets of this literal were lost and a stale
    # '"identifier" : sample.identifier' line was left next to "sampleId";
    # the structure below is reconstructed - confirm against version control.
    request = {
        "method": "createReportFromAggregationService",
        "params": [
            self.token,
            dss_code,
            DROPBOX_PLUGIN,
            {
                "sampleId": sampleId,
                "parentIds": parentIds,
                "containers": [
                    {
                        "dataSetType": "JUPYTER_CONTAINER",
                        "properties": {
                            "NAME": name,
                            "DESCRIPTION": description
                        }
                    }
                ],
                "dataSets": data_sets,
            }
        ],
    }
    resp = self._post_request(self.reg_v1, request)
    try:
        if resp['rows'][0][0]['value'] == 'OK':
            return resp['rows'][0][1]['value']
    except (KeyError, IndexError, TypeError):
        # the response does not have the expected table shape
        return resp
Swen Vermeul
committed
def new_sample(self, type, **kwargs):
    """ Creates a new sample of a given sample type.

    :param type: sample type code, resolved with ``get_sample_type``
    :param kwargs: forwarded unchanged to the ``Sample`` constructor
    :return: a ``Sample`` object created without server data
    """
    return Sample(self, self.get_sample_type(type), None, **kwargs)
def _get_dss_url(self, dss_code=None):
    """ internal method to get the downloadURL of a datastore.

    :param dss_code: code of the datastore; ``None`` selects the first
        datastore returned by ``get_datastores``
    :return: the ``downloadUrl`` value of the selected datastore
    :raises ValueError: if no datastore has the given code
    """
    dss = self.get_datastores()
    if dss_code is None:
        return dss['downloadUrl'].iloc[0]

    matching = dss[dss['code'] == dss_code]['downloadUrl']
    if len(matching) == 0:
        raise ValueError("No such datastore: {}".format(dss_code))
    # positional access: the filtered rows keep their original index labels,
    # so looking up label 0 only works when the match is the first row
    return matching.iloc[0]
Swen Vermeul
committed
def upload_files(self, datastore_url=None, files=None, folder=None, wait_until_finished=False):
    """Upload files into the session workspace of a datastore.

    :param datastore_url: base URL of the datastore server
    :param files: a file or directory name, or a list of them; directories
        are walked and every file below them is uploaded
    :param folder: target folder in the session workspace; defaults to a
        timestamp-based name
    :param wait_until_finished: if true, block until the upload queue is
        drained
    :return: the list of workspace paths of the queued files
    :raises ValueError: if ``files`` is ``None``
    """
    if datastore_url is None:
        # NOTE(review): the body of this branch was missing from the file;
        # falling back to the default datastore is an assumption - confirm.
        datastore_url = self._get_dss_url()
    if files is None:
        raise ValueError("Please provide a filename.")
    if folder is None:
        # create a unique foldername
        folder = time.strftime('%Y-%m-%d_%H-%M-%S')
    if isinstance(files, str):
        files = [files]

    self.files = files
    self.startByte = 0
    self.endByte = 0

    # define a queue to handle the upload threads
    queue = DataSetUploadQueue()

    real_files = []
    for filename in files:
        if os.path.isdir(filename):
            real_files.extend(
                [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn])
        else:
            real_files.append(os.path.join(filename))

    # compose the upload-URL and put URL and filename in the upload queue
    # NOTE(review): no initialisation of files_in_wsp was visible in the
    # file; it is reset per call here - confirm.
    self.files_in_wsp = []
    for filename in real_files:
        file_in_wsp = os.path.join(folder, filename)
        self.files_in_wsp.append(file_in_wsp)
        upload_url = (
            datastore_url + '/session_workspace_file_upload'
            + '?filename=' + file_in_wsp
            + '&id=1'
            + '&startByte=0&endByte=0'
            + '&sessionID=' + self.token
        )
        queue.put([upload_url, filename, self.verify_certificates])

    # wait until all files have uploaded
    if wait_until_finished:
        queue.join()

    # return files with full path in session workspace
    return self.files_in_wsp
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
class DataSetUploadQueue:
    """Queue served by daemon threads that upload files.

    Failed uploads do not stop a worker and do not block ``join``; they are
    collected in ``errors`` as ``(item, exception)`` tuples.
    """

    def __init__(self, workers=20):
        # maximum files to be uploaded at once
        self.upload_queue = Queue()
        # filled by the workers; must exist before the first thread starts
        self.errors = []
        # define number of threads and start them
        for _ in range(workers):
            worker = Thread(target=self.upload_file)
            worker.daemon = True
            worker.start()

    def put(self, things):
        """ expects a list [url, filename, verify_certificates] which is put
        into the upload queue
        """
        self.upload_queue.put(things)

    def join(self):
        """ needs to be called if you want to wait for all uploads to be finished
        """
        self.upload_queue.join()

    def upload_file(self):
        """Worker loop: take items from the queue and upload them, forever."""
        import traceback

        while True:
            # get the next item in the queue
            item = self.upload_queue.get()
            try:
                upload_url, filename, verify_certificates = item
                filesize = os.path.getsize(filename)
                # upload the file to our DSS session workspace
                with open(filename, 'rb') as f:
                    resp = requests.post(upload_url, data=f, verify=verify_certificates)
                    resp.raise_for_status()
                    data = resp.json()
                if filesize != int(data['size']):
                    raise IOError(
                        "size mismatch after upload of {}: sent {} bytes, server reports {}".format(
                            filename, filesize, data['size'])
                    )
            except Exception as exc:
                # keep the worker alive and make the failure visible
                self.errors.append((item, exc))
                traceback.print_exc()
            finally:
                # Tell the queue that we are done - also after a failure,
                # otherwise join() would wait forever
                self.upload_queue.task_done()
Swen Vermeul
committed
class DataSetDownloadQueue:
    """Queue served by daemon threads that download files.

    Failed downloads do not stop a worker and do not block ``join``; they
    are collected in ``errors`` as ``(item, exception)`` tuples.
    """

    def __init__(self, workers=20):
        # maximum files to be downloaded at once
        self.download_queue = Queue()
        # filled by the workers; must exist before the first thread starts
        self.errors = []
        # define number of threads
        for _ in range(workers):
            worker = Thread(target=self.download_file)
            worker.daemon = True
            worker.start()

    def put(self, things):
        """ expects a list [url, filename, file_size, verify_certificates]
        which is put into the download queue
        """
        self.download_queue.put(things)

    def join(self):
        """ needs to be called if you want to wait for all downloads to be finished
        """
        self.download_queue.join()

    def download_file(self):
        """Worker loop: take items from the queue and download them, forever."""
        import traceback

        while True:
            item = self.download_queue.get()
            try:
                url, filename, file_size, verify_certificates = item
                # create the necessary directory structure if they don't exist yet
                target_dir = os.path.dirname(filename)
                if target_dir:
                    # a bare filename has no directory part to create
                    os.makedirs(target_dir, exist_ok=True)
                # request the file in streaming mode
                r = requests.get(url, stream=True, verify=verify_certificates)
                # do not write an error page into the target file
                r.raise_for_status()
                with open(filename, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)
                if os.path.getsize(filename) != int(file_size):
                    raise IOError(
                        "size mismatch after download of {}: expected {} bytes".format(
                            filename, file_size)
                    )
            except Exception as exc:
                # keep the worker alive and make the failure visible
                self.errors.append((item, exc))
                traceback.print_exc()
            finally:
                # always report completion, otherwise join() would wait forever
                self.download_queue.task_done()
class DataSet():
""" DataSet are openBIS objects that contain the actual files.
"""
def __init__(self, openbis_obj, data):
    """Store the dataset's raw data and derive its id and storage fields.

    :param openbis_obj: accepted but not used in the code visible here
    :param data: dict with at least the keys "code" and "physicalData"
    """
    self.data = data
    code = data["code"]
    self.permid = code
    self.permId = code

    physical_data = data['physicalData']
    if physical_data is not None:
        self.shareId = physical_data['shareId']
        self.location = physical_data['location']
    else:
        self.shareId = None
        self.location = None
def _repr_html_(self):
    """Return an HTML table showing the dataset's permId, properties and tags."""
    template = """
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>attribute</th>
<th>value</th>
</tr>
</thead>
<tbody>
<tr> <th>permId</th> <td>{}</td> </tr>
<tr> <th>properties</th> <td>{}</td> </tr>
<tr> <th>tags</th> <td>{}</td> </tr>
</tbody>
</table>
"""
    properties = self.data['properties']
    tags = self.data['tags']
    return template.format(self.permid, properties, tags)
Swen Vermeul
committed
def download(self, files=None, wait_until_finished=True, workers=10):
""" download the actual files and put them by default in the following folder:
Swen Vermeul
committed
__current_dir__/hostname/dataset_permid/
If no files are specified, all files of a given dataset are downloaded.
Files are usually downloaded in parallel, using 10 workers by default. If you want to wait until