From e6f7c67780569fb226f703005dd3b513c19aefb5 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 24 Jan 2024 14:32:15 +0900 Subject: [PATCH] blop --- quickwit/Cargo.lock | 118 ++++++++---------- quickwit/quickwit-actors/src/actor_context.rs | 4 +- quickwit/quickwit-actors/src/scheduler.rs | 19 +-- quickwit/quickwit-actors/src/spawn_builder.rs | 38 ++++-- quickwit/quickwit-cli/Cargo.toml | 2 +- quickwit/quickwit-common/Cargo.toml | 3 + quickwit/quickwit-common/src/lib.rs | 51 ++++++++ .../quickwit-indexing/src/actors/indexer.rs | 3 +- .../quickwit-indexing/src/actors/uploader.rs | 3 +- .../quickwit-ingest/src/ingest_v2/fetch.rs | 6 +- quickwit/quickwit-serve/src/lib.rs | 21 ++-- 11 files changed, 167 insertions(+), 101 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index fec22dab348..de8c0cd10c0 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -141,9 +141,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.8" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628a8f9bd1e24b4e0db2b4bc2d000b001e7dd032d54afa60a68836aeec5aa54a" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" dependencies = [ "anstyle", "anstyle-parse", @@ -258,19 +258,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-compat" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b" -dependencies = [ - "futures-core", - "futures-io", - "once_cell", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-compression" version = "0.4.6" @@ -760,9 +747,9 @@ dependencies = [ [[package]] name = "aws_lambda_events" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c25e7620d59c7a9ed653439ec402218e3f6be118000f92802c5bbfc6da98e65b" +checksum = "d5af275f3e5801892c4295a919c5edbe8d24134f91458269a77a2da12622c123" dependencies = [ "base64 0.21.7", "bytes", @@ -775,7 +762,7 @@ dependencies = [ "serde", "serde_dynamo", "serde_json", - "serde_with 3.5.0", + "serde_with 3.5.1", ] [[package]] @@ -1272,9 +1259,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1282,7 +1269,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -1309,9 +1296,9 @@ dependencies = [ [[package]] name = "ciborium" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" dependencies = [ "ciborium-io", "ciborium-ll", @@ -1320,15 +1307,15 @@ dependencies = [ [[package]] name = "ciborium-io" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" [[package]] name = "ciborium-ll" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" dependencies = [ "ciborium-io", "half", @@ -2727,9 +2714,13 @@ dependencies = [ [[package]] name = "half" -version = "1.8.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" +dependencies = [ + "cfg-if", + "crunchy", +] [[package]] name = "hashbrown" @@ -3396,9 +3387,9 @@ checksum = "3f35c735096c0293d313e8f2a641627472b83d01b937177fe76e5e2708d31e0d" [[package]] name = "lambda_http" -version = "0.9.1" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad7732ab05100525c1716db7b48ccf2165da4dcd72340f679fea593766f4f042" +checksum = "dacbb68e7ae5264884f114f79d4e0de33c66cb5a8f363ac93de7112f624d233d" dependencies = [ "aws_lambda_events", "base64 0.21.7", @@ -4330,12 +4321,11 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "opendal" -version = "0.44.1" +version = "0.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc0ad72f7b44ca4ae59d27ea151fdc6f37305cf6efe099bdaedbb30ec34579c0" +checksum = "4af824652d4d2ffabf606d337a071677ae621b05622adf35df9562f69d9b4498" dependencies = [ "anyhow", - "async-compat", "async-trait", "backon", "base64 0.21.7", @@ -4348,9 +4338,7 @@ dependencies = [ "log", "md-5", "once_cell", - "parking_lot", "percent-encoding", - "pin-project", "quick-xml 0.30.0", "reqsign", "reqwest", @@ -4389,9 +4377,9 @@ dependencies = [ [[package]] name = "openssl" -version = "0.10.62" +version = "0.10.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cde4d2d9200ad5909f8dac647e29482e07c3a35de8a13fce7c9c7747ad9f671" +checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" dependencies = [ "bitflags 2.4.2", "cfg-if", @@ -4430,9 +4418,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.98" +version = "0.9.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7" +checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" dependencies = [ "cc", "libc", @@ -4584,9 +4572,9 @@ dependencies = [ [[package]] name = "ouroboros" -version = "0.18.2" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50b637ffd883b2733a8483599fb6136b9dcedaa1850f7ac08b9b6f9f2061208" +checksum = "97b7be5a8a3462b752f4be3ff2b2bf2f7f1d00834902e46be2a4d68b87b0573c" dependencies = [ "aliasable", "ouroboros_macro", @@ -4595,9 +4583,9 @@ dependencies = [ [[package]] name = "ouroboros_macro" -version = "0.18.2" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3633d65683f13b9bcfaa3150880b018899fb0e5d0542f4adaea4f503fdb5eabf" +checksum = "b645dcde5f119c2c454a92d0dfa271a2a3b205da92e4292a68ead4bdbfde1f33" dependencies = [ "heck", "itertools 0.12.0", @@ -5192,9 +5180,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -5647,7 +5635,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.5.1", "serde_yaml 0.9.30", "tokio", "toml", @@ -6093,7 +6081,7 @@ dependencies = [ "sea-query-binder", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.5.1", "sqlx", "tempfile", "thiserror", @@ -6186,7 +6174,7 @@ dependencies = [ "quickwit-datetime", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.5.1", "tantivy", "thiserror", "time", @@ -6254,7 +6242,7 @@ dependencies = [ "rayon", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.5.1", "tantivy", "tempfile", "thiserror", @@ -6321,7 +6309,7 @@ dependencies = [ "serde", "serde_json", "serde_qs 0.12.0", - "serde_with 3.5.0", + "serde_with 3.5.1", "tempfile", "termcolor", "thiserror", @@ -6590,13 +6578,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.4", "regex-syntax 0.8.2", ] @@ -6611,9 +6599,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "3b7fa1134405e2ec9353fd416b17f8dacd46c473d7d3fd1cf202706a14eb792a" dependencies = [ "aho-corasick", "memchr", @@ -7183,9 +7171,9 @@ dependencies = [ [[package]] name = "serde_dynamo" -version = "4.2.6" +version = "4.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64307a9d3b5af5237b1a95c0b63fbeb45134d7d7c372c284847fa37a6ddee44" +checksum = "8b652e4dd5549c24a4ec779981278cccae2f85b4d5649441c745d58866e20283" dependencies = [ "base64 0.21.7", "serde", @@ -7279,9 +7267,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f58c3a1b3e418f61c25b2aeb43fc6c95eaa252b8cecdda67f401943e9e08d33f" +checksum = "f5c9fdb6b00a489875b22efd4b78fe2b363b72265cc5f6eb2e2b9ee270e6140c" dependencies = [ "base64 0.21.7", "chrono", @@ -7290,7 +7278,7 @@ dependencies = [ "indexmap 2.1.0", "serde", "serde_json", - "serde_with_macros 3.5.0", + "serde_with_macros 3.5.1", "time", ] @@ -7308,9 +7296,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2068b437a31fc68f25dd7edc296b078f04b45145c199d8eed9866e45f1ff274" +checksum = "dbff351eb4b33600a2e138dfa0b10b65a238ea8ff8fb2387c422c5022a3e8298" dependencies = [ "darling 0.20.3", "proc-macro2", @@ -7474,9 +7462,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.12.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "snafu" diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index aff023ad602..0fc36f8d823 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -187,7 +187,7 @@ impl ActorContext { self.progress.record_progress(); } - pub(crate) fn state(&self) -> ActorState { + pub fn state(&self) -> ActorState { self.actor_state.get_state() } @@ -199,7 +199,7 @@ impl ActorContext { self.actor_state.idle(); } - pub(crate) fn pause(&self) { + pub fn pause(&self) { self.actor_state.pause(); } diff --git a/quickwit/quickwit-actors/src/scheduler.rs b/quickwit/quickwit-actors/src/scheduler.rs index c5ca99b38b6..dcf9b1ddda7 100644 --- a/quickwit/quickwit-actors/src/scheduler.rs +++ b/quickwit/quickwit-actors/src/scheduler.rs @@ -203,16 +203,19 @@ pub fn start_scheduler() -> SchedulerClient { }), }; let mut scheduler = Scheduler::new(&scheduler_client); - tokio::spawn(async move { - while let Ok(scheduler_message) = rx.recv_async().await { - match scheduler_message { - SchedulerMessage::ProcessTime => scheduler.process_time(), - SchedulerMessage::Schedule { callback, timeout } => { - scheduler.process_schedule(callback, timeout); + quickwit_common::spawn_named_task( + async move { + while let Ok(scheduler_message) = rx.recv_async().await { + match scheduler_message { + SchedulerMessage::ProcessTime => scheduler.process_time(), + SchedulerMessage::Schedule { callback, timeout } => { + scheduler.process_schedule(callback, timeout); + } } } - } - }); + }, + "Scheduler", + ); scheduler_client } diff --git a/quickwit/quickwit-actors/src/spawn_builder.rs b/quickwit/quickwit-actors/src/spawn_builder.rs index f7641fb9413..937d3d1aa28 100644 --- a/quickwit/quickwit-actors/src/spawn_builder.rs +++ b/quickwit/quickwit-actors/src/spawn_builder.rs @@ -31,7 +31,8 @@ use crate::registry::{ActorJoinHandle, ActorRegistry}; use crate::scheduler::{NoAdvanceTimeGuard, SchedulerClient}; use crate::supervisor::Supervisor; use crate::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, KillSwitch, Mailbox, QueueCapacity, + Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, KillSwitch, Mailbox, + QueueCapacity, }; #[derive(Clone)] @@ -181,7 +182,11 @@ impl SpawnBuilder { let ctx_clone = ctx.clone(); let loop_async_actor_future = async move { actor_loop(actor, inbox, no_advance_time_guard, ctx).await }; - let join_handle = ActorJoinHandle::new(runtime_handle.spawn(loop_async_actor_future)); + let join_handle = ActorJoinHandle::new(quickwit_common::spawn_named_task_on( + loop_async_actor_future, + std::any::type_name::(), + &runtime_handle, + )); ctx_clone.registry().register(&mailbox, join_handle.clone()); let actor_handle = ActorHandle::new(state_rx, join_handle, ctx_clone); (mailbox, actor_handle) @@ -294,19 +299,28 @@ impl ActorExecutionEnv { async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> { self.yield_and_check_if_killed().await?; let envelope = recv_envelope(&mut self.inbox, &self.ctx).await; - self.ctx.process(); + if self.ctx.state() == ActorState::Idle { + self.ctx.process(); + } self.process_one_message(envelope).await?; - loop { - while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) { - self.process_one_message(envelope).await?; - } - self.ctx.yield_now().await; - if self.inbox.is_empty() { - break; + // If the actor is not Paused, we consume all of the message in the mailbox. + if self.ctx.state() == ActorState::Processing { + loop { + while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) { + self.process_one_message(envelope).await?; + } + self.ctx.yield_now().await; + if self.inbox.is_empty() { + break; + } } } - self.actor.get_mut().on_drained_messages(&self.ctx).await?; - self.ctx.idle(); + if self.ctx.state().is_running() { + self.actor.get_mut().on_drained_messages(&self.ctx).await?; + } + if self.ctx.state() == ActorState::Processing { + self.ctx.idle(); + } if self.ctx.mailbox().is_last_mailbox() { // We double check here that the mailbox does not contain any messages, // as someone on different runtime thread could have added a last message diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index e7ac36569eb..0ee0533d6e5 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -86,7 +86,7 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"] ci-test = [] openssl-support = ["openssl-probe"] -tokio-console = ["console-subscriber"] +tokio-console = ["console-subscriber", "quickwit-common/named_tasks"] release-feature-set = [ "jemalloc", "openssl-support", diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 4ea3c9d407c..9acdd37f735 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -41,6 +41,9 @@ tracing = { workspace = true } [features] testsuite = [] +named_tasks = ["tokio/tracing"] + + [dev-dependencies] serde_json = { workspace = true } diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index fc88f1821cb..eb3239503ca 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -47,6 +47,7 @@ pub mod uri; use std::env; use std::fmt::{Debug, Display}; +use std::future::Future; use std::ops::{Range, RangeInclusive}; use std::str::FromStr; @@ -198,6 +199,56 @@ pub const fn div_ceil(lhs: i64, rhs: i64) -> i64 { } } +#[cfg(not(feature = "named_tasks"))] +pub fn spawn_named_task(future: F, _name: &'static str) -> tokio::task::JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + tokio::task::spawn(future) +} + +#[cfg(not(feature = "named_tasks"))] +pub fn spawn_named_task_on( + future: F, + _name: &'static str, + runtime: &tokio::runtime::Handle, +) -> tokio::task::JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + runtime.spawn(future) +} + +#[cfg(feature = "named_tasks")] +pub fn spawn_named_task(future: F, name: &'static str) -> tokio::task::JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + tokio::task::Builder::new() + .name(name) + .spawn(future) + .unwrap() +} + +#[cfg(feature = "named_tasks")] +pub fn spawn_named_task_on( + future: F, + name: &'static str, + runtime: &tokio::runtime::Handle, +) -> tokio::task::JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + tokio::task::Builder::new() + .name(name) + .spawn_on(future, runtime) + .unwrap() +} + #[cfg(test)] mod tests { use std::io::ErrorKind; diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index f97cdee7eed..19d4977d099 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -417,8 +417,9 @@ impl Actor for Indexer { // Time to take a nap. let sleep_for = commit_timeout - elapsed; + ctx.pause(); ctx.schedule_self_msg(sleep_for, Command::Resume); - self.handle(Command::Pause, ctx).await?; + Ok(()) } diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index ede253b15a9..c6a62ca2e38 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -301,7 +301,7 @@ impl Handler for Uploader { let merge_policy = self.merge_policy.clone(); debug!(split_ids=?split_ids, "start-stage-and-store-splits"); let event_broker = self.event_broker.clone(); - tokio::spawn( + quickwit_common::spawn_named_task( async move { fail_point!("uploader:intask:before"); @@ -383,6 +383,7 @@ impl Handler for Uploader { Result::<(), anyhow::Error>::Ok(()) } .instrument(Span::current()), + "upload-single-task" ); fail_point!("uploader:intask:after"); Ok(()) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index cbf94bb6ed3..7e36cf8423c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -27,7 +27,7 @@ use bytes::{BufMut, BytesMut}; use futures::StreamExt; use mrecordlog::MultiRecordLog; use quickwit_common::retry::RetryParams; -use quickwit_common::ServiceStream; +use quickwit_common::{spawn_named_task, ServiceStream}; use quickwit_proto::ingest::ingester::{ fetch_message, FetchEof, FetchMessage, FetchPayload, IngesterService, OpenFetchStreamRequest, }; @@ -98,7 +98,7 @@ impl FetchStreamTask { batch_num_bytes, }; let future = async move { fetch_task.run().await }; - let fetch_task_handle: JoinHandle<()> = tokio::spawn(future); + let fetch_task_handle: JoinHandle<()> = spawn_named_task(future, "fetch_task"); (fetch_stream, fetch_task_handle) } @@ -308,7 +308,7 @@ impl MultiFetchStream { self.retry_params, self.fetch_message_tx.clone(), ); - let fetch_task_handle = tokio::spawn(fetch_stream_future); + let fetch_task_handle = spawn_named_task(fetch_stream_future, "fetch_stream_future"); self.fetch_task_handles.insert(queue_id, fetch_task_handle); Ok(()) } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index fa760e310bc..aeef3b3b8c8 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -62,6 +62,7 @@ use quickwit_cluster::{ use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle}; use quickwit_common::rate_limiter::RateLimiterSettings; use quickwit_common::runtimes::RuntimesConfig; +use quickwit_common::spawn_named_task; use quickwit_common::tower::{ BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, ConstantRate, EstimateRateLayer, EventListenerLayer, RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, @@ -611,12 +612,16 @@ pub async fn serve_quickwit( // Node readiness indicates that the server is ready to receive requests. // Thus readiness task is started once gRPC and REST servers are started. - tokio::spawn(node_readiness_reporting_task( - cluster, - metastore_through_control_plane, - grpc_readiness_signal_rx, - rest_readiness_signal_rx, - )); + spawn_named_task( + node_readiness_reporting_task( + cluster, + metastore_through_control_plane, + grpc_readiness_signal_rx, + rest_readiness_signal_rx, + ), + "node_readiness_reporting", + ); + let shutdown_handle = tokio::spawn(async move { shutdown_signal.await; @@ -633,8 +638,8 @@ pub async fn serve_quickwit( } actor_exit_statuses }); - let grpc_join_handle = tokio::spawn(grpc_server); - let rest_join_handle = tokio::spawn(rest_server); + let grpc_join_handle = spawn_named_task(grpc_server, "grpc_server"); + let rest_join_handle = spawn_named_task(rest_server, "rest_server"); let (grpc_res, rest_res) = tokio::try_join!(grpc_join_handle, rest_join_handle) .expect("the tasks running the gRPC and REST servers should not panic or be cancelled");