diff --git a/serving/Cargo.lock b/serving/Cargo.lock
index 4bb0fa7c0f..30fd1a14db 100644
--- a/serving/Cargo.lock
+++ b/serving/Cargo.lock
@@ -2777,6 +2777,7 @@ dependencies = [
  "tower",
  "tracing",
  "tracing-subscriber",
+ "trait-variant",
  "uuid",
 ]
diff --git a/serving/source-sink/Cargo.toml b/serving/source-sink/Cargo.toml
index 26d4027da6..9813d52b1e 100644
--- a/serving/source-sink/Cargo.toml
+++ b/serving/source-sink/Cargo.toml
@@ -26,6 +26,7 @@ uuid = { version = "1.10.0", features = ["v4"] }
 once_cell = "1.19.0"
 serde_json = "1.0.122"
 numaflow-models = { path = "../numaflow-models"}
+trait-variant = "0.1.2"
 rcgen = "0.13.1"
 rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
 serde = { version = "1.0.204", features = ["derive"] }
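Note: `trait-variant` is added as a dependency here, but no call site for it appears in this diff. For context, the crate is typically used to derive a `Send`-bounded variant of an async trait; the trait and method below are made-up placeholders, not code from this PR:

```rust
// Illustrative only: generates a `SourceReader` trait whose async methods
// return Send futures, alongside the local (non-Send) `LocalSourceReader`.
#[trait_variant::make(SourceReader: Send)]
trait LocalSourceReader {
    async fn read_batch(&mut self, count: usize) -> Vec<Vec<u8>>;
}
```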
diff --git a/serving/source-sink/src/config.rs b/serving/source-sink/src/config.rs
index 8adbc2691d..3939e16f1a 100644
--- a/serving/source-sink/src/config.rs
+++ b/serving/source-sink/src/config.rs
@@ -1,11 +1,14 @@
-use crate::error::Error;
-use base64::prelude::BASE64_STANDARD;
-use base64::Engine;
-use numaflow_models::models::MonoVertex;
 use std::env;
 use std::sync::OnceLock;
+
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
 use tracing::level_filters::LevelFilter;
+use numaflow_models::models::MonoVertex;
+
+use crate::error::Error;
+
 const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
 const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE";
 const ENV_POD_REPLICA: &str = "NUMAFLOW_REPLICA";
@@ -125,9 +128,10 @@ impl Settings {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use std::env;
+
+    use super::*;
 
     #[test]
     fn test_settings_load() {
         // Set up environment variables
diff --git a/serving/source-sink/src/forwarder.rs b/serving/source-sink/src/forwarder.rs
index cd39038a7c..002d42ee9c 100644
--- a/serving/source-sink/src/forwarder.rs
+++ b/serving/source-sink/src/forwarder.rs
@@ -1,3 +1,9 @@
+use chrono::Utc;
+use metrics::counter;
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
+use tracing::{info, trace};
+
 use crate::config::config;
 use crate::error::{Error, Result};
 use crate::metrics::{
@@ -7,11 +13,6 @@ use crate::metrics::{
 use crate::sink::SinkClient;
 use crate::source::SourceClient;
 use crate::transformer::TransformerClient;
-use chrono::Utc;
-use metrics::counter;
-use tokio::sync::oneshot;
-use tokio::task::JoinSet;
-use tracing::{info, trace};
 
 const MONO_VERTEX_TYPE: &str = "mono_vertex";
 
@@ -22,7 +23,7 @@ pub(crate) struct Forwarder {
     source_client: SourceClient,
     sink_client: SinkClient,
     transformer_client: Option<TransformerClient>,
-    shutdown_rx: oneshot::Receiver<()>,
+    cln_token: CancellationToken,
     common_labels: Vec<(String, String)>,
 }
 
@@ -32,7 +33,7 @@ impl Forwarder {
         source_client: SourceClient,
         sink_client: SinkClient,
         transformer_client: Option<TransformerClient>,
-        shutdown_rx: oneshot::Receiver<()>,
+        cln_token: CancellationToken,
     ) -> Result<Self> {
         let common_labels = vec![
             (
@@ -48,8 +49,8 @@ impl Forwarder {
             source_client,
             sink_client,
             transformer_client,
-            shutdown_rx,
             common_labels,
+            cln_token,
         })
     }
 
@@ -64,7 +65,7 @@ impl Forwarder {
             let start_time = tokio::time::Instant::now();
             // two arms, either shutdown or forward-a-chunk
             tokio::select! {
-                _ = &mut self.shutdown_rx => {
+                _ = self.cln_token.cancelled() => {
                     info!("Shutdown signal received, stopping forwarder...");
                     break;
                 }
@@ -145,6 +146,7 @@ mod tests {
     use numaflow::source::{Message, Offset, SourceReadRequest};
     use numaflow::{sink, source, sourcetransform};
     use tokio::sync::mpsc::Sender;
+    use tokio_util::sync::CancellationToken;
 
     use crate::forwarder::Forwarder;
     use crate::sink::{SinkClient, SinkConfig};
@@ -347,7 +349,7 @@ mod tests {
         // Wait for the servers to start
         tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
-        let (forwarder_shutdown_tx, forwarder_shutdown_rx) = tokio::sync::oneshot::channel();
+        let cln_token = CancellationToken::new();
 
         let source_client = SourceClient::connect(source_config)
             .await
@@ -365,7 +367,7 @@ mod tests {
             source_client,
             sink_client,
             Some(transformer_client),
-            forwarder_shutdown_rx,
+            cln_token.clone(),
         )
         .await
         .expect("failed to create forwarder");
@@ -383,9 +385,7 @@ mod tests {
         );
 
         // stop the forwarder
-        forwarder_shutdown_tx
-            .send(())
-            .expect("failed to send shutdown signal");
+        cln_token.cancel();
         forwarder_handle
             .await
             .expect("failed to join forwarder task");
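Replacing the `oneshot::Receiver` with a `CancellationToken` lets any number of components observe the same shutdown signal, since the token is cheaply cloneable. A minimal, self-contained sketch of the select pattern `Forwarder::run` now uses, with a sleep standing in for the forward-a-chunk arm:

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn run_loop(cln_token: CancellationToken) {
    loop {
        tokio::select! {
            // Resolves once any clone of the token has been cancelled.
            _ = cln_token.cancelled() => {
                println!("Shutdown signal received, stopping loop...");
                break;
            }
            // Stand-in for reading and forwarding one chunk.
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let cln_token = CancellationToken::new();
    let worker = tokio::spawn(run_loop(cln_token.clone()));
    cln_token.cancel(); // any holder of a clone can trigger shutdown
    worker.await.unwrap();
}
```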
diff --git a/serving/source-sink/src/lib.rs b/serving/source-sink/src/lib.rs
index 35a4d15a1a..15b59e537f 100644
--- a/serving/source-sink/src/lib.rs
+++ b/serving/source-sink/src/lib.rs
@@ -1,16 +1,16 @@
+use std::net::SocketAddr;
 use std::time::Duration;
 
-use tokio::signal;
-use tokio::sync::oneshot;
-use tokio::task::JoinHandle;
-use tokio::time::sleep;
-use tracing::{error, info, warn};
-
+use crate::config::config;
 pub(crate) use crate::error::Error;
 use crate::forwarder::Forwarder;
+use crate::metrics::{start_metrics_https_server, MetricsState};
 use crate::sink::{SinkClient, SinkConfig};
 use crate::source::{SourceClient, SourceConfig};
 use crate::transformer::{TransformerClient, TransformerConfig};
+use tokio::time::sleep;
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info, warn};
 
 pub(crate) use self::error::Result;
 
@@ -40,13 +40,13 @@ pub(crate) mod shared;
 
 /// forwards a chunk of data from the source to the sink via an optional transformer.
 /// It takes an optional custom_shutdown_rx for shutting down the forwarder, useful for testing.
-pub async fn run_forwarder(
+pub async fn init(
     source_config: SourceConfig,
     sink_config: SinkConfig,
     transformer_config: Option<TransformerConfig>,
-    custom_shutdown_rx: Option<oneshot::Receiver<()>>,
+    cln_token: CancellationToken,
 ) -> Result<()> {
-    server_info::check_for_server_compatibility(&source_config.server_info_file)
+    server_info::check_for_server_compatibility(&source_config.server_info_file, cln_token.clone())
         .await
         .map_err(|e| {
             warn!("Error waiting for source server info file: {:?}", e);
@@ -54,11 +54,7 @@ pub async fn run_forwarder(
         })?;
 
     let mut source_client = SourceClient::connect(source_config).await?;
 
-    // start the lag reader to publish lag metrics
-    let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
-    lag_reader.start().await;
-
-    server_info::check_for_server_compatibility(&sink_config.server_info_file)
+    server_info::check_for_server_compatibility(&sink_config.server_info_file, cln_token.clone())
         .await
         .map_err(|e| {
             warn!("Error waiting for sink server info file: {:?}", e);
@@ -68,7 +64,7 @@ pub async fn run_forwarder(
     let mut sink_client = SinkClient::connect(sink_config).await?;
 
     let mut transformer_client = if let Some(config) = transformer_config {
-        server_info::check_for_server_compatibility(&config.server_info_file)
+        server_info::check_for_server_compatibility(&config.server_info_file, cln_token.clone())
             .await
             .map_err(|e| {
                 warn!("Error waiting for transformer server info file: {:?}", e);
@@ -79,8 +75,6 @@ pub async fn run_forwarder(
         None
     };
 
-    let (shutdown_tx, shutdown_rx) = oneshot::channel();
-
     // readiness check for all the ud containers
     wait_until_ready(
         &mut source_client,
@@ -89,38 +83,35 @@ pub async fn run_forwarder(
     )
     .await?;
 
-    // TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc?
-    let mut forwarder =
-        Forwarder::new(source_client, sink_client, transformer_client, shutdown_rx).await?;
-
-    let forwarder_handle: JoinHandle<Result<()>> = tokio::spawn(async move {
-        forwarder.run().await?;
-        Ok(())
+    // Start the metrics server, which serves the prometheus metrics.
+    let metrics_addr: SocketAddr = format!("0.0.0.0:{}", &config().metrics_server_listen_port)
+        .parse()
+        .expect("Invalid address");
+
+    // Start the metrics server in a separate background async spawn,
+    // This should be running throughout the lifetime of the application, hence the handle is not
+    // joined.
+    let metrics_state = MetricsState {
+        source_client: source_client.clone(),
+        sink_client: sink_client.clone(),
+        transformer_client: transformer_client.clone(),
+    };
+    tokio::spawn(async move {
+        if let Err(e) = start_metrics_https_server(metrics_addr, metrics_state).await {
+            error!("Metrics server error: {:?}", e);
+        }
     });
 
-    let shutdown_handle: JoinHandle<Result<()>> = tokio::spawn(async move {
-        shutdown_signal(custom_shutdown_rx).await;
-        shutdown_tx
-            .send(())
-            .map_err(|_| Error::ForwarderError("Failed to send shutdown signal".to_string()))?;
-        Ok(())
-    });
+    // start the lag reader to publish lag metrics
+    let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
+    lag_reader.start().await;
 
-    forwarder_handle
-        .await
-        .unwrap_or_else(|e| {
-            error!("Forwarder task panicked: {:?}", e);
-            Err(Error::ForwarderError("Forwarder task panicked".to_string()))
-        })
-        .unwrap_or_else(|e| {
-            error!("Forwarder failed: {:?}", e);
-        });
+    // TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc?
+    let mut forwarder =
+        Forwarder::new(source_client, sink_client, transformer_client, cln_token).await?;
 
-    if !shutdown_handle.is_finished() {
-        shutdown_handle.abort();
-    }
+    forwarder.run().await?;
 
-    lag_reader.shutdown().await;
     info!("Forwarder stopped gracefully");
     Ok(())
 }
@@ -131,18 +122,18 @@ async fn wait_until_ready(
     transformer_client: &mut Option<TransformerClient>,
 ) -> Result<()> {
     loop {
-        let source_ready = source_client.is_ready().await.is_ok();
+        let source_ready = source_client.is_ready().await;
         if !source_ready {
             info!("UDSource is not ready, waiting...");
         }
 
-        let sink_ready = sink_client.is_ready().await.is_ok();
+        let sink_ready = sink_client.is_ready().await;
         if !sink_ready {
             info!("UDSink is not ready, waiting...");
         }
 
         let transformer_ready = if let Some(client) = transformer_client {
-            let ready = client.is_ready().await.is_ok();
+            let ready = client.is_ready().await;
             if !ready {
                 info!("UDTransformer is not ready, waiting...");
             }
@@ -161,40 +152,6 @@ async fn wait_until_ready(
     Ok(())
 }
 
-async fn shutdown_signal(shutdown_rx: Option<oneshot::Receiver<()>>) {
-    let ctrl_c = async {
-        signal::ctrl_c()
-            .await
-            .expect("failed to install Ctrl+C handler");
-        info!("Received Ctrl+C signal");
-    };
-
-    let terminate = async {
-        signal::unix::signal(signal::unix::SignalKind::terminate())
-            .expect("failed to install signal handler")
-            .recv()
-            .await;
-        info!("Received terminate signal");
-    };
-
-    let custom_shutdown = async {
-        if let Some(rx) = shutdown_rx {
-            rx.await.ok();
-        } else {
-            // Create a watch channel that never sends
-            let (_tx, mut rx) = tokio::sync::watch::channel(());
-            rx.changed().await.ok();
-        }
-        info!("Received custom shutdown signal");
-    };
-
-    tokio::select! {
-        _ = ctrl_c => {},
-        _ = terminate => {},
-        _ = custom_shutdown => {},
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::env;
@@ -202,6 +159,7 @@ mod tests {
     use numaflow::source::{Message, Offset, SourceReadRequest};
     use numaflow::{sink, source};
     use tokio::sync::mpsc::Sender;
+    use tokio_util::sync::CancellationToken;
 
     use crate::sink::SinkConfig;
     use crate::source::SourceConfig;
@@ -284,11 +242,11 @@ mod tests {
         env::set_var("SOURCE_SOCKET", src_sock_file.to_str().unwrap());
         env::set_var("SINK_SOCKET", sink_sock_file.to_str().unwrap());
 
-        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+        let cln_token = CancellationToken::new();
+        let forwarder_cln_token = cln_token.clone();
         let forwarder_handle = tokio::spawn(async move {
-            let result =
-                super::run_forwarder(source_config, sink_config, None, Some(shutdown_rx)).await;
+            let result = super::init(source_config, sink_config, None, forwarder_cln_token).await;
            assert!(result.is_ok());
         });
 
         // wait for the forwarder to start
         tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
         // stop the forwarder
-        shutdown_tx.send(()).unwrap();
+        cln_token.cancel();
         forwarder_handle.await.unwrap();
 
         // stop the source and sink servers
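With `is_ready` returning a plain `bool`, the readiness gate in `wait_until_ready` reduces to a poll-and-sleep loop. The sketch below captures that shape with a stand-in client type rather than the real gRPC clients:

```rust
use std::time::Duration;

// Stand-in with the same readiness shape as SourceClient/SinkClient.
struct Client {
    healthy: bool,
}

impl Client {
    async fn is_ready(&mut self) -> bool {
        self.healthy
    }
}

// Keep polling until every client reports ready, sleeping between rounds.
async fn wait_until_all_ready(clients: &mut [Client]) {
    loop {
        let mut all_ready = true;
        for client in clients.iter_mut() {
            if !client.is_ready().await {
                all_ready = false;
            }
        }
        if all_ready {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```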
diff --git a/serving/source-sink/src/main.rs b/serving/source-sink/src/main.rs
index 0013e33613..e3cfb7e6a6 100644
--- a/serving/source-sink/src/main.rs
+++ b/serving/source-sink/src/main.rs
@@ -1,41 +1,28 @@
+use tokio::signal;
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+use tracing::level_filters::LevelFilter;
+use tracing::{error, info};
+use tracing_subscriber::EnvFilter;
+
 use sourcer_sinker::config::config;
-use sourcer_sinker::metrics::start_metrics_https_server;
-use sourcer_sinker::run_forwarder;
+use sourcer_sinker::init;
 use sourcer_sinker::sink::SinkConfig;
 use sourcer_sinker::source::SourceConfig;
 use sourcer_sinker::transformer::TransformerConfig;
-use std::env;
-use std::net::SocketAddr;
-use tracing::level_filters::LevelFilter;
-use tracing::{error, info};
-use tracing_subscriber::EnvFilter;
 
 #[tokio::main]
 async fn main() {
-    let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| LevelFilter::INFO.to_string());
     // Initialize the logger
     tracing_subscriber::fmt()
         .with_env_filter(
             EnvFilter::builder()
                 .with_default_directive(LevelFilter::INFO.into())
-                .parse_lossy(log_level),
+                .parse_lossy(&config().log_level),
         )
         .with_target(false)
         .init();
 
-    // Start the metrics server, which server the prometheus metrics.
-    // TODO: make the port configurable.
-    let metrics_addr: SocketAddr = "0.0.0.0:2469".parse().expect("Invalid address");
-
-    // Start the metrics server in a separate background async spawn,
-    // This should be running throughout the lifetime of the application, hence the handle is not
-    // joined.
-    tokio::spawn(async move {
-        if let Err(e) = start_metrics_https_server(metrics_addr).await {
-            error!("Metrics server error: {:?}", e);
-        }
-    });
-
     // Initialize the source, sink and transformer configurations
     // We are using the default configurations for now.
     let source_config = SourceConfig {
@@ -47,6 +34,7 @@ async fn main() {
         max_message_size: config().grpc_max_message_size,
         ..Default::default()
     };
+
     let transformer_config = if config().is_transformer_enabled {
         Some(TransformerConfig {
             max_message_size: config().grpc_max_message_size,
@@ -56,10 +44,46 @@ async fn main() {
         None
     };
 
-    // Run the forwarder
-    if let Err(e) = run_forwarder(source_config, sink_config, transformer_config, None).await {
+    let cln_token = CancellationToken::new();
+    let shutdown_cln_token = cln_token.clone();
+    // wait for SIG{INT,TERM} and invoke cancellation token.
+    let shutdown_handle: JoinHandle> = tokio::spawn(async move {
+        shutdown_signal().await;
+        shutdown_cln_token.cancel();
+        Ok(())
+    });
+
+    // Run the forwarder with cancellation token.
+    if let Err(e) = init(source_config, sink_config, transformer_config, cln_token).await {
         error!("Application error: {:?}", e);
+
+        // abort the task since we have an error
+        if !shutdown_handle.is_finished() {
+            shutdown_handle.abort();
+        }
     }
 
     info!("Gracefully Exiting...");
 }
+
+async fn shutdown_signal() {
+    let ctrl_c = async {
+        signal::ctrl_c()
+            .await
+            .expect("failed to install Ctrl+C handler");
+        info!("Received Ctrl+C signal");
+    };
+
+    let terminate = async {
+        signal::unix::signal(signal::unix::SignalKind::terminate())
+            .expect("failed to install signal handler")
+            .recv()
+            .await;
+        info!("Received terminate signal");
+    };
+
+    tokio::select! {
+        _ = ctrl_c => {},
+        _ = terminate => {},
+    }
+}
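main.rs now owns signal handling and simply cancels the token; everything downstream reacts through its clone. One property of `CancellationToken` not exercised here is that tokens form a hierarchy: cancelling a parent also cancels tokens created via `child_token()`, so subsystems can be handed scoped tokens. A small illustrative sketch, not taken from this PR:

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let root = CancellationToken::new();

    // Each subsystem gets its own child token.
    let forwarder_token = root.child_token();
    let lag_reader_token = root.child_token();

    let t1 = tokio::spawn(async move { forwarder_token.cancelled().await });
    let t2 = tokio::spawn(async move { lag_reader_token.cancelled().await });

    // Cancelling the root (e.g. from a SIGINT/SIGTERM handler) reaches both.
    root.cancel();
    let _ = tokio::join!(t1, t2);
}
```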
diff --git a/serving/source-sink/src/metrics.rs b/serving/source-sink/src/metrics.rs
index b33b5b2fb2..896c7a768d 100644
--- a/serving/source-sink/src/metrics.rs
+++ b/serving/source-sink/src/metrics.rs
@@ -3,6 +3,7 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
 
+use axum::extract::State;
 use axum::http::StatusCode;
 use axum::response::IntoResponse;
 use axum::{routing::get, Router};
@@ -14,11 +15,12 @@ use tokio::net::{TcpListener, ToSocketAddrs};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
 use tokio::time;
-use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info};
 
 use crate::error::Error;
+use crate::sink::SinkClient;
 use crate::source::SourceClient;
+use crate::transformer::TransformerClient;
 
 // Define the labels for the metrics
 pub const MONO_VERTEX_NAME: &str = "vertex";
@@ -32,16 +34,37 @@ pub const FORWARDER_READ_BYTES_TOTAL: &str = "forwarder_read_bytes_total";
 pub const FORWARDER_ACK_TOTAL: &str = "forwarder_ack_total";
 pub const FORWARDER_WRITE_TOTAL: &str = "forwarder_write_total";
 
+#[derive(Clone)]
+pub(crate) struct MetricsState {
+    pub source_client: SourceClient,
+    pub sink_client: SinkClient,
+    pub transformer_client: Option<TransformerClient>,
+}
+
 /// Collect and emit prometheus metrics.
-/// Metrics router and server
-pub async fn start_metrics_http_server<A>(addr: A) -> crate::Result<()>
+/// Metrics router and server over HTTP endpoint.
+// This is not used currently
+#[allow(dead_code)]
+pub(crate) async fn start_metrics_http_server<A>(
+    addr: A,
+    source_client: SourceClient,
+    sink_client: SinkClient,
+    transformer_client: Option<TransformerClient>,
+) -> crate::Result<()>
 where
     A: ToSocketAddrs + std::fmt::Debug,
 {
     // setup_metrics_recorder should only be invoked once
     let recorder_handle = setup_metrics_recorder()?;
-    let metrics_app = metrics_router(recorder_handle);
+    let metrics_app = metrics_router(
+        recorder_handle,
+        MetricsState {
+            source_client,
+            sink_client,
+            transformer_client,
+        },
+    );
 
     let listener = TcpListener::bind(&addr)
         .await
@@ -55,9 +78,10 @@ where
     Ok(())
 }
 
-pub async fn start_metrics_https_server(addr: SocketAddr) -> crate::Result<()>
-where
-{
+pub(crate) async fn start_metrics_https_server(
+    addr: SocketAddr,
+    metrics_state: MetricsState,
+) -> crate::Result<()> {
     let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
 
     // Generate a self-signed certificate
@@ -71,7 +95,7 @@ where
     // setup_metrics_recorder should only be invoked once
     let recorder_handle = setup_metrics_recorder()?;
 
-    let metrics_app = metrics_router(recorder_handle);
+    let metrics_app = metrics_router(recorder_handle, metrics_state);
 
     axum_server::bind_rustls(addr, tls_config)
         .serve(metrics_app.into_make_service())
@@ -82,12 +106,14 @@ where
 }
 
 /// router for metrics and k8s health endpoints
-fn metrics_router(recorder_handle: PrometheusHandle) -> Router {
+fn metrics_router(recorder_handle: PrometheusHandle, metrics_state: MetricsState) -> Router {
     let metrics_app = Router::new()
         .route("/metrics", get(move || ready(recorder_handle.render())))
         .route("/livez", get(livez))
         .route("/readyz", get(readyz))
-        .route("/sidecar-livez", get(sidecar_livez));
+        .route("/sidecar-livez", get(sidecar_livez))
+        .with_state(metrics_state);
+
     metrics_app
 }
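`with_state` plus the `State` extractor is the standard axum mechanism for handing shared, cloneable state to handlers, which is how `sidecar-livez` gets access to the clients. A stripped-down sketch of the same wiring with an illustrative state type:

```rust
use axum::{extract::State, http::StatusCode, routing::get, Router};

// State must be Clone; axum clones it for each handler invocation.
#[derive(Clone)]
struct AppState {
    ready: bool,
}

async fn livez(State(state): State<AppState>) -> StatusCode {
    if state.ready {
        StatusCode::NO_CONTENT
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}

fn router(state: AppState) -> Router {
    Router::new().route("/livez", get(livez)).with_state(state)
}
```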
@@ -99,7 +125,18 @@ async fn readyz() -> impl IntoResponse {
     StatusCode::NO_CONTENT
 }
 
-async fn sidecar_livez() -> impl IntoResponse {
+async fn sidecar_livez(State(mut state): State<MetricsState>) -> impl IntoResponse {
+    if !state.source_client.is_ready().await {
+        return StatusCode::SERVICE_UNAVAILABLE;
+    }
+    if !state.sink_client.is_ready().await {
+        return StatusCode::SERVICE_UNAVAILABLE;
+    }
+    if let Some(mut transformer_client) = state.transformer_client {
+        if !transformer_client.is_ready().await {
+            return StatusCode::SERVICE_UNAVAILABLE;
+        }
+    }
     StatusCode::NO_CONTENT
 }
 
@@ -157,7 +194,6 @@ pub(crate) struct LagReader {
     source_client: SourceClient,
     lag_checking_interval: Duration,
     refresh_interval: Duration,
-    cancellation_token: CancellationToken,
     buildup_handle: Option<JoinHandle<()>>,
     expose_handle: Option<JoinHandle<()>>,
     pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
@@ -174,7 +210,6 @@ impl LagReader {
             source_client,
             lag_checking_interval: lag_checking_interval.unwrap_or_else(|| Duration::from_secs(3)),
             refresh_interval: refresh_interval.unwrap_or_else(|| Duration::from_secs(5)),
-            cancellation_token: CancellationToken::new(),
             buildup_handle: None,
             expose_handle: None,
             pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))),
@@ -187,94 +222,81 @@ impl LagReader {
     /// - One to periodically check the lag and update the pending stats.
     /// - Another to periodically expose the pending metrics.
     pub async fn start(&mut self) {
-        let token = self.cancellation_token.clone();
         let source_client = self.source_client.clone();
         let lag_checking_interval = self.lag_checking_interval;
         let refresh_interval = self.refresh_interval;
         let pending_stats = self.pending_stats.clone();
 
         self.buildup_handle = Some(tokio::spawn(async move {
-            build_pending_info(source_client, token, lag_checking_interval, pending_stats).await;
+            build_pending_info(source_client, lag_checking_interval, pending_stats).await;
         }));
 
-        let token = self.cancellation_token.clone();
         let pending_stats = self.pending_stats.clone();
         self.expose_handle = Some(tokio::spawn(async move {
-            expose_pending_metrics(token, refresh_interval, pending_stats).await;
+            expose_pending_metrics(refresh_interval, pending_stats).await;
         }));
     }
+}
 
-    /// Shuts down the lag reader by cancelling the tasks and waiting for them to complete.
-    pub(crate) async fn shutdown(self) {
-        self.cancellation_token.cancel();
-        if let Some(handle) = self.buildup_handle {
-            let _ = handle.await;
+/// When lag-reader is dropped, we need to clean up the pending exposer and the pending builder tasks.
+impl Drop for LagReader {
+    fn drop(&mut self) {
+        if let Some(handle) = self.expose_handle.take() {
+            handle.abort();
         }
-        if let Some(handle) = self.expose_handle {
-            let _ = handle.await;
+        if let Some(handle) = self.buildup_handle.take() {
+            handle.abort();
         }
+
+        info!("Stopped the Lag-Reader Expose and Builder tasks");
     }
 }
 
 /// Periodically checks the pending messages from the source client and build the pending stats.
 async fn build_pending_info(
     mut source_client: SourceClient,
-    cancellation_token: CancellationToken,
     lag_checking_interval: Duration,
     pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
 ) {
     let mut ticker = time::interval(lag_checking_interval);
     loop {
-        tokio::select! {
-            _ = cancellation_token.cancelled() => {
-                return;
-            }
-            _ = ticker.tick() => {
-                match source_client.pending_fn().await {
-                    Ok(pending) => {
-                        if pending != -1 {
-                            let mut stats = pending_stats.lock().await;
-                            stats.push(TimestampedPending {
-                                pending,
-                                timestamp: std::time::Instant::now(),
-                            });
-                            let n = stats.len();
-                            // Ensure only the most recent MAX_PENDING_STATS entries are kept
-                            if n >= MAX_PENDING_STATS {
-                                stats.drain(0..(n - MAX_PENDING_STATS));
-                            }
-                        }
-                    }
-                    Err(err) => {
-                        error!("Failed to get pending messages: {:?}", err);
+        ticker.tick().await;
+        match source_client.pending_fn().await {
+            Ok(pending) => {
+                if pending != -1 {
+                    let mut stats = pending_stats.lock().await;
+                    stats.push(TimestampedPending {
+                        pending,
+                        timestamp: std::time::Instant::now(),
+                    });
+                    let n = stats.len();
+                    // Ensure only the most recent MAX_PENDING_STATS entries are kept
+                    if n >= MAX_PENDING_STATS {
+                        stats.drain(0..(n - MAX_PENDING_STATS));
                     }
                 }
             }
+            Err(err) => {
+                error!("Failed to get pending messages: {:?}", err);
+            }
         }
     }
 }
 
 // Periodically exposes the pending metrics by calculating the average pending messages over different intervals.
 async fn expose_pending_metrics(
-    cancellation_token: CancellationToken,
     refresh_interval: Duration,
     pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
 ) {
     let mut ticker = time::interval(refresh_interval);
     let lookback_seconds_map = vec![("1m", 60), ("5m", 300), ("15m", 900)];
     loop {
-        tokio::select! {
-            _ = cancellation_token.cancelled() => {
-                return;
-            }
-            _ = ticker.tick() => {
-                for (label, seconds) in &lookback_seconds_map {
-                    let pending = calculate_pending(*seconds, &pending_stats).await;
-                    if pending != -1 {
-                        // TODO: emit it as a metric
-                        info!("Pending messages ({}): {}", label, pending);
-                    }
-                }
+        ticker.tick().await;
+        for (label, seconds) in &lookback_seconds_map {
+            let pending = calculate_pending(*seconds, &pending_stats).await;
+            if pending != -1 {
+                // TODO: emit it as a metric
+                info!("Pending messages ({}): {}", label, pending);
             }
         }
     }
 }
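Dropping the explicit `shutdown()` in favour of a `Drop` impl ties cleanup of the two background loops to the `LagReader`'s lifetime; `JoinHandle::abort` is synchronous, so it can be called from `drop`. The pattern in isolation, with illustrative names:

```rust
use tokio::task::JoinHandle;

struct BackgroundWorker {
    handle: Option<JoinHandle<()>>,
}

impl Drop for BackgroundWorker {
    fn drop(&mut self) {
        // Abort the spawned loop when its owner goes away. This is fine for
        // loops that hold no state needing a graceful flush.
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}
```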
@@ -307,27 +329,4 @@ async fn calculate_pending(
 
     result
 }
 
-#[cfg(test)]
-mod tests {
-    use std::net::SocketAddr;
-    use std::time::Duration;
-
-    use tokio::time::sleep;
-
-    use super::*;
-
-    #[tokio::test]
-    async fn test_start_metrics_server() {
-        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
-        let server = tokio::spawn(async move {
-            let result = start_metrics_http_server(addr).await;
-            assert!(result.is_ok())
-        });
-
-        // Give the server a little bit of time to start
-        sleep(Duration::from_millis(100)).await;
-
-        // Stop the server
-        server.abort();
-    }
-}
+// TODO add tests
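For reference, `expose_pending_metrics` reports an average of the pending samples recorded inside each lookback window (1m/5m/15m). A compact sketch of that computation over the same kind of timestamped samples, keeping -1 as the "no data" sentinel:

```rust
use std::time::{Duration, Instant};

struct TimestampedSample {
    pending: i64,
    timestamp: Instant,
}

// Average of the samples newer than `lookback`; -1 when no sample qualifies.
fn average_pending(samples: &[TimestampedSample], lookback: Duration) -> i64 {
    let now = Instant::now();
    let (mut sum, mut count) = (0i64, 0i64);
    // Samples are appended in time order, so walk from the newest backwards.
    for sample in samples.iter().rev() {
        if now.duration_since(sample.timestamp) > lookback {
            break;
        }
        sum += sample.pending;
        count += 1;
    }
    if count == 0 {
        -1
    } else {
        sum / count
    }
}
```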
diff --git a/serving/source-sink/src/server_info.rs b/serving/source-sink/src/server_info.rs
index fe841fe99e..7412b2ca9d 100644
--- a/serving/source-sink/src/server_info.rs
+++ b/serving/source-sink/src/server_info.rs
@@ -7,10 +7,10 @@ use pep440_rs::{Version as PepVersion, VersionSpecifier};
 use semver::{Version, VersionReq};
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
+use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};
 
-use crate::error;
-use crate::error::Error;
+use crate::error::{self, Error};
 use crate::server_info::version::SdkConstraints;
 
 // Constant to represent the end of the server info.
@@ -34,9 +34,12 @@ pub(crate) struct ServerInfo {
 
 /// check_for_server_compatibility waits until the server info file is ready and check whether the
 /// server is compatible with Numaflow.
-pub async fn check_for_server_compatibility(file_path: &str) -> error::Result<()> {
+pub async fn check_for_server_compatibility(
+    file_path: &str,
+    cln_token: CancellationToken,
+) -> error::Result<()> {
     // Read the server info file
-    let server_info = read_server_info(file_path).await?;
+    let server_info = read_server_info(file_path, cln_token).await?;
 
     // Log the server info
     info!("Server info file: {:?}", server_info);
@@ -79,8 +82,7 @@ fn check_constraint(version: &Version, constraint: &str) -> error::Result<()> {
     // Parse the given constraint as a semantic version requirement
     let version_req = VersionReq::parse(constraint).map_err(|e| {
         Error::ServerInfoError(format!(
-            "Error parsing constraint: {},\
-            constraint string: {}",
+            "Error parsing constraint: {}, constraint string: {}",
             e, constraint
         ))
     })?;
@@ -109,14 +111,12 @@ fn check_numaflow_compatibility(
     // Create a version constraint based on the minimum numaflow version
     let numaflow_constraint = format!(">={}", min_numaflow_version);
-    Ok(
-        check_constraint(&numaflow_version_semver, &numaflow_constraint).map_err(|e| {
-            Error::ServerInfoError(format!(
-                "numaflow version {} must be upgraded to at least {}, in order to work with current SDK version {}",
-                numaflow_version_semver, min_numaflow_version, e
-            ))
-        })?
-    )
+    check_constraint(&numaflow_version_semver, &numaflow_constraint).map_err(|e| {
+        Error::ServerInfoError(format!(
+            "numaflow version {} must be upgraded to at least {}, in order to work with current SDK version {}",
+            numaflow_version_semver, min_numaflow_version, e
+        ))
+    })
 }
 
 /// Checks if the current SDK version is compatible with the given language's minimum supported SDK version.
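The compatibility helpers above boil down to building a `>=min` requirement and matching the running version against it with the `semver` crate. The core of that check as a standalone sketch, with example values:

```rust
use semver::{Version, VersionReq};

// True when `running` satisfies ">= minimum"; parse failures count as incompatible.
fn is_at_least(running: &str, minimum: &str) -> bool {
    let req = match VersionReq::parse(&format!(">={}", minimum)) {
        Ok(req) => req,
        Err(_) => return false,
    };
    match Version::parse(running) {
        Ok(version) => req.matches(&version),
        Err(_) => false,
    }
}

// is_at_least("1.3.1", "1.2.0") == true
// is_at_least("1.1.0", "1.2.0") == false
```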
@@ -177,9 +177,18 @@ fn check_sdk_compatibility(
 }
 
 /// Reads the server info file and returns the parsed ServerInfo struct.
-async fn read_server_info(file_path: &str) -> error::Result<ServerInfo> {
+/// The cancellation token is used to stop ready-check of server_info file in case it is missing.
+/// This cancellation token is closed via the global shutdown handler.
+async fn read_server_info(
+    file_path: &str,
+    cln_token: CancellationToken,
+) -> error::Result<ServerInfo> {
     // Infinite loop to keep checking until the file is ready
     loop {
+        if cln_token.is_cancelled() {
+            return Err(Error::ServerInfoError("Operation cancelled".to_string()));
+        }
+
         // Check if the file exists and has content
         if let Ok(metadata) = fs::metadata(file_path) {
             if metadata.len() > 0 {
@@ -235,7 +244,6 @@ async fn read_server_info(file_path: &str) -> error::Result<ServerInfo> {
 
     Ok(server_info) // Return the parsed server info
 }
-
 /// create a mod for version.rs
 mod version {
     use std::collections::HashMap;
@@ -358,7 +366,7 @@ mod tests {
         }
 
         // Create a new file
-        let mut file = File::create(svr_info_file_path);
+        let file = File::create(svr_info_file_path);
 
         // Extract the file from the Result
         let mut file = match file {
@@ -560,6 +568,9 @@ mod tests {
         let dir = tempfile::tempdir().unwrap();
         let file_path = dir.path().join("server_info.txt");
 
+        let cln_token = CancellationToken::new();
+        let _drop_guard = cln_token.clone().drop_guard();
+
         // Server info to write
         let server_info = ServerInfo {
             protocol: TCP.parse().unwrap(),
@@ -577,7 +588,7 @@ mod tests {
         let _ = write_server_info(&server_info, file_path.to_str().unwrap()).await;
 
         // Call the read_server_info function
-        let result = read_server_info(file_path.to_str().unwrap()).await;
+        let result = read_server_info(file_path.to_str().unwrap(), cln_token).await;
         assert!(result.is_ok(), "Expected Ok, got {:?}", result);
 
         let server_info = result.unwrap();
@@ -602,8 +613,11 @@ mod tests {
         let mut file = File::create(&file_path).unwrap();
         writeln!(file, r#"{{"protocol":"tcp","language":"go","minimum_numaflow_version":"1.2.0-rc4","version":"1.0.0","metadata":{{"key1":"value1"}}}}"#).unwrap();
 
+        let cln_token = CancellationToken::new();
+        let _drop_guard = cln_token.clone().drop_guard();
+
         // Call the read_server_info function
-        let result = read_server_info(file_path.to_str().unwrap()).await;
+        let result = read_server_info(file_path.to_str().unwrap(), cln_token).await;
         assert!(result.is_err(), "Expected Err, got {:?}", result);
 
         let error = result.unwrap_err();
@@ -623,7 +637,7 @@ mod tests {
             "version": "v0.7.0-rc2",
             "metadata": null
         })
-        .to_string();
+            .to_string();
 
         let _expected_server_info = ServerInfo {
             protocol: "uds".to_string(),
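`read_server_info` now checks the token at the top of each retry, so a missing server-info file can no longer keep startup spinning past a shutdown request. The same polling shape as a standalone sketch (path and interval are arbitrary):

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn wait_for_nonempty_file(path: &str, cln_token: CancellationToken) -> Result<(), String> {
    loop {
        // Bail out as soon as the global shutdown handler cancels the token.
        if cln_token.is_cancelled() {
            return Err("operation cancelled while waiting for file".to_string());
        }
        if let Ok(metadata) = std::fs::metadata(path) {
            if metadata.len() > 0 {
                return Ok(());
            }
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```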
diff --git a/serving/source-sink/src/sink.rs b/serving/source-sink/src/sink.rs
index 6312287338..ab9a0f49c2 100644
--- a/serving/source-sink/src/sink.rs
+++ b/serving/source-sink/src/sink.rs
@@ -30,6 +30,7 @@ impl Default for SinkConfig {
     }
 }
 
+#[derive(Clone)]
 /// SinkClient is a client to interact with the sink server.
 pub struct SinkClient {
     client: proto::sink_client::SinkClient<Channel>,
 }
 
@@ -67,20 +68,19 @@ impl SinkClient {
         Ok(response)
     }
 
-    pub(crate) async fn is_ready(&mut self) -> Result<ReadyResponse> {
-        let request = Request::new(());
-        let response = self.client.is_ready(request).await?.into_inner();
-        Ok(response)
+    pub(crate) async fn is_ready(&mut self) -> bool {
+        self.client.is_ready(Request::new(())).await.is_ok()
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::message::Offset;
     use chrono::offset::Utc;
     use numaflow::sink;
     use tracing::info;
 
+    use crate::message::Offset;
+
     use super::*;
 
     struct Logger;
@@ -159,8 +159,8 @@ mod tests {
             },
         ];
 
-        let ready_response = sink_client.is_ready().await.unwrap();
-        assert_eq!(ready_response.ready, true);
+        let ready_response = sink_client.is_ready().await;
+        assert!(ready_response);
 
         let response = sink_client.sink_fn(messages).await.unwrap();
         assert_eq!(response.results.len(), 2);
diff --git a/serving/source-sink/src/source.rs b/serving/source-sink/src/source.rs
index 3c164bb5e2..3e4ec30d40 100644
--- a/serving/source-sink/src/source.rs
+++ b/serving/source-sink/src/source.rs
@@ -115,10 +115,8 @@ impl SourceClient {
         Ok(response.result.map_or(vec![], |r| r.partitions))
     }
 
-    pub(crate) async fn is_ready(&mut self) -> Result<ReadyResponse> {
-        let request = Request::new(());
-        let response = self.client.is_ready(request).await?.into_inner();
-        Ok(response)
+    pub(crate) async fn is_ready(&mut self) -> bool {
+        self.client.is_ready(Request::new(())).await.is_ok()
     }
 }
 
@@ -222,8 +220,8 @@ mod tests {
         .await
         .expect("failed to connect to source server");
 
-        let response = source_client.is_ready().await.unwrap();
-        assert!(response.ready);
+        let response = source_client.is_ready().await;
+        assert!(response);
 
         let messages = source_client.read_fn(5, 1000).await.unwrap();
         assert_eq!(messages.len(), 5);
diff --git a/serving/source-sink/src/transformer.rs b/serving/source-sink/src/transformer.rs
index 2bbca45bce..5a3f70f73f 100644
--- a/serving/source-sink/src/transformer.rs
+++ b/serving/source-sink/src/transformer.rs
@@ -74,10 +74,8 @@ impl TransformerClient {
         Ok(messages)
     }
 
-    pub(crate) async fn is_ready(&mut self) -> Result<ReadyResponse> {
-        let request = Request::new(());
-        let response = self.client.is_ready(request).await?.into_inner();
-        Ok(response)
+    pub(crate) async fn is_ready(&mut self) -> bool {
+        self.client.is_ready(Request::new(())).await.is_ok()
     }
 }
 
@@ -144,8 +142,8 @@ mod tests {
             headers: Default::default(),
         };
 
-        let resp = client.is_ready().await?;
-        assert_eq!(resp.ready, true);
+        let resp = client.is_ready().await;
+        assert_eq!(resp, true);
 
         let resp = client.transform_fn(message).await?;
         assert_eq!(resp.len(), 1);
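Collapsing `is_ready` from `Result<ReadyResponse>` to `bool` folds every transport or server error into "not ready", which is all the health endpoints and the readiness loop need. The shape of the change against a stand-in client (the real code calls `is_ready(Request::new(()))` on the generated tonic client instead):

```rust
// Stand-in for the generated gRPC client used by SinkClient/SourceClient.
struct FakeGrpcClient {
    reachable: bool,
}

impl FakeGrpcClient {
    async fn is_ready_rpc(&mut self) -> Result<(), String> {
        if self.reachable {
            Ok(())
        } else {
            Err("connection refused".to_string())
        }
    }
}

// Any RPC error is treated as "not ready".
async fn is_ready(client: &mut FakeGrpcClient) -> bool {
    client.is_ready_rpc().await.is_ok()
}
```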