Skip to content

Commit

Permalink
provide a one-shot cli utility
Browse files Browse the repository at this point in the history
  • Loading branch information
kklingenberg committed May 14, 2023
1 parent 7913cf4 commit 225b7ff
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 219 deletions.
222 changes: 55 additions & 167 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
[package]
name = "s3-event-bridge"
version = "0.3.1"
version = "0.4.0"
edition = "2021"

[profile.release]
strip = true
lto = true
codegen-units = 1
panic = "abort"

[dependencies]
anyhow = "1.0.69"
aws-config = "0.55.1"
Expand All @@ -19,7 +25,6 @@ regex = "1.7.1"
serde = { version = "1.0.154", features = ["derive"] }
serde_json = "1.0.96"
sha1 = "0.10.5"
shell-words = "1.1.0"
tempfile = "3.4.0"
tokio = { version = "1", features = ["macros", "process", "rt", "fs", "io-util"] }
tracing = { version = "0.1", features = ["log"] }
Expand Down
35 changes: 24 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ Configuration is achieved via the following environment variables:
they default to the equivalent of a constant `true` jq expression.
- `TARGET_BUCKET` is the bucket name that will receive outputs. If omitted, it
will default to the same bucket as the one specified in the original event.
- `HANDLER_COMMAND` is the shell expression that starts the command that handles
files and does the actual work.
- `ROOT_FOLDER_VAR` is the name of the environment variable that will be
populated for the handler program, containing the path to the temporary folder
which contains the inputs and outputs. Defaults to `ROOT_FOLDER`.
Expand All @@ -73,6 +71,11 @@ Configuration is achieved via the following environment variables:
populated for the handler program, containing object key prefix used to select
input files to be pulled, to act as inputs. Defaults to `KEY_PREFIX`.

Apart from the configuration variables, the AWS Lambda bootstrap binary needs to
receive the handler command expression as its arguments (e.g. if the bootstrap
binary is placed in the current directory, `./lambda-bootstrap ls` would execute
`ls` as the handler).

### A small note on map-reduce

This whole wrapper thing was made to accomplish the goal of enabling simple
Expand Down Expand Up @@ -125,14 +128,14 @@ WORKDIR /app
COPY handler.py ./

# Install the event bridge
RUN curl -L -o /usr/bin/bootstrap \
https://github.com/kklingenberg/s3-event-bridge/releases/download/v0.3.1/bootstrap && \
RUN set -ex ; \
curl https://github.com/kklingenberg/s3-event-bridge/releases/download/v0.4.0/lambda-bootstrap \
-L -o /usr/bin/bootstrap ; \
chmod +x /usr/bin/bootstrap

# Provide the instruction to be run for each event
ENV HANDLER_COMMAND="python handler.py"

ENTRYPOINT ["/usr/bin/bootstrap"]
# Provide the instruction to be run for each event
CMD ["python", "handler.py"]
```

In this example, it'll be up to the script `handler.py` to properly consider
Expand Down Expand Up @@ -165,6 +168,16 @@ queue triggering a Lambda function, with said queue only being fed events from
S3 buckets. This may change in the future, possibly to consider other kinds of
integrations with S3 (e.g. direct invocation, SNS publish/subscribe, etc.).

## Usage as CLI wrapper

Included in the release artifacts there's also a `command` binary that operates
just like the lambda bootstrap binary, but runs the handler once and exits,
making it suitable for direct CLI use.

```bash
env BUCKET=some-bucket KEY_PREFIX=some/prefix/of/keys/to/pull ./command python handler.py
```

## Usage as glue for other AWS services

> :warning: This isn't the intended use case for this utility, as the resulting
Expand All @@ -185,12 +198,12 @@ FROM debian:stable-slim
RUN set -ex ; \
apt-get update ; \
apt-get install -y groff less curl unzip ; \
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" ; \
curl https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip ; \
unzip awscliv2.zip ; \
./aws/install ; \
rm -r awscliv2.zip ./aws ; \
curl -L -o /usr/bin/bootstrap \
https://github.com/kklingenberg/s3-event-bridge/releases/download/v0.3.1/bootstrap ; \
curl https://github.com/kklingenberg/s3-event-bridge/releases/download/v0.4.0/lambda-bootstrap \
-L -o /usr/bin/bootstrap ; \
chmod +x /usr/bin/bootstrap ; \
apt-get purge -y curl unzip ; \
apt-get autoremove -y ; \
Expand All @@ -200,11 +213,11 @@ RUN set -ex ; \
WORKDIR /app
COPY command.sh ./

ENV HANDLER_COMMAND="bash command.sh"
# Don't pull any file from S3, since they're not needed
ENV PULL_MATCH_KEYS="^$"

ENTRYPOINT ["/usr/bin/bootstrap"]
CMD ["bash", "command.sh"]
```

And a script file `command.sh` like this:
Expand Down
45 changes: 18 additions & 27 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use serde::Serialize;
use std::cmp::max;
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::env::args_os;
use std::ffi::OsString;
use std::fs;
use std::path::Path;
use tempfile::TempDir;
Expand Down Expand Up @@ -48,10 +50,10 @@ pub struct App {
pub execution_filter_expr: Option<String>,

/// The program that needs to be executed as the handler.
pub handler_command_program: String,
pub handler_command_program: OsString,

/// The arguments passed to the executed handler program.
pub handler_command_args: VecDeque<String>,
pub handler_command_args: VecDeque<OsString>,
}

impl App {
Expand Down Expand Up @@ -110,15 +112,8 @@ impl App {
"Can't use both an execution filter expression and a file at the same time",
)),
}?;
// Parse handler command
let mut handler_command_args = VecDeque::from(
shell_words::split(&settings.handler_command).with_context(|| {
format!(
"Failed to shell-split handler command {:?}",
&settings.handler_command
)
})?,
);
// Gather handler command
let mut handler_command_args = VecDeque::from(args_os().skip(1).collect::<Vec<OsString>>());
let handler_command_program = handler_command_args
.pop_front()
.ok_or(anyhow!("empty handler command"))?;
Expand Down Expand Up @@ -201,7 +196,7 @@ impl App {
client: &'static aws_sdk_s3::Client,
) -> Result<()> {
let base_dir = TempDir::new().context("Failed to create temporary directory")?;
let base_path = base_dir.into_path();
let base_path = base_dir.path();
info!(
path = ?base_path,
"Created temporary directory to hold input and output files"
Expand Down Expand Up @@ -285,20 +280,20 @@ impl App {

// Fourth: compute a signature for each file pulled
let signatures = if target_bucket == batch.bucket {
compute_signatures(&base_path)
.with_context(|| format!("Failed to compute signatures in {:?}", &base_path))
compute_signatures(base_path)
.with_context(|| format!("Failed to compute signatures in {:?}", base_path))
} else {
empty_signatures()
}?;

// Fifth: invoke the handler program
info!(
command = self.settings.handler_command,
"Invoking handler command"
"Invoking handler command {:?} {:?}",
&self.handler_command_program, &self.handler_command_args
);
let status = Command::new(&self.handler_command_program)
.args(&self.handler_command_args)
.env(&self.settings.root_folder_var, &base_path)
.env(&self.settings.root_folder_var, base_path)
.env(&self.settings.bucket_var, &batch.bucket)
.env(&self.settings.key_prefix_var, &batch.prefix)
.status()
Expand All @@ -316,11 +311,8 @@ impl App {

// Sixth: upload the changed files
let differences =
find_signature_differences(&base_path, &signatures).with_context(|| {
format!(
"Failed to compute signature differences in {:?}",
&base_path
)
find_signature_differences(base_path, &signatures).with_context(|| {
format!("Failed to compute signature differences in {:?}", base_path)
})?;
info!(
total = differences.len(),
Expand All @@ -329,15 +321,14 @@ impl App {
let mut joinset: JoinSet<Result<String>> = JoinSet::new();
for path in differences {
let bucket = target_bucket.clone();
let storage_key_path = Path::new(&batch.prefix).join(
path.strip_prefix(&base_path).with_context(|| {
let storage_key_path =
Path::new(&batch.prefix).join(path.strip_prefix(base_path).with_context(|| {
format!(
"Failed to convert local file path \
to bucket path for {:?} (using base path {:?})",
path, &base_path
path, base_path
)
})?,
);
})?);
let storage_key = storage_key_path.to_string_lossy().to_string();
joinset.spawn(async move {
info!(key = ?storage_key, "Uploading file");
Expand Down
29 changes: 29 additions & 0 deletions src/bin/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use anyhow::{Context, Result};
use s3_event_bridge::{app, client};
use std::env::var;

/// Run a command with files pulled from S3, uploading the results to
/// S3 after it exits.
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.without_time()
.init();
app::init()?;
client::init().await?;

let bucket =
var(&app::current().settings.bucket_var).context(&app::current().settings.bucket_var)?;
let prefix = var(&app::current().settings.key_prefix_var)
.context(&app::current().settings.key_prefix_var)?;
let batch = app::EventBatch { bucket, prefix };

app::current()
.handle(&batch, client::current())
.await
.with_context(|| format!("Failed to handle batch of records {:?}", &batch))?;

Ok(())
}
10 changes: 5 additions & 5 deletions src/main.rs → src/bin/lambda-bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
mod app;
mod client;
mod conf;
mod sign;

use anyhow::{anyhow, Context, Result};
use aws_lambda_events::event::s3::S3Event;
use aws_lambda_events::event::sqs::SqsEventObj;
use lambda_runtime::{run, service_fn, LambdaEvent};
use s3_event_bridge::{app, client};

/// Handle each S3 event record through the handler program
async fn function_handler(event: LambdaEvent<SqsEventObj<S3Event>>) -> Result<()> {
Expand All @@ -25,6 +21,10 @@ async fn function_handler(event: LambdaEvent<SqsEventObj<S3Event>>) -> Result<()
Ok(())
}

/// Run an AWS Lambda function that listens to SQS events containing
/// batches of S3 events, that executes a handler command with files
/// pulled from S3 according to the input events, and that uploads the
/// command's results to S3 after it exits.
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
Expand Down
9 changes: 2 additions & 7 deletions src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ fn default_key_prefix_var() -> String {
/// given as environment variables.
#[derive(Deserialize)]
pub struct Settings {
/// Defines a filter to select only matching keys. The star (*)
/// can be used as a wildcard matching any number of non-slash
/// characters. E.g. to match any file in a folder, use
/// `"folder/*"`. Omitting this will make it match any file.
/// Defines a filter to select only matching keys. Use regexes to
/// match object keys. Omitting this will make it match any file.
#[serde(default)]
pub match_key: Option<String>,

Expand Down Expand Up @@ -65,9 +63,6 @@ pub struct Settings {
#[serde(default)]
pub target_bucket: Option<String>,

/// Defines the command that will be executed.
pub handler_command: String,

/// The environment variable populated with the temporary folder
/// pulled from S3, to be passed to the handler command.
#[serde(default = "default_root_folder_var")]
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Library crate shared by the `lambda-bootstrap` and `command` binaries.

// Event-batch handling: pulls input files from S3, invokes the handler
// command, and uploads changed outputs. Public so binaries can call it.
pub mod app;
// AWS S3 client initialization and access. Public so binaries can call it.
pub mod client;
// Environment-variable-driven settings (internal).
mod conf;
// File signature computation used to detect changed output files (internal).
mod sign;

0 comments on commit 225b7ff

Please sign in to comment.