Newer
Older
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
def put(self, things):
""" expects a list [url, filename] which is put into the upload queue
"""
self.upload_queue.put(things)
def join(self):
""" needs to be called if you want to wait for all uploads to be finished
"""
self.upload_queue.join()
def upload_file(self):
while True:
# get the next item in the queue
upload_url, filename, verify_certificates = self.upload_queue.get()
# upload the file to our DSS session workspace
with open(filename, 'rb') as f:
resp = requests.post(upload_url, data=f, verify=verify_certificates)
resp.raise_for_status()
# Tell the queue that we are done
self.upload_queue.task_done()
Swen Vermeul
committed
class DataSetDownloadQueue:
def __init__(self, workers=20):
# maximum files to be downloaded at once
self.download_queue = Queue()
# define number of threads
for t in range(workers):
t = Thread(target=self.download_file)
t.daemon = True
t.start()
def put(self, things):
""" expects a list [url, filename] which is put into the download queue
"""
self.download_queue.put(things)
Swen Vermeul
committed
def join(self):
""" needs to be called if you want to wait for all downloads to be finished
"""
self.download_queue.join()
def download_file(self):
while True:
url, filename, verify_certificates = self.download_queue.get()
Swen Vermeul
committed
# create the necessary directory structure if they don't exist yet
os.makedirs(os.path.dirname(filename), exist_ok=True)
# request the file in streaming mode
r = requests.get(url, stream=True, verify=verify_certificates)
Swen Vermeul
committed
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
self.download_queue.task_done()
class DataSet():
""" DataSet are openBIS objects that contain the actual files.
"""
def __init__(self, openbis_obj, permid, data):
self.openbis = openbis_obj
self.permid = permid
self.data = data
self.v1_ds = '/datastore_server/rmi-dss-api-v1.json'
self.downloadUrl = self.data['dataStore']['downloadUrl']
Swen Vermeul
committed
def download(self, files=None, wait_until_finished=False, workers=10):
""" download the actual files and put them by default in the following folder:
Swen Vermeul
committed
__current_dir__/hostname/dataset_permid/
If no files are specified, all files of a given dataset are downloaded.
Files are usually downloaded in parallel, using 10 workers by default. If you want to wait until
all the files are downloaded, set the wait_until_finished option to True.
Swen Vermeul
committed
"""
if files == None:
files = self.file_list()
elif isinstance(files, str):
files = [files]
base_url = self.downloadUrl + '/datastore_server/' + self.permid + '/'
Swen Vermeul
committed
queue = DataSetDownloadQueue(workers=workers)
# get file list and start download
for filename in files:
download_url = base_url + filename + '?sessionID=' + self.openbis.token
filename = os.path.join(self.openbis.hostname, self.permid, filename)
queue.put([download_url, filename, self.openbis.verify_certificates])
Swen Vermeul
committed
# wait until all files have downloaded
if wait_until_finished:
queue.join()
def get_parents(self):
""" Returns an array of the parents of the given dataset. Returns an empty array if no
parents were found.
"""
parents = []
for item in self.data['parents']:
parent = self.openbis.get_dataset(item['code'])
if parent is not None:
parents.append(parent)
return parents
def get_children(self):
""" Returns an array of the children of the given dataset. Returns an empty array if no
children were found.
"""
children = []
for item in self.data['children']:
child = self.openbis.get_dataset(item['code'])
if child is not None:
children.append(child)
return children
def file_list(self):
files = []
for file in self.get_file_list(recursive=True):
if file['isDirectory']:
pass
else:
files.append(file['pathInDataSet'])
return files
def get_file_list(self, recursive=True, start_folder="/"):
""" Lists all files of a given dataset. You can specifiy a start_folder other than "/".
By default, all directories and their containing files are listed recursively. You can
turn off this option by setting recursive=False.
"""
request = {
"method" : "listFilesForDataSet",
"params" : [
self.openbis.token,
self.permid,
recursive,
],
"id":"1"
}
resp = requests.post(
self.downloadUrl + self.v1_ds,
json.dumps(request),
verify=self.openbis.verify_certificates
)
data = resp.json()
if 'error' in data:
Swen Vermeul
committed
raise ValueError('Error from openBIS: ' + data['error'] )
elif 'result' in data:
return data['result']
Swen Vermeul
committed
raise ValueError('request to openBIS did not return either result nor error')
Swen Vermeul
committed
raise ValueError('internal error while performing post request')
class Sample(dict):
""" A Sample is one of the most commonly used objects in openBIS.
"""
def __init__(self, openbis_obj, *args, **kwargs):
super(Sample, self).__init__(*args, **kwargs)
self.__dict__ = self
self.openbis = openbis_obj
self.permid = self.permId['permId']
self.ident = self.identifier['identifier']
def delete(self, permid, reason):
self.openbis.delete_sample(permid, reason)
def get_datasets(self):
datasets = []
for item in self.dataSets:
datasets.append(self.openbis.get_dataset(item['permId']['permId']))
return datasets
def get_parents(self):
parents = []
for item in self.parents:
parent = self.openbis.get_sample(item['permId']['permId'])
if parent is not None:
parents.append(parent)
return parents
def get_children(self):
children = []
for item in self.children:
child = self.openbis.get_sample(item['permId']['permId'])
if child is not None:
children.append(child)
return children
class Space(dict):
""" managing openBIS spaces
"""
def __init__(self, openbis_obj, *args, **kwargs):
super(Space, self).__init__(*args, **kwargs)
self.__dict__ = self
self.openbis = openbis_obj
self.code = self.code
def get_samples(self):
""" Lists all samples in a given space. A pandas DataFrame object is returned.
"""
Swen Vermeul
committed
fields = ['spaceCode','permId', 'identifier','experimentIdentifierOrNull']
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
request = {
"method": "searchForSamples",
"params": [
self.openbis.token,
{
"matchClauses": [
{
"@type": "AttributeMatchClause",
"fieldType": "ATTRIBUTE",
"attribute": "SPACE",
"desiredValue": self.code,
}
],
"subCriterias": [],
"operator": "MATCH_ALL_CLAUSES"
},
[
"PROPERTIES",
"PARENTS"
]
],
"id": "1",
"jsonrpc": "2.0"
}
resp = self.openbis._post_request(self.openbis.as_v1, request)
Swen Vermeul
committed
if resp is not None and len(resp) > 0:
datasets = DataFrame(resp)[fields]
Swen Vermeul
committed
else:
return None