diff --git a/Cargo.lock b/Cargo.lock index 3a052c3..aa2d769 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1478,7 +1478,7 @@ checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" [[package]] name = "s3-event-bridge" -version = "0.1.6" +version = "0.2.0" dependencies = [ "anyhow", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index d89ac1e..73e3140 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "s3-event-bridge" -version = "0.1.6" +version = "0.2.0" edition = "2021" [dependencies] @@ -20,4 +20,3 @@ tempfile = "3.4.0" tokio = { version = "1", features = ["macros", "process", "rt", "fs", "io-util"] } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } - diff --git a/README.md b/README.md index e7c50e5..67ff213 100644 --- a/README.md +++ b/README.md @@ -51,8 +51,49 @@ Configuration is achieved via the following environment variables: - `ROOT_FOLDER_VAR` is the name of the environment variable that will be populated for the handler program, containing the path to the temporary folder which contains the inputs and outputs. Defaults to `ROOT_FOLDER`. +- `BUCKET_VAR` is the name of the environment variable that will be populated + for the handler program, containing the name of the bucket from which files + are being pulled to act as inputs. Defaults to `BUCKET`. +- `KEY_PREFIX_VAR` is the name of the environment variable that will be + populated for the handler program, containing object key prefix used to select + input files to be pulled, to act as inputs. Defaults to `KEY_PREFIX`. ## Deployment This is mostly intended to be deployed as an entrypoint in a Docker image, -alongside the dependencies and runtimes of the handler program. +alongside the dependencies and runtimes of the handler program. For example, to +run a hypothetical python script `handler.py` as a lambda function, you could +write something like this: + +```dockerfile +FROM python:3.11 + +WORKDIR /app +COPY handler.py ./ + +# Install the event bridge +RUN curl -L -o /usr/bin/bootstrap \ + https://github.com/kklingenberg/s3-event-bridge/releases/download/v0.2.0/bootstrap && \ + chmod +x /usr/bin/bootstrap + +# Provide the instruction to be run for each event +ENV HANDLER_COMMAND="python handler.py" + +ENTRYPOINT ["/usr/bin/bootstrap"] +``` + +In this example, it'll be up to the script `handler.py` to properly consider +files using the environment variable `ROOT_FOLDER` as base. 
For example, if such +a script expected a file named `inputs.json`, it would have to read it similarly +to: + +```python +import json +import os +from pathlib import Path + +base_path = Path(os.getenv("ROOT_FOLDER", ".")) + +with open(base_path / "inputs.json") as f: + input_data = json.load(f) +``` diff --git a/src/app.rs b/src/app.rs index 4c9a4b5..3b837b7 100644 --- a/src/app.rs +++ b/src/app.rs @@ -3,19 +3,28 @@ use crate::client::{download, list_keys, upload}; use crate::conf::Settings; -use crate::sign::{compute_signatures, find_signature_differences}; +use crate::sign::{compute_signatures, empty_signatures, find_signature_differences}; use anyhow::{anyhow, Context, Result}; use aws_lambda_events::s3::S3EventRecord; use envy::from_env; use once_cell::sync::OnceCell; use regex::Regex; use std::cmp::max; +use std::collections::BTreeSet; use std::collections::VecDeque; use std::path::Path; use tempfile::TempDir; use tokio::process::Command; use tracing::{info, instrument}; +/// A batch of S3 events that share a key prefix and represent objects +/// that belong to the same bucket. +#[derive(Debug)] +pub struct EventBatch { + pub bucket: String, + pub prefix: String, +} + /// An App is an initialized application state, derived from /// settings. This is only useful to pre-compute stuff that will be /// used constantly. @@ -98,70 +107,92 @@ impl App { }) } - /// Handle an S3 event record. + /// Group events according to common bucket and key prefixes. + pub fn batch_events(&self, records: I) -> Vec + where + I: Iterator, + { + let mut batches = BTreeSet::new(); + for record in records { + let processed = (|| { + let key = record + .s3 + .object + .key + .as_ref() + .ok_or_else(|| anyhow!("S3 event record is missing an object key"))?; + if !self.match_key_re.is_match(key) { + return Err(anyhow!( + "S3 event record has object key {:?} \ + that doesn't match configured pattern {:?}; ignoring", + key, + self.settings.match_key + )); + } + let bucket = record + .s3 + .bucket + .name + .clone() + .ok_or_else(|| anyhow!("S3 event record is missing a bucket name"))?; + let prefix = if self.settings.pull_parent_dirs < 0 { + String::from("") + } else { + let mut prefix_parts = key + .split('/') + .rev() + .skip((self.settings.pull_parent_dirs + 1).try_into()?) + .collect::>() + .into_iter() + .rev() + .collect::>() + .join("/"); + if !prefix_parts.is_empty() { + prefix_parts.push('/'); + } + prefix_parts + }; + Ok((bucket, prefix)) + })(); + if let Ok((bucket, prefix)) = processed { + batches.insert((bucket, prefix)); + } else { + info!("Skipped event record {:?}", processed); + } + } + + batches + .into_iter() + .map(|(bucket, prefix)| EventBatch { bucket, prefix }) + .collect() + } + + /// Handle a batch of S3 event records. 
#[instrument(skip(self, client))] - pub async fn handle(&self, record: &S3EventRecord, client: &aws_sdk_s3::Client) -> Result<()> { + pub async fn handle(&self, batch: &EventBatch, client: &aws_sdk_s3::Client) -> Result<()> { let base_dir = TempDir::new().context("Failed to create temporary directory")?; let base_path = base_dir.into_path(); info!( path = ?base_path, "Created temporary directory to hold input and output files" ); - - let key = record - .s3 - .object - .key - .as_ref() - .ok_or_else(|| anyhow!("S3 event record is missing an object key"))?; - if !self.match_key_re.is_match(key) { - info!( - key, - pattern = self.settings.match_key, - "S3 event record has object key that doesn't match configured pattern; ignoring" - ); - return Ok(()); - } - let bucket = record - .s3 - .bucket - .name - .as_ref() - .ok_or_else(|| anyhow!("S3 event record is missing a bucket name"))?; let target_bucket = self .settings .target_bucket .clone() - .unwrap_or_else(|| bucket.clone()); - let prefix = if self.settings.pull_parent_dirs < 0 { - String::from("") - } else { - let mut prefix_parts = key - .split('/') - .rev() - .skip((self.settings.pull_parent_dirs + 1).try_into()?) - .collect::>() - .into_iter() - .rev() - .collect::>() - .join("/"); - if !prefix_parts.is_empty() { - prefix_parts.push('/'); - } - prefix_parts - }; + .unwrap_or_else(|| batch.bucket.clone()); // First: pull all relevant files from S3, and compute a // signature for each file pulled info!("Downloading input files"); let mut next = None; loop { - let (page, next_token) = list_keys(client, bucket, &prefix, &next) + let (page, next_token) = list_keys(client, &batch.bucket, &batch.prefix, &next) .await .with_context(|| { format!( "Failed to list keys under {:?} in bucket {:?}", - &prefix, bucket + &batch.prefix, &batch.bucket ) })?; for page_key in page { @@ -170,14 +201,14 @@ impl App { .iter() .any(|re| re.is_match(&page_key)) { - let filename = page_key.strip_prefix(&prefix).unwrap_or(&page_key); + let filename = page_key.strip_prefix(&batch.prefix).unwrap_or(&page_key); let local_path = base_path.join(filename); - download(client, bucket, &page_key, &local_path) + download(client, &batch.bucket, &page_key, &local_path) .await .with_context(|| { format!( "Failed to download object {:?} from bucket {:?}", - &page_key, bucket + &page_key, &batch.bucket ) })?; } @@ -188,8 +219,12 @@ impl App { next = next_token; } } - let signatures = compute_signatures(&base_path) - .with_context(|| format!("Failed to compute signatures in {:?}", &base_path))?; + let signatures = if target_bucket == batch.bucket { + compute_signatures(&base_path) + .with_context(|| format!("Failed to compute signatures in {:?}", &base_path)) + } else { + empty_signatures() + }?; // Second: invoke the handler program info!( @@ -199,6 +234,8 @@ impl App { let status = Command::new(&self.handler_command_program) .args(&self.handler_command_args) .env(&self.settings.root_folder_var, &base_path) + .env(&self.settings.bucket_var, &batch.bucket) + .env(&self.settings.key_prefix_var, &batch.prefix) .status() .await .with_context(|| { @@ -225,14 +262,15 @@ impl App { "Uploading files with found differences" ); for path in differences { - let storage_key_path = - Path::new(&prefix).join(path.strip_prefix(&base_path).with_context(|| { + let storage_key_path = Path::new(&batch.prefix).join( + path.strip_prefix(&base_path).with_context(|| { format!( "Failed to convert local file path \ to bucket path for {:?} (using base path {:?})", path, &base_path ) - })?); + })?, + ); 
let storage_key = storage_key_path.to_string_lossy(); info!(key = ?storage_key, "Uploading file"); upload(client, &target_bucket, &path, &storage_key) diff --git a/src/conf.rs b/src/conf.rs index d5d6e06..27f27d3 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -7,6 +7,16 @@ fn default_root_folder_var() -> String { String::from("ROOT_FOLDER") } +/// Default `bucket_var` value. +fn default_bucket_var() -> String { + String::from("BUCKET") +} + +/// Default `key_prefix_var` value. +fn default_key_prefix_var() -> String { + String::from("KEY_PREFIX") +} + /// The event bridge is configured to pull files from S3, execute a /// command, and push resulting files to S3. The configuration must be /// given as environment variables. @@ -47,7 +57,18 @@ pub struct Settings { pub handler_command: String, /// The environment variable populated with the temporary folder - /// pulled from S3, to give the handler command. + /// pulled from S3, to be passed to the handler command. #[serde(default = "default_root_folder_var")] pub root_folder_var: String, + + /// The environment variable populated with the bucket name from + /// which files are pulled, to be passed to the handler command. + #[serde(default = "default_bucket_var")] + pub bucket_var: String, + + /// The environment variable populated with the object key prefix + /// used to pull files from S3, to be passed to the handler + /// command. + #[serde(default = "default_key_prefix_var")] + pub key_prefix_var: String, } diff --git a/src/main.rs b/src/main.rs index d7bd222..fe977e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,13 +10,17 @@ use lambda_runtime::{run, service_fn, LambdaEvent}; /// Handle each S3 event record through the handler program async fn function_handler(event: LambdaEvent>) -> Result<()> { - for s3_event in event.payload.records { - for record in s3_event.body.records { - app::current() - .handle(&record, client::current()) - .await - .with_context(|| format!("Failed to handle record {:?}", &record))?; - } + for batch in app::current().batch_events( + event + .payload + .records + .into_iter() + .flat_map(|record| record.body.records), + ) { + app::current() + .handle(&batch, client::current()) + .await + .with_context(|| format!("Failed to handle batch of records {:?}", &batch))?; } Ok(()) } diff --git a/src/sign.rs b/src/sign.rs index aa6d5f1..d6cc926 100644 --- a/src/sign.rs +++ b/src/sign.rs @@ -44,6 +44,12 @@ fn hash_file(path: &Path) -> Result { Ok(Base64::encode_string(&hash)) } +/// Produces an empty snapshot, guaranteed to be different to anything +/// except another empty snapshot. +pub fn empty_signatures() -> Result> { + Ok(BTreeMap::new()) +} + /// Produces a snapshot of the given folder. pub fn compute_signatures(path: &Path) -> Result> { let mut signatures = BTreeMap::new();
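
A note on the variables introduced by this change: besides `ROOT_FOLDER`, the handler now also receives the source bucket and the key prefix through `BUCKET` and `KEY_PREFIX` (or whichever names `BUCKET_VAR` and `KEY_PREFIX_VAR` select). A minimal sketch of how the README's `handler.py` example might use them is shown below; the output file name `outputs.json` and the result fields are hypothetical, chosen only to illustrate that files added or changed under `ROOT_FOLDER` are detected and uploaded by the bridge after the handler exits.

```python
import json
import os
from pathlib import Path

# Folder populated by the bridge with the pulled input files
base_path = Path(os.getenv("ROOT_FOLDER", "."))

# Added in 0.2.0: the bucket and key prefix the inputs were pulled from
bucket = os.getenv("BUCKET", "")
key_prefix = os.getenv("KEY_PREFIX", "")

with open(base_path / "inputs.json") as f:
    input_data = json.load(f)

# Hypothetical output: tag the result with its S3 origin; writing it
# under ROOT_FOLDER lets the bridge pick it up and upload it afterwards
result = {
    "source": f"s3://{bucket}/{key_prefix}inputs.json",
    "items": len(input_data),
}

with open(base_path / "outputs.json", "w") as f:
    json.dump(result, f)
```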