Skip to content

Commit

Permalink
refresh site creds on file fetcher processes
Browse files Browse the repository at this point in the history
  • Loading branch information
kathia-barahona committed Feb 1, 2024
1 parent 2b2ea98 commit 1e64714
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions pghoard/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def fetch_file(self, site, key, target_path):
self.last_activity = time.monotonic()
self._start_process()
if self.mp_manager:
self.task_queue.put((site, key, target_path))
self.task_queue.put((self.config, site, key, target_path))
result = self.result_queue.get()
if result is None:
# Should only happen if the process is terminated while we're waiting for
Expand Down Expand Up @@ -70,9 +70,7 @@ def _start_process(self):
return
self.result_queue = self.mp_manager.Queue()
self.task_queue = self.mp_manager.Queue()
self.process = multiprocessing.Process(
target=_remote_file_fetch_loop, args=(self.config, self.task_queue, self.result_queue)
)
self.process = multiprocessing.Process(target=_remote_file_fetch_loop, args=(self.task_queue, self.result_queue))
self.process.start()


Expand Down Expand Up @@ -101,18 +99,26 @@ def fetch(self, site, key, target_path):
raise


def _remote_file_fetch_loop(app_config, task_queue, result_queue):
def _remote_file_fetch_loop(task_queue, result_queue):
transfers = {}
obj_storage_configs = {}
while True:
task = task_queue.get()
if not task:
return
try:
site, key, target_path = task
app_config, site, key, target_path = task
obj_storage_config = get_object_storage_config(app_config, site)
transfer = transfers.get(site)
if not transfer:
transfer = get_transfer(get_object_storage_config(app_config, site))

# even if we got a transfer for the site
# we should check if there was a change on the site's storage config, in such case
# we must get the correct transfer
if not transfer or obj_storage_configs.get(site, {}) != obj_storage_config:
transfer = get_transfer(obj_storage_config)
transfers[site] = transfer
obj_storage_configs[site] = obj_storage_config

file_size, metadata = FileFetcher(app_config, transfer).fetch(site, key, target_path)
result_queue.put((task, file_size, metadata))
except Exception as e: # pylint: disable=broad-except
Expand Down

0 comments on commit 1e64714

Please sign in to comment.