diff --git a/Cargo.lock b/Cargo.lock index 02c0a5771..1efabf6d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6149,6 +6149,7 @@ dependencies = [ name = "lightning-node" version = "0.1.0" dependencies = [ + "anyhow", "lightning-interfaces", "tokio", "tracing", diff --git a/core/node/Cargo.toml b/core/node/Cargo.toml index 50d3b35ae..bd0640f8d 100644 --- a/core/node/Cargo.toml +++ b/core/node/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" lightning-interfaces = { path = "../interfaces" } tokio.workspace = true tracing.workspace = true +anyhow.workspace = true workspace-hack = { version = "0.1", path = "../../etc/workspace-hack" } diff --git a/core/node/src/lib.rs b/core/node/src/lib.rs index b5a6fae5c..d89d53e32 100644 --- a/core/node/src/lib.rs +++ b/core/node/src/lib.rs @@ -1,61 +1,139 @@ use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread::JoinHandle; +use std::time::Duration; +use anyhow::Result; use lightning_interfaces::prelude::*; +use lightning_interfaces::ShutdownController; +use tokio::runtime::{Runtime, UnhandledPanic}; +use tokio::task::JoinHandle; +use tokio::time::timeout; /// A single [Node] instance that has ownership over its tokio runtime. pub struct ContainedNode { /// The dependency injection data provider which can contain most of the items that make up /// a node. - provider: fdi::Provider, + provider: fdi::MultiThreadedProvider, - /// A handle to the work thread. - handle: JoinHandle<()>, + /// The shutdown controller that has its waiter in the provider. + shutdown: ShutdownController, + + /// A handle to the tokio runtime. + runtime: Runtime, collection: PhantomData, } impl ContainedNode { - pub fn new(provider: fdi::Provider, index: usize) -> Self { + pub fn new(provider: fdi::MultiThreadedProvider, name: Option) -> Self { let worker_id = AtomicUsize::new(0); let runtime = tokio::runtime::Builder::new_multi_thread() .thread_name_fn(move || { let id = worker_id.fetch_add(1, Ordering::SeqCst); - format!("NODE-{index}#{id}") + format!("{}#{id}", name.as_deref().unwrap_or("LIGHTNING")) }) .enable_all() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) .build() .expect("Failed to build tokio runtime for node container."); - let handle = std::thread::Builder::new() - .name(format!("NODE-{index}#MAIN")) - .spawn(move || { - runtime.block_on(async move { - // - }); - }) - .expect("Failed to spawn E2E thread"); + // Create and insert the shutdown controller to the provider. + let trace_shutdown = std::env::var("TRACE_SHUTDOWN").is_ok(); + let shutdown = ShutdownController::new(trace_shutdown); + let waiter = shutdown.waiter(); + provider.insert(waiter); + + // Will make the shutdown controller listen for ctrl+c. + shutdown.install_ctrlc_handlers(); Self { provider, - handle, + runtime, + shutdown, collection: PhantomData, } } - /// Returns a reference to the data provider. - pub fn provider(&self) -> &fdi::Provider { - &self.provider + /// Start the node and return a handle to the task that started the node. The task returns as + /// soon as the node + pub fn spawn(&self) -> JoinHandle> { + let provider = self.provider.clone(); + + self.runtime.spawn_blocking(move || { + let graph = C::build_graph(); + let mut provider = provider.get_local_provider(); + + // Set tokio as the spawner of fdi async works. + provider.get_mut::().set_spawn_cb(|fut| { + tokio::spawn(fut); + }); + + // Init all of the components and dependencies. + graph.init_all(&mut provider)?; + + // Send the start signal to the node. + provider.trigger("start"); + + Ok(()) + }) } - pub fn shutdown(self) { - self.handle.join().unwrap() + pub async fn shutdown(mut self) { + // Tell the controller it's time to go down. + self.shutdown.trigger_shutdown(); + + // Give the runtime 30 seconds to stop. + self.runtime.shutdown_timeout(Duration::from_secs(30)); + + for i in 0.. { + if timeout(Duration::from_secs(3), self.shutdown.wait_for_completion()) + .await + .is_ok() + { + // shutdown completed. + return; + } + + match i { + 0 | 1 => { + // 3s, 6s + tracing::trace!("Still shutting down..."); + continue; + }, + 2 => { + // 9s + tracing::warn!("Still shutting down..."); + continue; + }, + _ => { + // 12s + tracing::error!("Shutdown taking too long..") + }, + } + + if i == 10 { + // 33s + break; + } + + let Some(iter) = self.shutdown.pending_backtraces() else { + continue; + }; + + for (i, trace) in iter.enumerate() { + eprintln!("Pending task backtrace #{i}:\n{trace:#?}"); + } + } + } + + /// Returns a reference to the data provider. + pub fn provider(&self) -> &fdi::MultiThreadedProvider { + &self.provider } } impl Default for ContainedNode { fn default() -> Self { - Self::new(fdi::Provider::default(), 0) + Self::new(fdi::MultiThreadedProvider::default(), None) } }