From 2c052fe39fd011496d4145efd42e9e36559264a4 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Mon, 30 Sep 2024 12:29:10 +0530 Subject: [PATCH] fix servesink Signed-off-by: Yashash H L --- rust/servesink/src/lib.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/rust/servesink/src/lib.rs b/rust/servesink/src/lib.rs index 3c384b657f..cf4354f32b 100644 --- a/rust/servesink/src/lib.rs +++ b/rust/servesink/src/lib.rs @@ -6,6 +6,8 @@ use tracing::{error, warn}; const NUMAFLOW_CALLBACK_URL_HEADER: &str = "X-Numaflow-Callback-Url"; const NUMAFLOW_ID_HEADER: &str = "X-Numaflow-Id"; +const ENV_NUMAFLOW_CALLBACK_URL_KEY: &str = "NUMAFLOW_CALLBACK_URL_KEY"; +const ENV_NUMAFLOW_MESSAGE_ID_KEY: &str = "NUMAFLOW_MESSAGE_ID_KEY"; /// servesink is a Numaflow Sink which forwards the payload to the Numaflow serving URL. pub async fn servesink() -> Result<(), Box> { @@ -13,13 +15,28 @@ pub async fn servesink() -> Result<(), Box> { } struct ServeSink { + callback_url_key: String, + message_id_key: String, client: Client, } impl ServeSink { fn new() -> Self { + // extract the callback url key from the environment + let callback_url_key = std::env::var(ENV_NUMAFLOW_CALLBACK_URL_KEY) + .unwrap_or_else(|_| NUMAFLOW_CALLBACK_URL_HEADER.to_string()); + + // extract the message id key from the environment + let message_id_key = std::env::var(ENV_NUMAFLOW_MESSAGE_ID_KEY) + .unwrap_or_else(|_| NUMAFLOW_ID_HEADER.to_string()); + Self { - client: Client::new(), + callback_url_key, + message_id_key, + client: Client::builder() + .danger_accept_invalid_certs(true) + .build() + .unwrap(), } } } @@ -31,12 +48,12 @@ impl sink::Sinker for ServeSink { while let Some(datum) = input.recv().await { // if the callback url is absent, ignore the request - let url = match datum.headers.get(NUMAFLOW_CALLBACK_URL_HEADER) { + let url = match datum.headers.get(self.callback_url_key.as_str()) { Some(url) => url, None => { warn!( "Missing {} header, Ignoring the request", - NUMAFLOW_CALLBACK_URL_HEADER + self.callback_url_key ); responses.push(Response::ok(datum.id)); continue; @@ -44,12 +61,12 @@ impl sink::Sinker for ServeSink { }; // if the numaflow id is absent, ignore the request - let numaflow_id = match datum.headers.get(NUMAFLOW_ID_HEADER) { + let numaflow_id = match datum.headers.get(self.message_id_key.as_str()) { Some(id) => id, None => { warn!( "Missing {} header, Ignoring the request", - NUMAFLOW_ID_HEADER + self.message_id_key ); responses.push(Response::ok(datum.id)); continue; @@ -59,7 +76,7 @@ impl sink::Sinker for ServeSink { let resp = self .client .post(format!("{}_{}", url, "save")) - .header(NUMAFLOW_ID_HEADER, numaflow_id) + .header(self.message_id_key.as_str(), numaflow_id) .header("id", numaflow_id) .body(datum.value) .send()