Skip to content

Commit

Permalink
chore: improve shutdown and health checks for MonoVertex (#1919)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith committed Aug 10, 2024
1 parent 8b7a9a1 commit 0e10767
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 251 deletions.
1 change: 1 addition & 0 deletions serving/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions serving/source-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ uuid = { version = "1.10.0", features = ["v4"] }
once_cell = "1.19.0"
serde_json = "1.0.122"
numaflow-models = { path = "../numaflow-models"}
trait-variant = "0.1.2"
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
serde = { version = "1.0.204", features = ["derive"] }
Expand Down
14 changes: 9 additions & 5 deletions serving/source-sink/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::error::Error;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use numaflow_models::models::MonoVertex;
use std::env;
use std::sync::OnceLock;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use tracing::level_filters::LevelFilter;

use numaflow_models::models::MonoVertex;

use crate::error::Error;

const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE";
const ENV_POD_REPLICA: &str = "NUMAFLOW_REPLICA";
Expand Down Expand Up @@ -125,9 +128,10 @@ impl Settings {

#[cfg(test)]
mod tests {
use super::*;
use std::env;

use super::*;

#[test]
fn test_settings_load() {
// Set up environment variables
Expand Down
28 changes: 14 additions & 14 deletions serving/source-sink/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use chrono::Utc;
use metrics::counter;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{info, trace};

use crate::config::config;
use crate::error::{Error, Result};
use crate::metrics::{
Expand All @@ -7,11 +13,6 @@ use crate::metrics::{
use crate::sink::SinkClient;
use crate::source::SourceClient;
use crate::transformer::TransformerClient;
use chrono::Utc;
use metrics::counter;
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tracing::{info, trace};

const MONO_VERTEX_TYPE: &str = "mono_vertex";

Expand All @@ -22,7 +23,7 @@ pub(crate) struct Forwarder {
source_client: SourceClient,
sink_client: SinkClient,
transformer_client: Option<TransformerClient>,
shutdown_rx: oneshot::Receiver<()>,
cln_token: CancellationToken,
common_labels: Vec<(String, String)>,
}

Expand All @@ -32,7 +33,7 @@ impl Forwarder {
source_client: SourceClient,
sink_client: SinkClient,
transformer_client: Option<TransformerClient>,
shutdown_rx: oneshot::Receiver<()>,
cln_token: CancellationToken,
) -> Result<Self> {
let common_labels = vec![
(
Expand All @@ -48,8 +49,8 @@ impl Forwarder {
source_client,
sink_client,
transformer_client,
shutdown_rx,
common_labels,
cln_token,
})
}

Expand All @@ -64,7 +65,7 @@ impl Forwarder {
let start_time = tokio::time::Instant::now();
// two arms, either shutdown or forward-a-chunk
tokio::select! {
_ = &mut self.shutdown_rx => {
_ = self.cln_token.cancelled() => {
info!("Shutdown signal received, stopping forwarder...");
break;
}
Expand Down Expand Up @@ -145,6 +146,7 @@ mod tests {
use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source, sourcetransform};
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

use crate::forwarder::Forwarder;
use crate::sink::{SinkClient, SinkConfig};
Expand Down Expand Up @@ -347,7 +349,7 @@ mod tests {
// Wait for the servers to start
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

let (forwarder_shutdown_tx, forwarder_shutdown_rx) = tokio::sync::oneshot::channel();
let cln_token = CancellationToken::new();

let source_client = SourceClient::connect(source_config)
.await
Expand All @@ -365,7 +367,7 @@ mod tests {
source_client,
sink_client,
Some(transformer_client),
forwarder_shutdown_rx,
cln_token.clone(),
)
.await
.expect("failed to create forwarder");
Expand All @@ -383,9 +385,7 @@ mod tests {
);

// stop the forwarder
forwarder_shutdown_tx
.send(())
.expect("failed to send shutdown signal");
cln_token.cancel();
forwarder_handle
.await
.expect("failed to join forwarder task");
Expand Down
128 changes: 43 additions & 85 deletions serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::net::SocketAddr;
use std::time::Duration;

use tokio::signal;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{error, info, warn};

use crate::config::config;
pub(crate) use crate::error::Error;
use crate::forwarder::Forwarder;
use crate::metrics::{start_metrics_https_server, MetricsState};
use crate::sink::{SinkClient, SinkConfig};
use crate::source::{SourceClient, SourceConfig};
use crate::transformer::{TransformerClient, TransformerConfig};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

pub(crate) use self::error::Result;

Expand Down Expand Up @@ -40,25 +40,21 @@ pub(crate) mod shared;

/// forwards a chunk of data from the source to the sink via an optional transformer.
/// It takes a cancellation token (cln_token) for shutting down the forwarder, which is also useful for testing.
pub async fn run_forwarder(
pub async fn init(
source_config: SourceConfig,
sink_config: SinkConfig,
transformer_config: Option<TransformerConfig>,
custom_shutdown_rx: Option<oneshot::Receiver<()>>,
cln_token: CancellationToken,
) -> Result<()> {
server_info::check_for_server_compatibility(&source_config.server_info_file)
server_info::check_for_server_compatibility(&source_config.server_info_file, cln_token.clone())
.await
.map_err(|e| {
warn!("Error waiting for source server info file: {:?}", e);
Error::ForwarderError("Error waiting for server info file".to_string())
})?;
let mut source_client = SourceClient::connect(source_config).await?;

// start the lag reader to publish lag metrics
let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
lag_reader.start().await;

server_info::check_for_server_compatibility(&sink_config.server_info_file)
server_info::check_for_server_compatibility(&sink_config.server_info_file, cln_token.clone())
.await
.map_err(|e| {
warn!("Error waiting for sink server info file: {:?}", e);
Expand All @@ -68,7 +64,7 @@ pub async fn run_forwarder(
let mut sink_client = SinkClient::connect(sink_config).await?;

let mut transformer_client = if let Some(config) = transformer_config {
server_info::check_for_server_compatibility(&config.server_info_file)
server_info::check_for_server_compatibility(&config.server_info_file, cln_token.clone())
.await
.map_err(|e| {
warn!("Error waiting for transformer server info file: {:?}", e);
Expand All @@ -79,8 +75,6 @@ pub async fn run_forwarder(
None
};

let (shutdown_tx, shutdown_rx) = oneshot::channel();

// readiness check for all the ud containers
wait_until_ready(
&mut source_client,
Expand All @@ -89,38 +83,35 @@ pub async fn run_forwarder(
)
.await?;

// TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc?
let mut forwarder =
Forwarder::new(source_client, sink_client, transformer_client, shutdown_rx).await?;

let forwarder_handle: JoinHandle<Result<()>> = tokio::spawn(async move {
forwarder.run().await?;
Ok(())
// Start the metrics server, which serves the Prometheus metrics.
let metrics_addr: SocketAddr = format!("0.0.0.0:{}", &config().metrics_server_listen_port)
.parse()
.expect("Invalid address");

// Start the metrics server in a separate background task.
// It should keep running throughout the lifetime of the application, hence the handle is not
// joined.
let metrics_state = MetricsState {
source_client: source_client.clone(),
sink_client: sink_client.clone(),
transformer_client: transformer_client.clone(),
};
tokio::spawn(async move {
if let Err(e) = start_metrics_https_server(metrics_addr, metrics_state).await {
error!("Metrics server error: {:?}", e);
}
});

let shutdown_handle: JoinHandle<Result<()>> = tokio::spawn(async move {
shutdown_signal(custom_shutdown_rx).await;
shutdown_tx
.send(())
.map_err(|_| Error::ForwarderError("Failed to send shutdown signal".to_string()))?;
Ok(())
});
// start the lag reader to publish lag metrics
let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
lag_reader.start().await;

forwarder_handle
.await
.unwrap_or_else(|e| {
error!("Forwarder task panicked: {:?}", e);
Err(Error::ForwarderError("Forwarder task panicked".to_string()))
})
.unwrap_or_else(|e| {
error!("Forwarder failed: {:?}", e);
});
// TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc?
let mut forwarder =
Forwarder::new(source_client, sink_client, transformer_client, cln_token).await?;

if !shutdown_handle.is_finished() {
shutdown_handle.abort();
}
forwarder.run().await?;

lag_reader.shutdown().await;
info!("Forwarder stopped gracefully");
Ok(())
}
Expand All @@ -131,18 +122,18 @@ async fn wait_until_ready(
transformer_client: &mut Option<TransformerClient>,
) -> Result<()> {
loop {
let source_ready = source_client.is_ready().await.is_ok();
let source_ready = source_client.is_ready().await;
if !source_ready {
info!("UDSource is not ready, waiting...");
}

let sink_ready = sink_client.is_ready().await.is_ok();
let sink_ready = sink_client.is_ready().await;
if !sink_ready {
info!("UDSink is not ready, waiting...");
}

let transformer_ready = if let Some(client) = transformer_client {
let ready = client.is_ready().await.is_ok();
let ready = client.is_ready().await;
if !ready {
info!("UDTransformer is not ready, waiting...");
}
Expand All @@ -161,47 +152,14 @@ async fn wait_until_ready(
Ok(())
}

/// Waits until a shutdown is requested, then returns.
///
/// Completes as soon as any one of three futures finishes: a Ctrl+C signal,
/// a Unix terminate signal, or the optional `shutdown_rx` oneshot receiver.
///
/// # Panics
///
/// Panics (via `expect`) if waiting for Ctrl+C returns an error or if the
/// terminate signal listener cannot be created.
async fn shutdown_signal(shutdown_rx: Option<oneshot::Receiver<()>>) {
// Arm 1: finishes once Ctrl+C has been received.
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
info!("Received Ctrl+C signal");
};

// Arm 2: finishes once the terminate-signal listener yields.
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
info!("Received terminate signal");
};

// Arm 3: caller-supplied shutdown trigger (used by tests).
let custom_shutdown = async {
if let Some(rx) = shutdown_rx {
// The receiver's result is discarded, so any outcome ends the wait.
rx.await.ok();
} else {
// No custom receiver was supplied: wait on a watch channel that is never
// sent to. `_tx` stays bound for the rest of this block, so this arm is
// intended to never complete.
let (_tx, mut rx) = tokio::sync::watch::channel(());
rx.changed().await.ok();
}
info!("Received custom shutdown signal");
};

// Return as soon as the first of the three arms completes.
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
_ = custom_shutdown => {},
}
}

#[cfg(test)]
mod tests {
use std::env;

use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source};
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

use crate::sink::SinkConfig;
use crate::source::SourceConfig;
Expand Down Expand Up @@ -284,11 +242,11 @@ mod tests {
env::set_var("SOURCE_SOCKET", src_sock_file.to_str().unwrap());
env::set_var("SINK_SOCKET", sink_sock_file.to_str().unwrap());

let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let cln_token = CancellationToken::new();

let forwarder_cln_token = cln_token.clone();
let forwarder_handle = tokio::spawn(async move {
let result =
super::run_forwarder(source_config, sink_config, None, Some(shutdown_rx)).await;
let result = super::init(source_config, sink_config, None, forwarder_cln_token).await;
assert!(result.is_ok());
});

Expand All @@ -297,7 +255,7 @@ mod tests {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// stop the forwarder
shutdown_tx.send(()).unwrap();
cln_token.cancel();
forwarder_handle.await.unwrap();

// stop the source and sink servers
Expand Down
Loading

0 comments on commit 0e10767

Please sign in to comment.