Skip to content

Commit

Permalink
fix servesink
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 30, 2024
1 parent ac7b33b commit 2c052fe
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions rust/servesink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,37 @@ use tracing::{error, warn};

const NUMAFLOW_CALLBACK_URL_HEADER: &str = "X-Numaflow-Callback-Url";
const NUMAFLOW_ID_HEADER: &str = "X-Numaflow-Id";
const ENV_NUMAFLOW_CALLBACK_URL_KEY: &str = "NUMAFLOW_CALLBACK_URL_KEY";
const ENV_NUMAFLOW_MESSAGE_ID_KEY: &str = "NUMAFLOW_MESSAGE_ID_KEY";

/// servesink is a Numaflow Sink which forwards the payload to the Numaflow serving URL.
pub async fn servesink() -> Result<(), Box<dyn Error + Send + Sync>> {
sink::Server::new(ServeSink::new()).start().await
}

struct ServeSink {
callback_url_key: String,
message_id_key: String,
client: Client,
}

impl ServeSink {
fn new() -> Self {
// extract the callback url key from the environment
let callback_url_key = std::env::var(ENV_NUMAFLOW_CALLBACK_URL_KEY)
.unwrap_or_else(|_| NUMAFLOW_CALLBACK_URL_HEADER.to_string());

// extract the message id key from the environment
let message_id_key = std::env::var(ENV_NUMAFLOW_MESSAGE_ID_KEY)
.unwrap_or_else(|_| NUMAFLOW_ID_HEADER.to_string());

Self {
client: Client::new(),
callback_url_key,
message_id_key,
client: Client::builder()
.danger_accept_invalid_certs(true)
.build()
.unwrap(),
}
}
}
Expand All @@ -31,25 +48,25 @@ impl sink::Sinker for ServeSink {

while let Some(datum) = input.recv().await {
// if the callback url is absent, ignore the request
let url = match datum.headers.get(NUMAFLOW_CALLBACK_URL_HEADER) {
let url = match datum.headers.get(self.callback_url_key.as_str()) {
Some(url) => url,
None => {
warn!(
"Missing {} header, Ignoring the request",
NUMAFLOW_CALLBACK_URL_HEADER
self.callback_url_key
);
responses.push(Response::ok(datum.id));
continue;
}
};

// if the numaflow id is absent, ignore the request
let numaflow_id = match datum.headers.get(NUMAFLOW_ID_HEADER) {
let numaflow_id = match datum.headers.get(self.message_id_key.as_str()) {
Some(id) => id,
None => {
warn!(
"Missing {} header, Ignoring the request",
NUMAFLOW_ID_HEADER
self.message_id_key
);
responses.push(Response::ok(datum.id));
continue;
Expand All @@ -59,7 +76,7 @@ impl sink::Sinker for ServeSink {
let resp = self
.client
.post(format!("{}_{}", url, "save"))
.header(NUMAFLOW_ID_HEADER, numaflow_id)
.header(self.message_id_key.as_str(), numaflow_id)
.header("id", numaflow_id)
.body(datum.value)
.send()
Expand Down

0 comments on commit 2c052fe

Please sign in to comment.