From baaa8b7fae54c50d24575962870a16cc5c15aa3e Mon Sep 17 00:00:00 2001 From: vermeul <swen@ethz.ch> Date: Tue, 12 Nov 2019 16:10:52 +0100 Subject: [PATCH] fixed nasty threading bug. Using context manager to empty queue from workers when done --- pybis/src/python/pybis/dataset.py | 185 +++++++++++++++++------------- 1 file changed, 106 insertions(+), 79 deletions(-) diff --git a/pybis/src/python/pybis/dataset.py b/pybis/src/python/pybis/dataset.py index 982977b0e67..6e893eb7d11 100644 --- a/pybis/src/python/pybis/dataset.py +++ b/pybis/src/python/pybis/dataset.py @@ -1,7 +1,7 @@ import os from threading import Thread -from tabulate import tabulate from queue import Queue +from tabulate import tabulate from .openbis_object import OpenBisObject from .definitions import openbis_definitions from .utils import VERBOSE @@ -241,23 +241,21 @@ class DataSet( """ base_url = self.data['dataStore']['downloadUrl'] + '/datastore_server/' + self.permId + '/' + with DataSetDownloadQueue(workers=workers) as queue: + # get file list and start download + for filename in files: + file_info = self.get_file_list(start_folder=filename) + file_size = file_info[0]['fileSize'] + download_url = base_url + filename + '?sessionID=' + self.openbis.token + filename_dest = os.path.join(destination, self.permId, filename) + queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, 'wb']) - queue = DataSetDownloadQueue(workers=workers) - - # get file list and start download - for filename in files: - file_info = self.get_file_list(start_folder=filename) - file_size = file_info[0]['fileSize'] - download_url = base_url + filename + '?sessionID=' + self.openbis.token - filename_dest = os.path.join(destination, self.permId, filename) - queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, 'wb']) + # wait until all files have downloaded + if wait_until_finished: + queue.join() - # wait until all files have downloaded - if 
wait_until_finished: - queue.join() - - if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId)) - return destination + if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId)) + return destination def _download_link(self, files, destination, wait_until_finished, workers, linked_dataset_fileservice_url, content_copy_index): @@ -265,42 +263,42 @@ class DataSet( Requires the microservice server to be running at the given linked_dataset_fileservice_url. """ - queue = DataSetDownloadQueue(workers=workers, collect_files_with_wrong_length=True) + with DataSetDownloadQueue(workers=workers, collect_files_with_wrong_length=True) as queue: - if content_copy_index >= len(self.data["linkedData"]["contentCopies"]): - raise ValueError("Content Copy index out of range.") - content_copy = self.data["linkedData"]["contentCopies"][content_copy_index] + if content_copy_index >= len(self.data["linkedData"]["contentCopies"]): + raise ValueError("Content Copy index out of range.") + content_copy = self.data["linkedData"]["contentCopies"][content_copy_index] - for filename in files: - file_info = self.get_file_list(start_folder=filename) - file_size = file_info[0]['fileSize'] + for filename in files: + file_info = self.get_file_list(start_folder=filename) + file_size = file_info[0]['fileSize'] - download_url = linked_dataset_fileservice_url - download_url += "?sessionToken=" + self.openbis.token - download_url += "&datasetPermId=" + self.data["permId"]["permId"] - download_url += "&externalDMSCode=" + content_copy["externalDms"]["code"] - download_url += "&contentCopyPath=" + content_copy["path"].replace("/", "%2F") - download_url += "&datasetPathToFile=" + urllib.parse.quote(filename) + download_url = linked_dataset_fileservice_url + download_url += "?sessionToken=" + self.openbis.token + download_url += "&datasetPermId=" + self.data["permId"]["permId"] + download_url += "&externalDMSCode=" + content_copy["externalDms"]["code"] + 
download_url += "&contentCopyPath=" + content_copy["path"].replace("/", "%2F") + download_url += "&datasetPathToFile=" + urllib.parse.quote(filename) - filename_dest = os.path.join(destination, self.permId, filename) + filename_dest = os.path.join(destination, self.permId, filename) - # continue download if file is not complete - do nothing if it is - write_mode = 'wb' - if os.path.exists(filename_dest): - actual_size = os.path.getsize(filename_dest) - if actual_size == int(file_size): - continue - elif actual_size < int(file_size): - write_mode = 'ab' - download_url += "&offset=" + str(actual_size) + # continue download if file is not complete - do nothing if it is + write_mode = 'wb' + if os.path.exists(filename_dest): + actual_size = os.path.getsize(filename_dest) + if actual_size == int(file_size): + continue + elif actual_size < int(file_size): + write_mode = 'ab' + download_url += "&offset=" + str(actual_size) - queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, write_mode]) + queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, write_mode]) - if wait_until_finished: - queue.join() + if wait_until_finished: + queue.join() - if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId)) - return destination, queue.files_with_wrong_length + if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId)) + return destination, queue.files_with_wrong_length @property @@ -584,50 +582,60 @@ class DataSet( # define a queue to handle the upload threads - queue = DataSetUploadQueue() + with DataSetUploadQueue() as queue: - real_files = [] - for filename in files: - if os.path.isdir(filename): - real_files.extend( - [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn]) - else: - real_files.append(os.path.join(filename)) - - # compose the upload-URL and put URL and filename in the upload queue - for 
filename in real_files: - file_in_wsp = os.path.join(folder, os.path.basename(filename)) - url_filename = os.path.join(folder, urllib.parse.quote(os.path.basename(filename))) - self.files_in_wsp.append(file_in_wsp) - - upload_url = ( - datastore_url + '/datastore_server/session_workspace_file_upload' - + '?filename=' + url_filename - + '&id=1' - + '&startByte=0&endByte=0' - + '&sessionID=' + self.openbis.token - ) - queue.put([upload_url, filename, self.openbis.verify_certificates]) + real_files = [] + for filename in files: + if os.path.isdir(filename): + real_files.extend( + [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn]) + else: + real_files.append(os.path.join(filename)) + + # compose the upload-URL and put URL and filename in the upload queue + for filename in real_files: + file_in_wsp = os.path.join(folder, os.path.basename(filename)) + url_filename = os.path.join(folder, urllib.parse.quote(os.path.basename(filename))) + self.files_in_wsp.append(file_in_wsp) + + upload_url = ( + datastore_url + '/datastore_server/session_workspace_file_upload' + + '?filename=' + url_filename + + '&id=1' + + '&startByte=0&endByte=0' + + '&sessionID=' + self.openbis.token + ) + queue.put([upload_url, filename, self.openbis.verify_certificates]) - # wait until all files have uploaded - if wait_until_finished: - queue.join() + # wait until all files have uploaded + if wait_until_finished: + queue.join() - # return files with full path in session workspace - return self.files_in_wsp + # return files with full path in session workspace + return self.files_in_wsp class DataSetUploadQueue(): def __init__(self, workers=20): # maximum files to be uploaded at once self.upload_queue = Queue() + self.workers = workers # define number of threads and start them for t in range(workers): t = Thread(target=self.upload_file) - t.daemon = True t.start() + def __enter__(self, *args, **kwargs): + return self + + def __exit__(self, *args, **kwargs): +
"""This method is called at the end of a with statement. + """ + # stop the workers: self.workers is an int, so queue one None sentinel per worker thread + for i in range(self.workers): + self.upload_queue.put(None) + def put(self, things): """ expects a list [url, filename] which is put into the upload queue """ @@ -636,12 +644,17 @@ class DataSetUploadQueue(): def join(self): """ needs to be called if you want to wait for all uploads to be finished """ + # block until all tasks are done self.upload_queue.join() def upload_file(self): while True: # get the next item in the queue - upload_url, filename, verify_certificates = self.upload_queue.get() + queue_item = self.upload_queue.get() + if queue_item is None: + # None is the stop sentinel queued by __exit__ (one per worker): leave the loop so the thread ends + break + upload_url, filename, verify_certificates = queue_item filesize = os.path.getsize(filename) @@ -713,14 +726,24 @@ class DataSetDownloadQueue(): def __init__(self, workers=20, collect_files_with_wrong_length=False): self.collect_files_with_wrong_length = collect_files_with_wrong_length # maximum files to be downloaded at once + self.workers = workers self.download_queue = Queue() self.files_with_wrong_length = [] # define number of threads - for t in range(workers): - t = Thread(target=self.download_file) - t.daemon = True - t.start() + for i in range(workers): + thread = Thread(target=self.download_file) + thread.start() + + def __enter__(self, *args, **kwargs): + return self + + def __exit__(self, *args, **kwargs): + """This method is called at the end of a with statement.
+ """ + # stop all workers: queue one None sentinel per worker thread + for i in range(self.workers): + self.download_queue.put(None) def put(self, things): """ expects a list [url, filename] which is put into the download queue @@ -735,7 +758,11 @@ class DataSetDownloadQueue(): def download_file(self): while True: try: - url, filename, filename_dest, file_size, verify_certificates, write_mode = self.download_queue.get() + queue_item = self.download_queue.get() + if queue_item is None: + # None is the stop sentinel queued by __exit__ (one per worker): leave the loop so the thread ends + break + url, filename, filename_dest, file_size, verify_certificates, write_mode = queue_item # create the necessary directory structure if they don't exist yet os.makedirs(os.path.dirname(filename_dest), exist_ok=True) -- GitLab