Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/cogify ecr deployment #95

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dags/veda_data_pipeline/groups/transfer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def subdag_transfer():
overrides={
"containerOverrides": [
{
"name": f"{mwaa_stack_conf.get('PREFIX')}-veda-cogify-transfer",
"name": f"{mwaa_stack_conf.get('PREFIX')}-veda-cogify_transfer",
"command": [
"/usr/local/bin/python",
"handler.py",
Expand Down
92 changes: 61 additions & 31 deletions docker_tasks/cogify_transfer/handler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import ast
import json
import os
import re
import tempfile
from argparse import ArgumentParser
from time import sleep, time

import boto3
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles


def assume_role(role_arn, session_name="veda-airflow-pipelines_transfer_files"):
Expand All @@ -12,12 +17,7 @@ def assume_role(role_arn, session_name="veda-airflow-pipelines_transfer_files"):
RoleArn=role_arn,
RoleSessionName=session_name,
)
creds = credentials["Credentials"]
return {
"aws_access_key_id": creds["AccessKeyId"],
"aws_secret_access_key": creds.get("SecretAccessKey"),
"aws_session_token": creds.get("SessionToken"),
}
return credentials["Credentials"]


def get_matching_files(s3_client, bucket, prefix, regex_pattern):
Expand All @@ -42,44 +42,74 @@ def get_matching_files(s3_client, bucket, prefix, regex_pattern):
return matching_files


def transfer_file(s3_client, file_key, local_file_path, destination_bucket, collection):
    """Upload a local file to ``<collection>/<basename of file_key>``.

    Args:
        s3_client: Object exposing ``upload_file(path, bucket, key)``.
        file_key: Source key; only the part after the last ``/`` is kept.
        local_file_path: Path of the local file to upload.
        destination_bucket: Bucket that receives the upload.
        collection: Key prefix under which the file is stored.
    """
    # Everything after the final "/" (the whole key when there is no "/").
    basename = file_key.rpartition("/")[2]
    s3_client.upload_file(
        local_file_path,
        destination_bucket,
        f"{collection}/{basename}",
    )


def cogify_transfer_handler(event, context):
external_role_arn = os.environ["EXTERNAL_ROLE_ARN"]
creds = assume_role(external_role_arn, "veda-data-pipelines_data-transfer")
kwargs = {
"aws_access_key_id": creds["AccessKeyId"],
"aws_secret_access_key": creds["SecretAccessKey"],
"aws_session_token": creds["SessionToken"],
}
source_s3 = boto3.client("s3")
target_s3 = boto3.client("s3", **kwargs)

def cogify_transfer_handler(event):
    """Convert matching source rasters to COGs and upload them to a bucket.

    Args:
        event: Mapping read with ``.get``. Keys used: ``origin_bucket``,
            ``origin_prefix``, ``filename_regex``, ``collection`` and the
            optional ``target_bucket`` (default ``"veda-data-store-staging"``),
            ``cog_profile`` (default ``"deflate"``) and ``dry_run``
            (default ``False``).

    Returns:
        dict: ``{"matching_files": <result of get_matching_files>,
        "dry_run": <dry_run flag>}``.

    Raises:
        KeyError: If the target is the staging bucket and the
            ``EXTERNAL_ROLE_ARN`` environment variable is not set.
    """
    origin_bucket = event.get("origin_bucket")
    origin_prefix = event.get("origin_prefix")
    regex_pattern = event.get("filename_regex")
    target_bucket = event.get("target_bucket", "veda-data-store-staging")
    collection = event.get("collection")
    cog_profile = event.get("cog_profile", "deflate")
    dry_run = event.get("dry_run", False)

    source_s3 = boto3.client("s3")
    if target_bucket == "veda-data-store-staging":
        # Writing to the staging bucket goes through the external role;
        # any other bucket uses the task's own credentials.
        external_role_arn = os.environ["EXTERNAL_ROLE_ARN"]
        creds = assume_role(external_role_arn, "veda-data-pipelines_data-transfer")
        kwargs = {
            "aws_access_key_id": creds["AccessKeyId"],
            "aws_secret_access_key": creds["SecretAccessKey"],
            "aws_session_token": creds["SessionToken"],
        }
        target_s3 = boto3.client("s3", **kwargs)
    else:
        target_s3 = boto3.client("s3")

    dst_profile = cog_profiles.get(cog_profile)

    matching_files = get_matching_files(
        source_s3, origin_bucket, origin_prefix, regex_pattern
    )

    if dry_run:
        print(
            f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}"
        )
        print(f"Files matched: {matching_files}")
    else:
        for origin_key in matching_files:
            filename = origin_key.split("/")[-1]
            destination_key = f"{collection}/{filename}"
            # A per-file scratch directory is removed on exit even when the
            # download, conversion or upload raises, so nothing accumulates on
            # the task's disk. It also means no cleanup code runs in dry-run
            # mode, where no local paths exist.
            with tempfile.TemporaryDirectory() as workdir:
                local_tif_path = os.path.join(workdir, "source.tif")
                local_cog_path = os.path.join(workdir, "cog.tif")
                source_s3.download_file(origin_bucket, origin_key, local_tif_path)
                cog_translate(local_tif_path, local_cog_path, dst_profile, quiet=True)
                target_s3.upload_file(local_cog_path, target_bucket, destination_key)

    return {"matching_files": matching_files, "dry_run": dry_run}


if __name__ == "__main__":
    # Script entry point: parse the event from --payload, run the handler,
    # and print the merged payload/response as JSON.
    parser = ArgumentParser(
        prog="cogify_transfer",
        description="Cogify and transfer files on S3",
    )
    parser.add_argument(
        "--payload",
        dest="payload",
        required=True,  # fail in argparse with a usage message, not in literal_eval
        help="event passed to cogify_transfer_handler function",
    )
    args = parser.parse_args()
    # For cloud watch log to work the task should stay alive for at least 30 s;
    # runs shorter than 50 s are padded with a sleep below.
    start = time()
    print(f"Start at {start}")

    # The payload is a Python-literal dict string; literal_eval accepts only
    # literals, so no arbitrary code is evaluated.
    payload_event = ast.literal_eval(args.payload)
    cogify_transfer_response = cogify_transfer_handler(payload_event)
    response = json.dumps({**payload_event, **cogify_transfer_response})
    elapsed = time() - start
    print(f"Actual processing took {elapsed:.2f} seconds")
    # `elapsed` is already a duration. Subtracting `start` again (as the
    # previous check did) made the condition always true.
    if elapsed < 50:
        sleep(50)
    print(response)
2 changes: 1 addition & 1 deletion docker_tasks/cogify_transfer/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ awslambdaric
boto3
pystac==1.4.0
python-cmr
rasterio==1.3.0
rasterio==1.3.3
rio-cogeo==4.0.0
shapely
smart-open==6.3.0
Expand Down
29 changes: 29 additions & 0 deletions infrastructure/task_definition.tf
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,32 @@ resource "aws_ecs_task_definition" "veda_vector_task_definition" {
cpu = 2048
memory = 4096
}

# ECS Fargate task definition for the cogify-transfer container.
resource "aws_ecs_task_definition" "veda_transfer_task_definition" {
  # NOTE(review): the family name is the generic "<prefix>-tasks"; confirm no
  # other task definition in this stack registers under the same family.
  family                   = "${var.prefix}-tasks"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = 2048
  memory                   = 4096

  # The MWAA role serves as both the execution role and the task role.
  execution_role_arn = module.mwaa.mwaa_role_arn
  task_role_arn      = module.mwaa.mwaa_role_arn

  container_definitions = jsonencode([
    {
      # The container name and the ECR repository share the same
      # "<prefix>-veda-cogify_transfer" identifier.
      name      = "${var.prefix}-veda-cogify_transfer"
      image     = "${local.account_id}.dkr.ecr.${local.aws_region}.amazonaws.com/${var.prefix}-veda-cogify_transfer"
      essential = true
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = module.mwaa.log_group_name
          "awslogs-region"        = local.aws_region
          "awslogs-stream-prefix" = "ecs"
        }
      }
    }
  ])
}