
batch events by bucket and prefix
kklingenberg committed Mar 25, 2023
1 parent 8c42c85 commit 6205389
Showing 7 changed files with 173 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "s3-event-bridge"
version = "0.1.6"
version = "0.2.0"
edition = "2021"

[dependencies]
@@ -20,4 +20,3 @@ tempfile = "3.4.0"
tokio = { version = "1", features = ["macros", "process", "rt", "fs", "io-util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }

43 changes: 42 additions & 1 deletion README.md
@@ -51,8 +51,49 @@ Configuration is achieved via the following environment variables:
- `ROOT_FOLDER_VAR` is the name of the environment variable that will be
populated for the handler program, containing the path to the temporary folder
which contains the inputs and outputs. Defaults to `ROOT_FOLDER`.
- `BUCKET_VAR` is the name of the environment variable that will be populated
for the handler program, containing the name of the bucket from which files
are being pulled to act as inputs. Defaults to `BUCKET`.
- `KEY_PREFIX_VAR` is the name of the environment variable that will be
  populated for the handler program, containing the object key prefix used to
  select the input files to be pulled. Defaults to `KEY_PREFIX`.

## Deployment

This is mostly intended to be deployed as an entrypoint in a Docker image,
-alongside the dependencies and runtimes of the handler program.
+alongside the dependencies and runtimes of the handler program. For example, to
+run a hypothetical Python script `handler.py` as a Lambda function, you could
+write something like this:

```dockerfile
FROM python:3.11

WORKDIR /app
COPY handler.py ./

# Install the event bridge
RUN curl -L -o /usr/bin/bootstrap \
https://github.com/kklingenberg/s3-event-bridge/releases/download/v0.2.0/bootstrap && \
chmod +x /usr/bin/bootstrap

# Provide the instruction to be run for each event
ENV HANDLER_COMMAND="python handler.py"

ENTRYPOINT ["/usr/bin/bootstrap"]
```

In this example, it is up to the script `handler.py` to resolve its files
relative to the base directory given by the `ROOT_FOLDER` environment variable.
For example, if the script expected a file named `inputs.json`, it would have to
read it similarly to:

```python
import json
import os
from pathlib import Path

base_path = Path(os.getenv("ROOT_FOLDER", "."))

with open(base_path / "inputs.json") as f:
input_data = json.load(f)
```
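
The bucket and key prefix variables introduced in this commit work the same
way. As a minimal sketch (assuming the default variable names `BUCKET` and
`KEY_PREFIX` described above), a handler that needs to know where its inputs
came from could read:

```python
import os

# BUCKET and KEY_PREFIX are the default names; deployments may rename
# them through BUCKET_VAR and KEY_PREFIX_VAR.
source_bucket = os.getenv("BUCKET", "")
source_prefix = os.getenv("KEY_PREFIX", "")

print(f"Inputs were pulled from s3://{source_bucket}/{source_prefix}")
```
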
142 changes: 90 additions & 52 deletions src/app.rs
@@ -3,19 +3,28 @@
use crate::client::{download, list_keys, upload};
use crate::conf::Settings;
-use crate::sign::{compute_signatures, find_signature_differences};
+use crate::sign::{compute_signatures, empty_signatures, find_signature_differences};
use anyhow::{anyhow, Context, Result};
use aws_lambda_events::s3::S3EventRecord;
use envy::from_env;
use once_cell::sync::OnceCell;
use regex::Regex;
use std::cmp::max;
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::path::Path;
use tempfile::TempDir;
use tokio::process::Command;
use tracing::{info, instrument};

/// A batch of S3 events that share a key prefix and represent objects
/// that belong to the same bucket.
#[derive(Debug)]
pub struct EventBatch {
pub bucket: String,
pub prefix: String,
}

/// An App is an initialized application state, derived from
/// settings. This is only useful to pre-compute stuff that will be
/// used constantly.
@@ -98,70 +107,92 @@ impl App {
})
}

-    /// Handle an S3 event record.
+    /// Group events according to common bucket and key prefixes.
pub fn batch_events<I>(&self, records: I) -> Vec<EventBatch>
where
I: Iterator<Item = S3EventRecord>,
{
let mut batches = BTreeSet::new();
for record in records {
let processed = (|| {
let key = record
.s3
.object
.key
.as_ref()
.ok_or_else(|| anyhow!("S3 event record is missing an object key"))?;
if !self.match_key_re.is_match(key) {
return Err(anyhow!(
"S3 event record has object key {:?} \
that doesn't match configured pattern {:?}; ignoring",
key,
self.settings.match_key
));
}
let bucket = record
.s3
.bucket
.name
.clone()
.ok_or_else(|| anyhow!("S3 event record is missing a bucket name"))?;
let prefix = if self.settings.pull_parent_dirs < 0 {
String::from("")
} else {
let mut prefix_parts = key
.split('/')
.rev()
.skip((self.settings.pull_parent_dirs + 1).try_into()?)
.collect::<Vec<&str>>()
.into_iter()
.rev()
.collect::<Vec<&str>>()
.join("/");
if !prefix_parts.is_empty() {
prefix_parts.push('/');
}
prefix_parts
};
Ok((bucket, prefix))
})();
if let Ok((bucket, prefix)) = processed {
batches.insert((bucket, prefix));
} else {
info!("Skipped event record {:?}", processed);
}
}

batches
.into_iter()
.map(|(bucket, prefix)| EventBatch { bucket, prefix })
.collect()
}
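
As a worked illustration of the grouping above (a hedged Python sketch, not
part of the crate; `derive_prefix` is a hypothetical name): the object key is
stripped of its file name plus `pull_parent_dirs` trailing directories, and
records collapsing to the same `(bucket, prefix)` pair form a single batch.

```python
def derive_prefix(key: str, pull_parent_dirs: int) -> str:
    # Mirror of the Rust logic: drop the file name plus
    # `pull_parent_dirs` trailing path segments, keep the rest.
    if pull_parent_dirs < 0:
        return ""
    parts = key.split("/")
    kept = parts[: max(len(parts) - (pull_parent_dirs + 1), 0)]
    return "/".join(kept) + "/" if kept else ""

# Records sharing a derived (bucket, prefix) pair collapse into one batch.
assert derive_prefix("data/run1/input.csv", 0) == "data/run1/"
assert derive_prefix("data/run1/input.csv", 1) == "data/"
assert derive_prefix("input.csv", 0) == ""
```
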

/// Handle a batch of S3 event records.
#[instrument(skip(self, client))]
-    pub async fn handle(&self, record: &S3EventRecord, client: &aws_sdk_s3::Client) -> Result<()> {
+    pub async fn handle(&self, batch: &EventBatch, client: &aws_sdk_s3::Client) -> Result<()> {
let base_dir = TempDir::new().context("Failed to create temporary directory")?;
let base_path = base_dir.into_path();
info!(
path = ?base_path,
"Created temporary directory to hold input and output files"
);

-        let key = record
-            .s3
-            .object
-            .key
-            .as_ref()
-            .ok_or_else(|| anyhow!("S3 event record is missing an object key"))?;
-        if !self.match_key_re.is_match(key) {
-            info!(
-                key,
-                pattern = self.settings.match_key,
-                "S3 event record has object key that doesn't match configured pattern; ignoring"
-            );
-            return Ok(());
-        }
-        let bucket = record
-            .s3
-            .bucket
-            .name
-            .as_ref()
-            .ok_or_else(|| anyhow!("S3 event record is missing a bucket name"))?;
        let target_bucket = self
            .settings
            .target_bucket
            .clone()
-            .unwrap_or_else(|| bucket.clone());
-        let prefix = if self.settings.pull_parent_dirs < 0 {
-            String::from("")
-        } else {
-            let mut prefix_parts = key
-                .split('/')
-                .rev()
-                .skip((self.settings.pull_parent_dirs + 1).try_into()?)
-                .collect::<Vec<&str>>()
-                .into_iter()
-                .rev()
-                .collect::<Vec<&str>>()
-                .join("/");
-            if !prefix_parts.is_empty() {
-                prefix_parts.push('/');
-            }
-            prefix_parts
-        };
+            .unwrap_or_else(|| batch.bucket.clone());

// First: pull all relevant files from S3, and compute a
// signature for each file pulled
info!("Downloading input files");
let mut next = None;
loop {
-            let (page, next_token) = list_keys(client, bucket, &prefix, &next)
+            let (page, next_token) = list_keys(client, &batch.bucket, &batch.prefix, &next)
.await
.with_context(|| {
format!(
"Failed to list keys under {:?} in bucket {:?}",
-                        &prefix, bucket
+                        &batch.prefix, &batch.bucket
)
})?;
for page_key in page {
@@ -170,14 +201,14 @@ impl App {
.iter()
.any(|re| re.is_match(&page_key))
{
-                    let filename = page_key.strip_prefix(&prefix).unwrap_or(&page_key);
+                    let filename = page_key.strip_prefix(&batch.prefix).unwrap_or(&page_key);
let local_path = base_path.join(filename);
-                    download(client, bucket, &page_key, &local_path)
+                    download(client, &batch.bucket, &page_key, &local_path)
.await
.with_context(|| {
format!(
"Failed to download object {:?} from bucket {:?}",
-                                &page_key, bucket
+                                &page_key, &batch.bucket
)
})?;
}
@@ -188,8 +219,12 @@ impl App {
next = next_token;
}
}
-        let signatures = compute_signatures(&base_path)
-            .with_context(|| format!("Failed to compute signatures in {:?}", &base_path))?;
+        let signatures = if target_bucket == batch.bucket {
+            compute_signatures(&base_path)
+                .with_context(|| format!("Failed to compute signatures in {:?}", &base_path))
+        } else {
+            empty_signatures()
+        }?;

// Second: invoke the handler program
info!(
@@ -199,6 +234,8 @@
let status = Command::new(&self.handler_command_program)
.args(&self.handler_command_args)
.env(&self.settings.root_folder_var, &base_path)
.env(&self.settings.bucket_var, &batch.bucket)
.env(&self.settings.key_prefix_var, &batch.prefix)
.status()
.await
.with_context(|| {
Expand All @@ -225,14 +262,15 @@ impl App {
"Uploading files with found differences"
);
for path in differences {
-            let storage_key_path =
-                Path::new(&prefix).join(path.strip_prefix(&base_path).with_context(|| {
+            let storage_key_path = Path::new(&batch.prefix).join(
+                path.strip_prefix(&base_path).with_context(|| {
format!(
"Failed to convert local file path \
to bucket path for {:?} (using base path {:?})",
path, &base_path
)
-                })?);
+                })?,
+            );
let storage_key = storage_key_path.to_string_lossy();
info!(key = ?storage_key, "Uploading file");
upload(client, &target_bucket, &path, &storage_key)
23 changes: 22 additions & 1 deletion src/conf.rs
@@ -7,6 +7,16 @@ fn default_root_folder_var() -> String {
String::from("ROOT_FOLDER")
}

/// Default `bucket_var` value.
fn default_bucket_var() -> String {
String::from("BUCKET")
}

/// Default `key_prefix_var` value.
fn default_key_prefix_var() -> String {
String::from("KEY_PREFIX")
}

/// The event bridge is configured to pull files from S3, execute a
/// command, and push resulting files to S3. The configuration must be
/// given as environment variables.
@@ -47,7 +57,18 @@ pub struct Settings {
pub handler_command: String,

/// The environment variable populated with the temporary folder
-    /// pulled from S3, to give the handler command.
+    /// pulled from S3, to be passed to the handler command.
#[serde(default = "default_root_folder_var")]
pub root_folder_var: String,

/// The environment variable populated with the bucket name from
/// which files are pulled, to be passed to the handler command.
#[serde(default = "default_bucket_var")]
pub bucket_var: String,

/// The environment variable populated with the object key prefix
/// used to pull files from S3, to be passed to the handler
/// command.
#[serde(default = "default_key_prefix_var")]
pub key_prefix_var: String,
}
18 changes: 11 additions & 7 deletions src/main.rs
@@ -10,13 +10,17 @@ use lambda_runtime::{run, service_fn, LambdaEvent};

/// Handle each S3 event record through the handler program
async fn function_handler(event: LambdaEvent<SqsEventObj<S3Event>>) -> Result<()> {
-    for s3_event in event.payload.records {
-        for record in s3_event.body.records {
-            app::current()
-                .handle(&record, client::current())
-                .await
-                .with_context(|| format!("Failed to handle record {:?}", &record))?;
-        }
+    for batch in app::current().batch_events(
+        event
+            .payload
+            .records
+            .into_iter()
+            .flat_map(|record| record.body.records),
+    ) {
+        app::current()
+            .handle(&batch, client::current())
+            .await
+            .with_context(|| format!("Failed to handle batch of records {:?}", &batch))?;
}
Ok(())
}
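
The nesting in this handler is easy to misread: each SQS record's body is
itself an S3 event carrying its own list of records. A hedged Python sketch of
the resulting flow (`batch_events` and `handle` stand in for the calls above):

```python
def function_handler(event: dict) -> None:
    # Flatten the S3 records out of their SQS envelopes, then run the
    # handler once per deduplicated (bucket, prefix) batch.
    s3_records = [
        s3_record
        for sqs_record in event["Records"]
        for s3_record in sqs_record["body"]["Records"]
    ]
    for batch in batch_events(s3_records):
        handle(batch)
```
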
6 changes: 6 additions & 0 deletions src/sign.rs
@@ -44,6 +44,12 @@ fn hash_file(path: &Path) -> Result<String> {
Ok(Base64::encode_string(&hash))
}

/// Produces an empty snapshot, guaranteed to be different from
/// anything except another empty snapshot.
pub fn empty_signatures() -> Result<BTreeMap<PathBuf, String>> {
Ok(BTreeMap::new())
}

/// Produces a snapshot of the given folder.
pub fn compute_signatures(path: &Path) -> Result<BTreeMap<PathBuf, String>> {
let mut signatures = BTreeMap::new();
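
The role of the empty snapshot: the difference computation presumably reports
every path whose hash is missing from or differs in the baseline, so an empty
baseline marks all files for upload (used above when the target bucket differs
from the source bucket). A sketch of that assumed comparison, with a
hypothetical signature matching the crate's naming:

```python
def find_signature_differences(baseline: dict[str, str], current: dict[str, str]) -> list[str]:
    # Paths that are new, or whose content hash changed, relative to the
    # baseline taken before the handler program ran.
    return [path for path, digest in current.items() if baseline.get(path) != digest]

# An empty baseline marks every current file as different:
assert find_signature_differences({}, {"out.txt": "abc"}) == ["out.txt"]
```
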
