From eca9cddb4b0f97d1e6fd5b4fbb161f6ad30a1ffc Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Thu, 11 Jul 2024 17:15:04 -0400 Subject: [PATCH] Observer errors are now fatal (#913) * Print errors from observer rather than silently swallowing them * Observer errors now cause lading to exit --- lading/src/bin/lading.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs index 5856ed18e..147eb08e8 100644 --- a/lading/src/bin/lading.rs +++ b/lading/src/bin/lading.rs @@ -459,6 +459,7 @@ async fn inner_main( } let mut tsrv_joinset = tokio::task::JoinSet::new(); + let mut osrv_joinset = tokio::task::JoinSet::new(); // // OBSERVER // @@ -466,7 +467,7 @@ async fn inner_main( if let Some(target) = config.target { let obs_rcv = tgt_snd.subscribe(); let observer_server = observer::Server::new(config.observer, shutdown.clone())?; - let _osrv = tokio::spawn(observer_server.run(obs_rcv)); + osrv_joinset.spawn(observer_server.run(obs_rcv)); // // TARGET @@ -504,6 +505,19 @@ async fn inner_main( info!("shutdown signal received."); break Ok(()); } + Some(res) = osrv_joinset.join_next() => { + match res { + Ok(observer_result) => match observer_result { + Ok(()) => { /* Observer shut down successfully */ } + Err(err) => { + error!("Observer shut down unexpectedly: {err}"); + shutdown.signal(); + break Err(Error::LadingObserver(err)); + } + } + Err(err) => error!("Could not join the spawned observer task: {}", err), + } + }, Some(res) = gsrv_joinset.join_next() => { match res { Ok(generator_result) => match generator_result {