Merge pull request #445 from hotosm/process_imagery
Download Images asynchronously in Image Processing
nrjadkry authored Jan 25, 2025
2 parents d32bedf + dcb58ef commit 85e9901
Showing 3 changed files with 367 additions and 27 deletions.
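
The core change swaps the previous ThreadPoolExecutor-based S3 downloads for aiohttp requests gathered in fixed-size batches. A minimal standalone sketch of that pattern follows; the function names, URLs, output directory, and file_<n>.jpg naming are illustrative placeholders, not the repository's API.

    import asyncio
    import os
    from pathlib import Path

    import aiohttp


    async def download_file(session: aiohttp.ClientSession, url: str, save_path: Path) -> None:
        # Fetch one URL and write the body to disk, logging failures instead of raising.
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    save_path.write_bytes(await response.read())
                else:
                    print(f"Failed to download {url}, status: {response.status}")
        except aiohttp.ClientError as e:
            print(f"Error downloading {url}: {e}")


    async def download_in_batches(urls: list[str], out_dir: str, batch_size: int = 20) -> None:
        # Reuse one ClientSession and download at most batch_size files concurrently per batch.
        os.makedirs(out_dir, exist_ok=True)
        async with aiohttp.ClientSession() as session:
            for start in range(0, len(urls), batch_size):
                batch = urls[start : start + batch_size]
                tasks = [
                    download_file(session, url, Path(out_dir) / f"file_{start + idx + 1}.jpg")
                    for idx, url in enumerate(batch)
                ]
                await asyncio.gather(*tasks)


    if __name__ == "__main__":
        # Placeholder URLs; in the PR these come from an S3 listing or pre-signed URLs.
        example_urls = [f"https://example.com/images/{n}.jpg" for n in range(1, 41)]
        asyncio.run(download_in_batches(example_urls, "downloads"))

The batch size bounds concurrency much as the removed max_concurrent_downloads thread pool did, while a single ClientSession keeps one connection pool for the whole run.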
81 changes: 54 additions & 27 deletions src/backend/app/projects/image_processing.py
@@ -3,16 +3,22 @@
 import uuid
 import tempfile
 import shutil
+import asyncio
+import aiohttp
 from pathlib import Path
 from app.tasks import task_logic
 from app.models.enums import State, ImageProcessingStatus
 from app.utils import timestamp
 from app.db import database
 from app.projects import project_logic
 from pyodm import Node
-from app.s3 import get_file_from_bucket, list_objects_from_bucket, add_file_to_bucket
+from app.s3 import (
+    get_file_from_bucket,
+    list_objects_from_bucket,
+    add_file_to_bucket,
+    get_presigned_url,
+)
 from loguru import logger as log
-from concurrent.futures import ThreadPoolExecutor
 from psycopg import Connection
 from asgiref.sync import async_to_sync
 from app.config import settings
@@ -78,30 +84,31 @@ def list_images(self, directory: str) -> List[str]:
                 images.append(str(file))
         return images

-    def download_object(self, bucket_name: str, obj, images_folder: str):
-        """
-        Download an object from the bucket if it's an image or related file.
+    async def download_image(self, session, url, save_path):
+        try:
+            async with session.get(url) as response:
+                if response.status == 200:
+                    with open(save_path, "wb") as f:
+                        f.write(await response.read())
+                else:
+                    log.error(f"Failed to download {url}, status: {response.status}")
+        except Exception as e:
+            log.error(f"Error downloading {url}: {e}")

-        :param bucket_name: Bucket name
-        :param obj: Object to download
-        :param images_folder: Destination folder
-        """
-        if obj.object_name.endswith(
-            (".jpg", ".jpeg", ".JPG", ".png", ".PNG", ".txt", ".laz")
-        ):
-            local_path = Path(images_folder) / Path(obj.object_name).name
-            local_path.parent.mkdir(parents=True, exist_ok=True)
-            get_file_from_bucket(bucket_name, obj.object_name, local_path)

-    def download_images_from_s3(
-        self, bucket_name: str, local_dir: str, task_id: Optional[uuid.UUID] = None
+    async def download_images_from_s3(
+        self,
+        bucket_name: str,
+        local_dir: str,
+        task_id: Optional[uuid.UUID] = None,
+        batch_size: int = 20,
     ):
         """
-        Download images from S3 for a specific task or project.
+        Asynchronously download images from S3 in batches.
         :param bucket_name: Bucket name
         :param local_dir: Local directory to save images
         :param task_id: Optional specific task ID
+        :param batch_size: Number of images to download concurrently
         """
         prefix = (
             f"dtm-data/projects/{self.project_id}/{task_id}"
@@ -110,12 +117,32 @@ def download_images_from_s3(
         )
         objects = list_objects_from_bucket(bucket_name, prefix)

-        # Limit the number of images to download concurrently
-        with ThreadPoolExecutor(max_workers=self.max_concurrent_downloads) as executor:
-            executor.map(
-                lambda obj: self.download_object(bucket_name, obj, local_dir),
-                objects,
-            )
+        if not objects:
+            log.warning(f"No images found in S3 for task {task_id}")
+            return
+
+        log.info(f"Downloading images from S3 for task {task_id}...")
+
+        s3_download_url = settings.S3_DOWNLOAD_ROOT
+        if s3_download_url:
+            object_urls = [f"{s3_download_url}/{obj.object_name}" for obj in objects]
+        else:
+            # generate pre-signed URL for each object
+            object_urls = [
+                get_presigned_url(bucket_name, obj.object_name, 12) for obj in objects
+            ]
+
+        async with aiohttp.ClientSession() as session:
+            for i in range(0, len(object_urls), batch_size):
+                batch = object_urls[i : i + batch_size]
+
+                # Offset filenames by the batch start index so names stay unique across batches
+                tasks = [
+                    self.download_image(
+                        session, url, os.path.join(local_dir, f"file_{i + idx + 1}.jpg")
+                    )
+                    for idx, url in enumerate(batch)
+                ]
+                await asyncio.gather(*tasks)

     def process_new_task(
         self,
@@ -165,7 +192,7 @@ async def _process_images(
         images_list = []
         # Download images based on single or multiple task processing
         if single_task:  # and self.task_id:
-            self.download_images_from_s3(bucket_name, temp_dir, self.task_id)
+            await self.download_images_from_s3(bucket_name, temp_dir, self.task_id)
             images_list = self.list_images(temp_dir)
         else:
             gcp_list_file = f"dtm-data/projects/{self.project_id}/gcp/gcp_list.txt"
@@ -180,7 +207,7 @@
             )

             for task_id in self.task_ids:
-                self.download_images_from_s3(bucket_name, temp_dir, task_id)
+                await self.download_images_from_s3(bucket_name, temp_dir, task_id)
                 images_list.extend(self.list_images(temp_dir))

             # Start a new processing task
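When settings.S3_DOWNLOAD_ROOT is unset, the new code falls back to get_presigned_url(bucket_name, obj.object_name, 12) from app.s3. That helper's implementation is not part of this diff; as a hypothetical stand-in, assuming a MinIO-compatible client and that the third argument is an expiry in hours, it might look roughly like this:

    from datetime import timedelta
    from minio import Minio

    # Placeholder endpoint and credentials; the real client configuration lives in app.s3.
    client = Minio("s3.example.com", access_key="KEY", secret_key="SECRET", secure=True)


    def get_presigned_url(bucket_name: str, object_name: str, expires_hours: int = 12) -> str:
        # Return a time-limited GET URL so aiohttp can fetch a private object directly.
        return client.presigned_get_object(
            bucket_name, object_name, expires=timedelta(hours=expires_hours)
        )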
1 change: 1 addition & 0 deletions src/backend/pyproject.toml
@@ -27,6 +27,7 @@ dependencies = [
     "Jinja2>=3.1.4",
     "numpy==1.26.4",
     "GDAL==3.6.2",
+    "aiohttp>=3.11.11",
     "aiosmtplib>=3.0.1",
     "python-slugify>=8.0.4",
     "psycopg2>=2.9.9",