Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Numaflow serving sink #2103

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions rust/servesink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,37 @@ use tracing::{error, warn};

const NUMAFLOW_CALLBACK_URL_HEADER: &str = "X-Numaflow-Callback-Url";
const NUMAFLOW_ID_HEADER: &str = "X-Numaflow-Id";
const ENV_NUMAFLOW_CALLBACK_URL_KEY: &str = "NUMAFLOW_CALLBACK_URL_KEY";
const ENV_NUMAFLOW_MESSAGE_ID_KEY: &str = "NUMAFLOW_MESSAGE_ID_KEY";

/// servesink is a Numaflow Sink which forwards the payload to the Numaflow serving URL.
pub async fn servesink() -> Result<(), Box<dyn Error + Send + Sync>> {
sink::Server::new(ServeSink::new()).start().await
}

struct ServeSink {
callback_url_key: String,
message_id_key: String,
client: Client,
}

impl ServeSink {
fn new() -> Self {
// extract the callback url key from the environment
let callback_url_key = std::env::var(ENV_NUMAFLOW_CALLBACK_URL_KEY)
.unwrap_or_else(|_| NUMAFLOW_CALLBACK_URL_HEADER.to_string());

// extract the message id key from the environment
let message_id_key = std::env::var(ENV_NUMAFLOW_MESSAGE_ID_KEY)
.unwrap_or_else(|_| NUMAFLOW_ID_HEADER.to_string());

Self {
client: Client::new(),
callback_url_key,
message_id_key,
client: Client::builder()
.danger_accept_invalid_certs(true)
.build()
.unwrap(),
}
}
}
Expand All @@ -31,25 +48,25 @@ impl sink::Sinker for ServeSink {

while let Some(datum) = input.recv().await {
// if the callback url is absent, ignore the request
let url = match datum.headers.get(NUMAFLOW_CALLBACK_URL_HEADER) {
let url = match datum.headers.get(self.callback_url_key.as_str()) {
Some(url) => url,
None => {
warn!(
"Missing {} header, Ignoring the request",
NUMAFLOW_CALLBACK_URL_HEADER
self.callback_url_key
);
responses.push(Response::ok(datum.id));
continue;
}
};

// if the numaflow id is absent, ignore the request
let numaflow_id = match datum.headers.get(NUMAFLOW_ID_HEADER) {
let numaflow_id = match datum.headers.get(self.message_id_key.as_str()) {
Some(id) => id,
None => {
warn!(
"Missing {} header, Ignoring the request",
NUMAFLOW_ID_HEADER
self.message_id_key
);
responses.push(Response::ok(datum.id));
continue;
Expand All @@ -59,7 +76,7 @@ impl sink::Sinker for ServeSink {
let resp = self
.client
.post(format!("{}_{}", url, "save"))
.header(NUMAFLOW_ID_HEADER, numaflow_id)
.header(self.message_id_key.as_str(), numaflow_id)
.header("id", numaflow_id)
.body(datum.value)
.send()
Expand Down
Loading