Skip to content

Commit

Permalink
fix(node): allow dropping a nested runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
qti3e committed May 9, 2024
1 parent a185f47 commit 06d3184
Showing 1 changed file with 76 additions and 39 deletions.
115 changes: 76 additions & 39 deletions core/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::future::Future;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
Expand All @@ -11,6 +12,9 @@ use tokio::time::timeout;

/// A single [Node] instance that has ownership over its tokio runtime.
pub struct ContainedNode<C: Collection> {
/// The name of this contained node.
name: String,

/// The dependency injection data provider which can contain most of the items that make up
/// a node.
provider: fdi::MultiThreadedProvider,
Expand All @@ -26,6 +30,8 @@ pub struct ContainedNode<C: Collection> {

impl<C: Collection> ContainedNode<C> {
pub fn new(provider: fdi::MultiThreadedProvider, name: Option<String>) -> Self {
let name = name.unwrap_or_else(|| "LIGHTNING".into());

// Create and insert the shutdown controller to the provider.
let trace_shutdown = std::env::var("TRACE_SHUTDOWN").is_ok();
let shutdown = ShutdownController::new(trace_shutdown);
Expand All @@ -40,10 +46,11 @@ impl<C: Collection> ContainedNode<C> {

// Create the tokio runtime.
let worker_id = AtomicUsize::new(0);
let node_name = name.clone();
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(move || {
let id = worker_id.fetch_add(1, Ordering::SeqCst);
format!("{}#{id}", name.as_deref().unwrap_or("LIGHTNING"))
format!("{node_name}#{id}")
})
.on_thread_start(move || {
let permit = permit.clone();
Expand All @@ -59,6 +66,7 @@ impl<C: Collection> ContainedNode<C> {
.expect("Failed to build tokio runtime for node container.");

Self {
name,
provider,
runtime,
shutdown,
Expand Down Expand Up @@ -90,50 +98,79 @@ impl<C: Collection> ContainedNode<C> {
})
}

pub async fn shutdown(mut self) {
/// Shut down the node and return a future that will be resolved when the node is fully down.
///
/// Unlike other async method this function can trigger the shutdown without it being polled.
/// In other words you can still trigger the shutdown event by calling this method and never
/// awaiting the returned future.
pub fn shutdown(mut self) -> impl Future<Output = ()> {
// Tell the controller it's time to go down.
self.shutdown.trigger_shutdown();

// Give the runtime 30 seconds to stop.
self.runtime.shutdown_timeout(Duration::from_secs(30));

for i in 0.. {
if timeout(Duration::from_secs(3), self.shutdown.wait_for_completion())
.await
.is_ok()
{
// shutdown completed.
return;
}
let task_name = format!("{}::RuntimeDrop", self.name);

match i {
0 | 1 => {
// 3s, 6s
tracing::trace!("Still shutting down...");
continue;
},
2 => {
// 9s
tracing::warn!("Still shutting down...");
// Give the runtime 30 seconds to stop.
match tokio::runtime::Handle::try_current() {
Ok(handle) => {
tokio::task::Builder::new()
.name(&task_name)
.spawn_blocking_on(
|| {
self.runtime.shutdown_timeout(Duration::from_secs(30));
},
&handle,
)
.unwrap();
},
Err(_) => {
std::thread::Builder::new()
.name(task_name)
.spawn(|| {
self.runtime.shutdown_timeout(Duration::from_secs(30));
})
.unwrap();
},
};

async move {
for i in 0.. {
if timeout(Duration::from_secs(3), self.shutdown.wait_for_completion())
.await
.is_ok()
{
// shutdown completed.
return;
}

match i {
0 | 1 => {
// 3s, 6s
tracing::trace!("Still shutting down...");
continue;
},
2 => {
// 9s
tracing::warn!("Still shutting down...");
continue;
},
_ => {
// 12s
tracing::error!("Shutdown taking too long..")
},
}

if i == 10 {
// 33s
break;
}

let Some(iter) = self.shutdown.pending_backtraces() else {
continue;
},
_ => {
// 12s
tracing::error!("Shutdown taking too long..")
},
}

if i == 10 {
// 33s
break;
}

let Some(iter) = self.shutdown.pending_backtraces() else {
continue;
};
};

for (i, trace) in iter.enumerate() {
eprintln!("Pending task backtrace #{i}:\n{trace:#?}");
for (i, trace) in iter.enumerate() {
eprintln!("Pending task backtrace #{i}:\n{trace:#?}");
}
}
}
}
Expand Down

0 comments on commit 06d3184

Please sign in to comment.