Skip to content

Commit

Permalink
wip: add node/ContainedNode
Browse files Browse the repository at this point in the history
  • Loading branch information
qti3e committed May 8, 2024
1 parent d7e41be commit 6b9c921
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6149,6 +6149,7 @@ dependencies = [
name = "lightning-node"
version = "0.1.0"
dependencies = [
"anyhow",
"lightning-interfaces",
"tokio",
"tracing",
Expand Down
1 change: 1 addition & 0 deletions core/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ edition = "2021"
lightning-interfaces = { path = "../interfaces" }
tokio.workspace = true
tracing.workspace = true
anyhow.workspace = true
workspace-hack = { version = "0.1", path = "../../etc/workspace-hack" }
120 changes: 99 additions & 21 deletions core/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,139 @@
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::JoinHandle;
use std::time::Duration;

use anyhow::Result;
use lightning_interfaces::prelude::*;
use lightning_interfaces::ShutdownController;
use tokio::runtime::{Runtime, UnhandledPanic};
use tokio::task::JoinHandle;
use tokio::time::timeout;

/// A single [Node] instance that has ownership over its tokio runtime.
pub struct ContainedNode<C: Collection> {
/// The dependency injection data provider which can contain most of the items that make up
/// a node.
provider: fdi::Provider,
provider: fdi::MultiThreadedProvider,

/// A handle to the work thread.
handle: JoinHandle<()>,
/// The shutdown controller that has its waiter in the provider.
shutdown: ShutdownController,

/// A handle to the tokio runtime.
runtime: Runtime,

collection: PhantomData<C>,
}

impl<C: Collection> ContainedNode<C> {
pub fn new(provider: fdi::Provider, index: usize) -> Self {
pub fn new(provider: fdi::MultiThreadedProvider, name: Option<String>) -> Self {
let worker_id = AtomicUsize::new(0);
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(move || {
let id = worker_id.fetch_add(1, Ordering::SeqCst);
format!("NODE-{index}#{id}")
format!("{}#{id}", name.as_deref().unwrap_or("LIGHTNING"))
})
.enable_all()
.unhandled_panic(UnhandledPanic::ShutdownRuntime)
.build()
.expect("Failed to build tokio runtime for node container.");

let handle = std::thread::Builder::new()
.name(format!("NODE-{index}#MAIN"))
.spawn(move || {
runtime.block_on(async move {
//
});
})
.expect("Failed to spawn E2E thread");
// Create and insert the shutdown controller to the provider.
let trace_shutdown = std::env::var("TRACE_SHUTDOWN").is_ok();
let shutdown = ShutdownController::new(trace_shutdown);
let waiter = shutdown.waiter();
provider.insert(waiter);

// Will make the shutdown controller listen for ctrl+c.
shutdown.install_ctrlc_handlers();

Self {
provider,
handle,
runtime,
shutdown,
collection: PhantomData,
}
}

/// Returns a reference to the data provider.
pub fn provider(&self) -> &fdi::Provider {
&self.provider
/// Start the node and return a handle to the task that started the node. The task returns as
/// soon as the node
pub fn spawn(&self) -> JoinHandle<Result<()>> {
let provider = self.provider.clone();

self.runtime.spawn_blocking(move || {
let graph = C::build_graph();
let mut provider = provider.get_local_provider();

// Set tokio as the spawner of fdi async works.
provider.get_mut::<fdi::Executor>().set_spawn_cb(|fut| {
tokio::spawn(fut);
});

// Init all of the components and dependencies.
graph.init_all(&mut provider)?;

// Send the start signal to the node.
provider.trigger("start");

Ok(())
})
}

pub fn shutdown(self) {
self.handle.join().unwrap()
pub async fn shutdown(mut self) {
// Tell the controller it's time to go down.
self.shutdown.trigger_shutdown();

// Give the runtime 30 seconds to stop.
self.runtime.shutdown_timeout(Duration::from_secs(30));

for i in 0.. {
if timeout(Duration::from_secs(3), self.shutdown.wait_for_completion())
.await
.is_ok()
{
// shutdown completed.
return;
}

match i {
0 | 1 => {
// 3s, 6s
tracing::trace!("Still shutting down...");
continue;
},
2 => {
// 9s
tracing::warn!("Still shutting down...");
continue;
},
_ => {
// 12s
tracing::error!("Shutdown taking too long..")
},
}

if i == 10 {
// 33s
break;
}

let Some(iter) = self.shutdown.pending_backtraces() else {
continue;
};

for (i, trace) in iter.enumerate() {
eprintln!("Pending task backtrace #{i}:\n{trace:#?}");
}
}
}

/// Returns a reference to the data provider.
pub fn provider(&self) -> &fdi::MultiThreadedProvider {
&self.provider
}
}

impl<C: Collection> Default for ContainedNode<C> {
fn default() -> Self {
Self::new(fdi::Provider::default(), 0)
Self::new(fdi::MultiThreadedProvider::default(), None)
}
}

0 comments on commit 6b9c921

Please sign in to comment.