Newer
Older
return self.get_sample(resp[0]['permId'])
else:
raise ValueError("error while trying to fetch sample from server: " + str(resp))
def _get_dss_url(self, dss_code=None):
    """ internal method to get the downloadURL of a datastore.

    If dss_code is None, the downloadUrl of the first datastore returned by
    self.get_datastores() is used. Otherwise the downloadUrl of the first
    datastore whose code equals dss_code is returned.

    Raises a ValueError if no datastore matches.
    """
    # NOTE(review): the boolean-mask indexing below assumes get_datastores()
    # returns a pandas DataFrame with 'code' and 'downloadUrl' columns -- confirm.
    dss = self.get_datastores()
    if dss_code is not None:
        dss = dss[dss['code'] == dss_code]

    urls = dss['downloadUrl']
    if len(urls) == 0:
        raise ValueError("no datastore found for code: " + str(dss_code))

    # Filtering keeps the original row labels, so a label lookup with [0]
    # only works when the match happens to be the first row. Use the
    # position instead.
    return urls.iloc[0]
Swen Vermeul
committed
def upload_files(self, datastore_url=None, files=None, folder=None, wait_until_finished=False):
    """ Uploads files to the session workspace of a datastore server.

    datastore_url        -- base URL of the datastore; looked up via
                            self._get_dss_url() if not given
    files                -- a filename or a list of filenames; directories
                            are walked and all files below them are uploaded
    folder               -- target folder in the session workspace; defaults
                            to a timestamp (YYYY-MM-DD_HH-MM-SS)
    wait_until_finished  -- block until all queued uploads were processed

    Returns the list of file paths as they were sent to the session
    workspace. Raises a ValueError if no files are given.
    """
    if datastore_url is None:
        datastore_url = self._get_dss_url()

    if files is None:
        raise ValueError("Please provide a filename.")

    if folder is None:
        # create a unique foldername
        folder = time.strftime('%Y-%m-%d_%H-%M-%S')

    if isinstance(files, str):
        files = [files]

    self.files = files
    self.startByte = 0
    self.endByte = 0
    # start with a fresh list so the return value only covers this call
    self.files_in_wsp = []

    # define a queue to handle the upload threads
    queue = DataSetUploadQueue()

    real_files = []
    for filename in files:
        # expand '~' first, so that the directory check and the later
        # open() see the same path
        filename = os.path.expanduser(filename)
        if os.path.isdir(filename):
            real_files.extend(
                os.path.join(dirpath, name)
                for dirpath, _dirnames, names in os.walk(filename)
                for name in names
            )
        else:
            real_files.append(filename)

    # compose the upload-URL and put URL and filename in the upload queue
    for filename in real_files:
        file_in_wsp = os.path.join(folder, filename)
        self.files_in_wsp.append(file_in_wsp)
        upload_url = (
            datastore_url + '/session_workspace_file_upload'
            + '?filename=' + file_in_wsp
            + '&id=1'
            + '&startByte=0&endByte=0'
            + '&sessionID=' + self.token
        )
        queue.put([upload_url, filename, self.verify_certificates])

    # wait until all files have uploaded
    if wait_until_finished:
        queue.join()

    # return files with full path in session workspace
    return self.files_in_wsp
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
class DataSetUploadQueue:
    """ Queue of pending uploads that is served by a pool of daemon threads.

    Failed uploads do not stop the workers: every failure is logged and
    recorded in self.errors as a (filename, exception) tuple.
    """

    def __init__(self, workers=20):
        # maximum files to be uploaded at once
        self.upload_queue = Queue()
        # (filename, exception) for every item that could not be uploaded
        self.errors = []

        # define number of threads and start them
        for _ in range(workers):
            worker = Thread(target=self.upload_file)
            worker.daemon = True
            worker.start()

    def put(self, things):
        """ expects a list [url, filename, verify_certificates] which is put
        into the upload queue
        """
        self.upload_queue.put(things)

    def join(self):
        """ needs to be called if you want to wait for all uploads to be finished
        """
        self.upload_queue.join()

    def upload_file(self):
        """ worker loop: takes items from the queue and posts the file
        content to the given URL. Runs forever; meant to be the target of a
        daemon thread.
        """
        import logging
        log = logging.getLogger(__name__)

        while True:
            # get the next item in the queue
            item = self.upload_queue.get()
            filename = None
            try:
                upload_url, filename, verify_certificates = item
                # upload the file to our DSS session workspace
                with open(filename, 'rb') as f:
                    resp = requests.post(upload_url, data=f, verify=verify_certificates)
                    resp.raise_for_status()
            except Exception as exc:
                # keep the worker alive; one failing file must not stop
                # the remaining uploads
                self.errors.append((filename, exc))
                log.error("upload of %s failed: %s", filename, exc)
            finally:
                # Tell the queue that we are done, even on failure.
                # Otherwise join() would block forever.
                self.upload_queue.task_done()
Swen Vermeul
committed
class DataSetDownloadQueue:
    """ Queue of pending downloads that is served by a pool of daemon threads.

    Failed downloads do not stop the workers: every failure is logged and
    recorded in self.errors as a (filename, exception) tuple.
    """

    def __init__(self, workers=20):
        # maximum files to be downloaded at once
        self.download_queue = Queue()
        # (filename, exception) for every item that could not be downloaded
        self.errors = []

        # define number of threads
        for _ in range(workers):
            worker = Thread(target=self.download_file)
            worker.daemon = True
            worker.start()

    def put(self, things):
        """ expects a list [url, filename, verify_certificates] which is put
        into the download queue
        """
        self.download_queue.put(things)

    def join(self):
        """ needs to be called if you want to wait for all downloads to be finished
        """
        self.download_queue.join()

    def download_file(self):
        """ worker loop: takes items from the queue and streams the URL
        content into the given file. Runs forever; meant to be the target of
        a daemon thread.
        """
        import logging
        log = logging.getLogger(__name__)

        while True:
            item = self.download_queue.get()
            filename = None
            try:
                url, filename, verify_certificates = item

                # create the necessary directory structure if they don't exist yet;
                # a bare filename has no directory part and makedirs('') would fail
                target_dir = os.path.dirname(filename)
                if target_dir:
                    os.makedirs(target_dir, exist_ok=True)

                # request the file in streaming mode
                r = requests.get(url, stream=True, verify=verify_certificates)
                try:
                    # do not store an HTTP error page as file content
                    r.raise_for_status()
                    with open(filename, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=1024):
                            if chunk:  # filter out keep-alive new chunks
                                f.write(chunk)
                finally:
                    # streamed responses hold their connection until closed
                    r.close()
            except Exception as exc:
                # keep the worker alive; one failing file must not stop
                # the remaining downloads
                self.errors.append((filename, exc))
                log.error("download of %s failed: %s", filename, exc)
            finally:
                # always report back, otherwise join() would block forever
                self.download_queue.task_done()
class DataSet():
    """ DataSet are openBIS objects that contain the actual files.
    """

    def __init__(self, openbis_obj, permid, data):
        """ openbis_obj -- the connection object; its token, hostname,
                           verify_certificates and get_dataset() are used
            permid      -- permId of this dataset
            data        -- dict describing the dataset; must contain
                           'dataStore' and 'physicalData'
        """
        self.openbis = openbis_obj
        self.permid = permid
        self.data = data
        # path of the datastore server's JSON API (v1), relative to downloadUrl
        self.v1_ds = '/datastore_server/rmi-dss-api-v1.json'
        self.downloadUrl = self.data['dataStore']['downloadUrl']
        physical_data = self.data['physicalData']
        if physical_data is None:
            self.shareId = None
            self.location = None
        else:
            self.shareId = physical_data['shareId']
            self.location = physical_data['location']

    def download(self, files=None, wait_until_finished=False, workers=10):
        """ download the actual files and put them by default in the following folder:
        __current_dir__/hostname/dataset_permid/
        If no files are specified, all files of a given dataset are downloaded.
        Files are usually downloaded in parallel, using 10 workers by default. If you want to wait until
        all the files are downloaded, set the wait_until_finished option to True.
        """
        if files is None:
            files = self.file_list()
        elif isinstance(files, str):
            files = [files]

        base_url = self.downloadUrl + '/datastore_server/' + self.permid + '/'
        target_folder = os.path.join(self.openbis.hostname, self.permid)

        queue = DataSetDownloadQueue(workers=workers)

        # get file list and start download
        for filename in files:
            download_url = base_url + filename + '?sessionID=' + self.openbis.token
            local_path = os.path.join(target_folder, filename)
            queue.put([download_url, local_path, self.openbis.verify_certificates])

        # wait until all files have downloaded
        if wait_until_finished:
            queue.join()

        print("Files downloaded to: %s" % target_folder)

    def get_parents(self):
        """ Returns an array of the parents of the given dataset. Returns an empty array if no
        parents were found.
        """
        parents = []
        for item in self.data['parents']:
            parent = self.openbis.get_dataset(item['code'])
            if parent is not None:
                parents.append(parent)
        return parents

    def get_children(self):
        """ Returns an array of the children of the given dataset. Returns an empty array if no
        children were found.
        """
        children = []
        for item in self.data['children']:
            child = self.openbis.get_dataset(item['code'])
            if child is not None:
                children.append(child)
        return children

    def file_list(self):
        """ Returns the pathInDataSet of every entry of this dataset that is
        not a directory (listed recursively).
        """
        return [
            entry['pathInDataSet']
            for entry in self.get_file_list(recursive=True)
            if not entry['isDirectory']
        ]

    def get_files(self):
        """ Returns a DataFrame of all files in this dataset
        """
        def createRelativePath(pathInDataSet):
            if self.shareId is None:
                return ''
            else:
                return os.path.join(self.shareId, self.location, pathInDataSet)

        files = self.get_file_list()
        df = DataFrame(files)
        # NOTE(review): relativePath is computed but is not part of the
        # returned columns -- confirm whether it should be.
        df['relativePath'] = df['pathInDataSet'].map(createRelativePath)
        return df[['isDirectory', 'pathInDataSet', 'fileSize', 'crc32Checksum']]

    def get_file_list(self, recursive=True, start_folder="/"):
        """ Lists all files of a given dataset. You can specify a start_folder other than "/".
        By default, all directories and their containing files are listed recursively. You can
        turn off this option by setting recursive=False.

        Raises a ValueError if the request fails or the response contains
        an error or no result.
        """
        request = {
            "method": "listFilesForDataSet",
            "params": [
                self.openbis.token,
                self.permid,
                # the start folder must be sent, otherwise the parameter
                # has no effect
                start_folder,
                recursive,
            ],
            "id": "1"
        }

        resp = requests.post(
            self.downloadUrl + self.v1_ds,
            json.dumps(request),
            verify=self.openbis.verify_certificates
        )

        if not resp.ok:
            raise ValueError('internal error while performing post request')

        data = resp.json()
        if 'error' in data:
            # the error payload is not necessarily a string
            raise ValueError('Error from openBIS: ' + str(data['error']))
        if 'result' in data:
            return data['result']
        raise ValueError('request to openBIS did not return either result nor error')
class Sample(dict):
    """ A Sample is one of the most commonly used objects in openBIS.
    """

    def __init__(self, openbis_obj, *args, **kwargs):
        """ openbis_obj is the connection object; the remaining arguments
        are passed to dict() and must provide at least the keys 'permId'
        and 'identifier'.
        """
        super(Sample, self).__init__(*args, **kwargs)
        # make every dict key available as an attribute (and vice versa)
        self.__dict__ = self
        self.openbis = openbis_obj
        self.permid = self.permId['permId']
        self.ident = self.identifier['identifier']

    def delete(self, reason):
        """ Deletes this sample via the connection object, passing on reason.
        """
        self.openbis.delete_sample(self.permid, reason)

    def get_datasets(self):
        """ Returns a DataFrame of the datasets of this sample. An empty
        DataFrame is returned if the sample has no datasets.
        """
        objects = self.dataSets

        # A type may be given in full (a dict carrying an '@id') or as a
        # plain reference to an '@id' seen earlier: replace such references
        # by the full dict.
        cache = {}
        for obj in objects:
            if 'type' not in obj:
                continue
            value = obj['type']
            if isinstance(value, dict):
                cache[value['@id']] = value
            elif value in cache:
                obj['type'] = cache[value]

        datasets = DataFrame(objects)
        if datasets.empty:
            # no rows means no columns to convert either
            return datasets

        datasets['registrationDate'] = datasets['registrationDate'].map(format_timestamp)
        datasets['properties'] = datasets['properties'].map(extract_properties)
        datasets['type'] = datasets['type'].map(extract_code)
        return datasets

    def get_parents(self):
        """ Returns a list of the parent samples. Parents that the
        connection object cannot find (None) are left out.
        """
        parents = []
        for item in self.parents:
            parent = self.openbis.get_sample(item['permId']['permId'])
            if parent is not None:
                parents.append(parent)
        return parents

    def get_children(self):
        """ Returns a list of the child samples. Children that the
        connection object cannot find (None) are left out.
        """
        children = []
        for item in self.children:
            child = self.openbis.get_sample(item['permId']['permId'])
            if child is not None:
                children.append(child)
        return children
class Space(dict):
    """ managing openBIS spaces
    """

    def __init__(self, openbis_obj, *args, **kwargs):
        """ openbis_obj is the connection object; the remaining arguments
        are passed to dict() and must provide at least the key 'code'.
        """
        super(Space, self).__init__(*args, **kwargs)
        # make every dict key available as an attribute (and vice versa)
        self.__dict__ = self
        self.openbis = openbis_obj
        # reading the attribute fails right away if no 'code' was provided
        self.code = self.code

    def get_samples(self, *args, **kwargs):
        """ Lists all samples in a given space. A pandas DataFrame object is returned.
        """
        # NOTE(review): the return type is whatever the connection object's
        # get_samples() returns -- a DataFrame according to the original docstring.
        return self.openbis.get_samples(*args, space=self.code, **kwargs)