Skip to content

Commit

Permalink
Implement Retries and Timeout for File Download
Browse files Browse the repository at this point in the history
  • Loading branch information
ric-evans committed Oct 1, 2024
1 parent 0199888 commit 553cbf9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 20 deletions.
3 changes: 3 additions & 0 deletions skymap_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
# HTTP source to download data from.
REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu"

REMOTE_DATA_DOWNLOAD_RETRIES: Final[int] = 3
REMOTE_DATA_DOWNLOAD_TIMEOUT: Final[int] = 15

# Local ephemeral directory to stage files.
LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache")

Expand Down
69 changes: 49 additions & 20 deletions skymap_scanner/utils/data_handling.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""data_handling.py."""

import logging
import subprocess
import time
from pathlib import Path
from typing import List

import requests

from .. import config as cfg

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -41,12 +43,12 @@ def stage_files(self, file_list: List[str]):
LOGGER.debug("File is available on staging path.")
else:
LOGGER.debug("Staging from HTTP source.")
self.fetch_file(basename)
self.download_file(basename)

else:
LOGGER.debug(f"File {basename} is available at {filepath}.")

def fetch_file(self, basename: str):
def download_file(self, basename: str):
"""Retrieves a file from the HTTP source.
Args:
Expand All @@ -55,24 +57,51 @@ def fetch_file(self, basename: str):
Raises:
RuntimeError: if the file retrieval fails.
"""
local_destination_path = self.staging_path / basename
http_source_path = f"{self.remote_path}/{basename}"
# not sure why we use the -O pattern here
cmd = [
"wget",
"-nv",
"-t",
"5",
"-O",
str(local_destination_path),
http_source_path,
]

subprocess.run(cmd, check=True)

if not local_destination_path.is_file():
dest = self.staging_path / basename
url = f"{self.remote_path}/{basename}"

def backoff_sleep(attempt: int):
"""Sleep with exponential backoff."""
sleep_duration = 2**attempt # Exponential backoff: 2, 4, 8 seconds...
LOGGER.info(f"Retrying file download in {sleep_duration} seconds...")
time.sleep(sleep_duration)

# Step 1: Download the file
attempt = 0
while True:
if attempt:
backoff_sleep(attempt)
else:
attempt += 1
# get
try:
response = requests.get(
url,
stream=True,
timeout=cfg.REMOTE_DATA_DOWNLOAD_TIMEOUT,
)
response.raise_for_status() # Check if the request was successful (2xx)
break
except requests.exceptions.RequestException as e:
if attempt > cfg.REMOTE_DATA_DOWNLOAD_RETRIES: # 'attempt' is 1-indexed
raise RuntimeError(
f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: {e}"
) from e

# Step 2: Write the file
try:
with open(dest, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
except IOError as e:
raise RuntimeError(f"File download failed during file write: {e}") from e

# Step 3: Ensure the file was created successfully
if dest.is_file():
LOGGER.debug(f"File successfully created at {dest}.")
else:
raise RuntimeError(
f"Subprocess `wget` succeeded but the resulting file is invalid:\n-> {cmd}"
f"File download failed during file write (file invalid):\n-> {dest}."
)

def get_filepath(self, filename: str) -> str:
Expand Down

0 comments on commit 553cbf9

Please sign in to comment.