Skip to content
Snippets Groups Projects
Commit baaa8b7f authored by Swen Vermeul's avatar Swen Vermeul
Browse files

fixed nasty threading bug. Using context manager to empty queue from workers when done

parent f5d62550
No related branches found
No related tags found
No related merge requests found
import os
from threading import Thread
from tabulate import tabulate
from queue import Queue
from tabulate import tabulate
from .openbis_object import OpenBisObject
from .definitions import openbis_definitions
from .utils import VERBOSE
......@@ -241,23 +241,21 @@ class DataSet(
"""
base_url = self.data['dataStore']['downloadUrl'] + '/datastore_server/' + self.permId + '/'
with DataSetDownloadQueue(workers=workers) as queue:
# get file list and start download
for filename in files:
file_info = self.get_file_list(start_folder=filename)
file_size = file_info[0]['fileSize']
download_url = base_url + filename + '?sessionID=' + self.openbis.token
filename_dest = os.path.join(destination, self.permId, filename)
queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, 'wb'])
queue = DataSetDownloadQueue(workers=workers)
# get file list and start download
for filename in files:
file_info = self.get_file_list(start_folder=filename)
file_size = file_info[0]['fileSize']
download_url = base_url + filename + '?sessionID=' + self.openbis.token
filename_dest = os.path.join(destination, self.permId, filename)
queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, 'wb'])
# wait until all files have downloaded
if wait_until_finished:
queue.join()
# wait until all files have downloaded
if wait_until_finished:
queue.join()
if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId))
return destination
if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId))
return destination
def _download_link(self, files, destination, wait_until_finished, workers, linked_dataset_fileservice_url, content_copy_index):
......@@ -265,42 +263,42 @@ class DataSet(
Requires the microservice server to be running at the given linked_dataset_fileservice_url.
"""
queue = DataSetDownloadQueue(workers=workers, collect_files_with_wrong_length=True)
with DataSetDownloadQueue(workers=workers, collect_files_with_wrong_length=True) as queue:
if content_copy_index >= len(self.data["linkedData"]["contentCopies"]):
raise ValueError("Content Copy index out of range.")
content_copy = self.data["linkedData"]["contentCopies"][content_copy_index]
if content_copy_index >= len(self.data["linkedData"]["contentCopies"]):
raise ValueError("Content Copy index out of range.")
content_copy = self.data["linkedData"]["contentCopies"][content_copy_index]
for filename in files:
file_info = self.get_file_list(start_folder=filename)
file_size = file_info[0]['fileSize']
for filename in files:
file_info = self.get_file_list(start_folder=filename)
file_size = file_info[0]['fileSize']
download_url = linked_dataset_fileservice_url
download_url += "?sessionToken=" + self.openbis.token
download_url += "&datasetPermId=" + self.data["permId"]["permId"]
download_url += "&externalDMSCode=" + content_copy["externalDms"]["code"]
download_url += "&contentCopyPath=" + content_copy["path"].replace("/", "%2F")
download_url += "&datasetPathToFile=" + urllib.parse.quote(filename)
download_url = linked_dataset_fileservice_url
download_url += "?sessionToken=" + self.openbis.token
download_url += "&datasetPermId=" + self.data["permId"]["permId"]
download_url += "&externalDMSCode=" + content_copy["externalDms"]["code"]
download_url += "&contentCopyPath=" + content_copy["path"].replace("/", "%2F")
download_url += "&datasetPathToFile=" + urllib.parse.quote(filename)
filename_dest = os.path.join(destination, self.permId, filename)
filename_dest = os.path.join(destination, self.permId, filename)
# continue download if file is not complete - do nothing if it is
write_mode = 'wb'
if os.path.exists(filename_dest):
actual_size = os.path.getsize(filename_dest)
if actual_size == int(file_size):
continue
elif actual_size < int(file_size):
write_mode = 'ab'
download_url += "&offset=" + str(actual_size)
# continue download if file is not complete - do nothing if it is
write_mode = 'wb'
if os.path.exists(filename_dest):
actual_size = os.path.getsize(filename_dest)
if actual_size == int(file_size):
continue
elif actual_size < int(file_size):
write_mode = 'ab'
download_url += "&offset=" + str(actual_size)
queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, write_mode])
queue.put([download_url, filename, filename_dest, file_size, self.openbis.verify_certificates, write_mode])
if wait_until_finished:
queue.join()
if wait_until_finished:
queue.join()
if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId))
return destination, queue.files_with_wrong_length
if VERBOSE: print("Files downloaded to: %s" % os.path.join(destination, self.permId))
return destination, queue.files_with_wrong_length
@property
......@@ -584,50 +582,60 @@ class DataSet(
# define a queue to handle the upload threads
queue = DataSetUploadQueue()
with DataSetUploadQueue() as queue:
real_files = []
for filename in files:
if os.path.isdir(filename):
real_files.extend(
[os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn])
else:
real_files.append(os.path.join(filename))
# compose the upload-URL and put URL and filename in the upload queue
for filename in real_files:
file_in_wsp = os.path.join(folder, os.path.basename(filename))
url_filename = os.path.join(folder, urllib.parse.quote(os.path.basename(filename)))
self.files_in_wsp.append(file_in_wsp)
upload_url = (
datastore_url + '/datastore_server/session_workspace_file_upload'
+ '?filename=' + url_filename
+ '&id=1'
+ '&startByte=0&endByte=0'
+ '&sessionID=' + self.openbis.token
)
queue.put([upload_url, filename, self.openbis.verify_certificates])
real_files = []
for filename in files:
if os.path.isdir(filename):
real_files.extend(
[os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(filename)) for f in fn])
else:
real_files.append(os.path.join(filename))
# compose the upload-URL and put URL and filename in the upload queue
for filename in real_files:
file_in_wsp = os.path.join(folder, os.path.basename(filename))
url_filename = os.path.join(folder, urllib.parse.quote(os.path.basename(filename)))
self.files_in_wsp.append(file_in_wsp)
upload_url = (
datastore_url + '/datastore_server/session_workspace_file_upload'
+ '?filename=' + url_filename
+ '&id=1'
+ '&startByte=0&endByte=0'
+ '&sessionID=' + self.openbis.token
)
queue.put([upload_url, filename, self.openbis.verify_certificates])
# wait until all files have uploaded
if wait_until_finished:
queue.join()
# wait until all files have uploaded
if wait_until_finished:
queue.join()
# return files with full path in session workspace
return self.files_in_wsp
# return files with full path in session workspace
return self.files_in_wsp
class DataSetUploadQueue():
def __init__(self, workers=20):
# maximum files to be uploaded at once
self.upload_queue = Queue()
self.workers = workers
# define number of threads and start them
for t in range(workers):
t = Thread(target=self.upload_file)
t.daemon = True
t.start()
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, *args, **kwargs):
"""This method is called at the end of a with statement.
"""
# stop the workers
for i in self.workers:
self.upload_queue.put(None)
def put(self, things):
""" expects a list [url, filename] which is put into the upload queue
"""
......@@ -636,12 +644,17 @@ class DataSetUploadQueue():
def join(self):
""" needs to be called if you want to wait for all uploads to be finished
"""
# block until all tasks are done
self.upload_queue.join()
def upload_file(self):
while True:
# get the next item in the queue
upload_url, filename, verify_certificates = self.upload_queue.get()
queue_item = self.upload_queue.get()
if queue_item is None:
# when we call the .join() method of the DataSetUploadQueue and empty the queue
break
upload_url, filename, verify_certificates = queue_item
filesize = os.path.getsize(filename)
......@@ -713,14 +726,24 @@ class DataSetDownloadQueue():
def __init__(self, workers=20, collect_files_with_wrong_length=False):
self.collect_files_with_wrong_length = collect_files_with_wrong_length
# maximum files to be downloaded at once
self.workers = workers
self.download_queue = Queue()
self.files_with_wrong_length = []
# define number of threads
for t in range(workers):
t = Thread(target=self.download_file)
t.daemon = True
t.start()
for i in range(workers):
thread = Thread(target=self.download_file)
thread.start()
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, *args, **kwargs):
"""This method is called at the end of a with statement.
"""
# stop all workers
for i in range(self.workers):
self.download_queue.put(None)
def put(self, things):
""" expects a list [url, filename] which is put into the download queue
......@@ -735,7 +758,11 @@ class DataSetDownloadQueue():
def download_file(self):
while True:
try:
url, filename, filename_dest, file_size, verify_certificates, write_mode = self.download_queue.get()
queue_item = self.download_queue.get()
if queue_item is None:
# when we call the .join() method of the DataSetDownloadQueue and empty the queue
break
url, filename, filename_dest, file_size, verify_certificates, write_mode = queue_item
# create the necessary directory structure if they don't exist yet
os.makedirs(os.path.dirname(filename_dest), exist_ok=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment