Skip to content

Commit

Permalink
Allow naming tasks to differentiate actors in tokio-console. (#4451)
Browse files Browse the repository at this point in the history
* Allow naming tasks to differentiate actors in tokio-console.

By default, tokio-console labels its task using its spawning point.
Since all actors are spawned from the same place they end up with the
same label.

This introduce an API to optionally name tasks.
For actors, the actor's struct Name is used automatically.

If tokio-console is not enabled, this is no cost.

* Allow naming tasks to differentiate actors in tokio-console.

By default, tokio-console labels its task using its spawning point.
Since all actors are spawned from the same place they end up with the
same label.

This introduce an API to optionally name tasks.
For actors, the actor's struct Name is used automatically.

If tokio-console is not enabled, this is no cost.
  • Loading branch information
fulmicoton authored Jan 25, 2024
1 parent a2e3b92 commit 9eef3bb
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 88 deletions.
118 changes: 53 additions & 65 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions quickwit/quickwit-actors/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use quickwit_common::spawn_named_task;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -203,16 +204,19 @@ pub fn start_scheduler() -> SchedulerClient {
}),
};
let mut scheduler = Scheduler::new(&scheduler_client);
tokio::spawn(async move {
while let Ok(scheduler_message) = rx.recv_async().await {
match scheduler_message {
SchedulerMessage::ProcessTime => scheduler.process_time(),
SchedulerMessage::Schedule { callback, timeout } => {
scheduler.process_schedule(callback, timeout);
spawn_named_task(
async move {
while let Ok(scheduler_message) = rx.recv_async().await {
match scheduler_message {
SchedulerMessage::ProcessTime => scheduler.process_time(),
SchedulerMessage::Schedule { callback, timeout } => {
scheduler.process_schedule(callback, timeout);
}
}
}
}
});
},
"scheduler",
);
scheduler_client
}

Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ impl<A: Actor> SpawnBuilder<A> {
let ctx_clone = ctx.clone();
let loop_async_actor_future =
async move { actor_loop(actor, inbox, no_advance_time_guard, ctx).await };
let join_handle = ActorJoinHandle::new(runtime_handle.spawn(loop_async_actor_future));
let join_handle = ActorJoinHandle::new(quickwit_common::spawn_named_task_on(
loop_async_actor_future,
std::any::type_name::<A>(),
&runtime_handle,
));
ctx_clone.registry().register(&mailbox, join_handle.clone());
let actor_handle = ActorHandle::new(state_rx, join_handle, ctx_clone);
(mailbox, actor_handle)
Expand Down
Loading

0 comments on commit 9eef3bb

Please sign in to comment.