From 495ae550fd3c258e232c8312e68a13e104ee2791 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 9 Feb 2024 09:25:04 -0500 Subject: [PATCH] Fix race condition during ingester state initialization (#4557) --- .../src/ingest_v2/broadcast.rs | 8 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 70 +++---- .../src/ingest_v2/replication.rs | 8 +- .../quickwit-ingest/src/ingest_v2/state.rs | 178 +++++++++++++++--- .../src/actors/delete_task_service.rs | 2 +- 5 files changed, 186 insertions(+), 80 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 8a5c3e9a1c7..04ea2863f61 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -161,8 +161,10 @@ impl BroadcastLocalShardsTask { async fn snapshot_local_shards(&self) -> Option { let state = self.weak_state.upgrade()?; - let mut state_guard = state.lock_partially().await; + let Ok(mut state_guard) = state.lock_partially().await else { + return Some(LocalShardsSnapshot::default()); + }; let mut per_source_shard_infos: BTreeMap = BTreeMap::new(); let queue_ids: Vec<(QueueId, ShardState)> = state_guard @@ -466,7 +468,7 @@ mod tests { let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) .await .unwrap(); - let (_temp_dir, state, _status_rx) = IngesterState::for_test().await; + let (_temp_dir, state) = IngesterState::for_test().await; let weak_state = state.weak(); let task = BroadcastLocalShardsTask { cluster, @@ -475,7 +477,7 @@ mod tests { let previous_snapshot = task.snapshot_local_shards().await.unwrap(); assert!(previous_snapshot.per_source_shard_infos.is_empty()); - let mut state_guard = state.lock_partially().await; + let mut state_guard = state.lock_partially().await.unwrap(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1)); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index add62335093..20dcebbae38 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -50,7 +50,6 @@ use quickwit_proto::ingest::ingester::{ }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState}; use quickwit_proto::types::{queue_id, IndexUid, NodeId, Position, QueueId, SourceId}; -use tokio::sync::watch; use tracing::{debug, error, info, warn}; use super::broadcast::BroadcastLocalShardsTask; @@ -87,7 +86,6 @@ pub struct Ingester { memory_capacity: ByteSize, rate_limiter_settings: RateLimiterSettings, replication_factor: usize, - status_rx: watch::Receiver, } impl fmt::Debug for Ingester { @@ -109,7 +107,7 @@ impl Ingester { replication_factor: usize, ) -> IngestV2Result { let self_node_id: NodeId = cluster.self_node_id().into(); - let (state, status_rx) = IngesterState::load(wal_dir_path, rate_limiter_settings); + let state = IngesterState::load(wal_dir_path, rate_limiter_settings); let ingester = Self { self_node_id, ingester_pool, @@ -118,7 +116,6 @@ impl Ingester { memory_capacity, rate_limiter_settings, replication_factor, - status_rx, }; let weak_state = state.weak(); BroadcastLocalShardsTask::spawn(cluster, weak_state); @@ -128,14 +125,11 @@ impl Ingester { /// Checks whether the ingester is fully decommissioned and updates its status accordingly. 
fn check_decommissioning_status(&self, state: &mut InnerIngesterState) { - if state.status == IngesterStatus::Decommissioning - && state.shards.values().all(|shard| shard.is_indexed()) - { - state.status = IngesterStatus::Decommissioned; - state - .status_tx - .send(IngesterStatus::Decommissioned) - .expect("channel should be open"); + if state.status() != IngesterStatus::Decommissioning { + return; + } + if state.shards.values().all(|shard| shard.is_indexed()) { + state.set_status(IngesterStatus::Decommissioned); } } @@ -308,7 +302,7 @@ impl Ingester { let mut state_guard = with_lock_metrics!(self.state.lock_fully().await, "persist", "write")?; - if state_guard.status != IngesterStatus::Ready { + if state_guard.status() != IngesterStatus::Ready { persist_failures.reserve_exact(persist_request.subrequests.len()); for subrequest in persist_request.subrequests { @@ -670,9 +664,9 @@ impl Ingester { let leader_id: NodeId = open_replication_stream_request.leader_id.into(); let follower_id: NodeId = open_replication_stream_request.follower_id.into(); - let mut state_guard = self.state.lock_partially().await; + let mut state_guard = self.state.lock_partially().await?; - if state_guard.status != IngesterStatus::Ready { + if state_guard.status() != IngesterStatus::Ready { return Err(IngestV2Error::Internal("node decommissioned".to_string())); } let Entry::Vacant(entry) = state_guard.replication_tasks.entry(leader_id.clone()) else { @@ -712,7 +706,7 @@ impl Ingester { let shard_status_rx = self .state .lock_partially() - .await + .await? .shards .get(&queue_id) .ok_or(IngestV2Error::ShardNotFound { @@ -734,7 +728,7 @@ impl Ingester { &mut self, _open_observation_stream_request: OpenObservationStreamRequest, ) -> IngestV2Result> { - let status_stream = ServiceStream::from(self.status_rx.clone()); + let status_stream = ServiceStream::from(self.state.status_rx.clone()); let self_node_id = self.self_node_id.clone(); let observation_stream = status_stream.map(move |status| { let observation_message = ObservationMessage { @@ -753,7 +747,7 @@ impl Ingester { let mut state_guard = with_lock_metrics!(self.state.lock_fully().await, "init_shards", "write")?; - if state_guard.status != IngesterStatus::Ready { + if state_guard.status() != IngesterStatus::Ready { return Err(IngestV2Error::Internal("node decommissioned".to_string())); } @@ -809,7 +803,7 @@ impl Ingester { close_shards_request: CloseShardsRequest, ) -> IngestV2Result { let mut state_guard = - with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write"); + with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write")?; for shard_ids in close_shards_request.shards { for queue_id in shard_ids.queue_ids() { @@ -823,9 +817,9 @@ impl Ingester { } async fn ping_inner(&mut self, ping_request: PingRequest) -> IngestV2Result { - let state_guard = self.state.lock_partially().await; + let state_guard = self.state.lock_partially().await?; - if state_guard.status != IngesterStatus::Ready { + if state_guard.status() != IngesterStatus::Ready { return Err(IngestV2Error::Internal("node decommissioned".to_string())); } if ping_request.leader_id != self.self_node_id { @@ -857,13 +851,13 @@ impl Ingester { _decommission_request: DecommissionRequest, ) -> IngestV2Result { info!("decommissioning ingester"); - let mut state_guard = self.state.lock_partially().await; + let mut state_guard = self.state.lock_partially().await?; for shard in state_guard.shards.values_mut() { shard.shard_state = ShardState::Closed; 
shard.notify_shard_status(); } - state_guard.status = IngesterStatus::Decommissioning; + state_guard.set_status(IngesterStatus::Decommissioning); self.check_decommissioning_status(&mut state_guard); Ok(DecommissionResponse {}) @@ -1291,8 +1285,7 @@ mod tests { .await .unwrap(); - state_guard.status = IngesterStatus::Initializing; - state_guard.status_tx.send(IngesterStatus::Ready).unwrap(); + state_guard.set_status(IngesterStatus::Initializing); drop(state_guard); @@ -1316,8 +1309,7 @@ mod tests { state_guard.rate_trackers.contains_key(&queue_id_02); - assert_eq!(state_guard.status, IngesterStatus::Ready); - assert_eq!(*state_guard.status_tx.borrow(), IngesterStatus::Ready); + assert_eq!(state_guard.status(), IngesterStatus::Ready); } #[tokio::test] @@ -2588,7 +2580,7 @@ mod tests { .await .unwrap(); - let state_guard = ingester.state.lock_partially().await; + let state_guard = ingester.state.lock_partially().await.unwrap(); let shard = state_guard.shards.get(&queue_id).unwrap(); shard.assert_is_closed(); @@ -2614,11 +2606,8 @@ mod tests { assert_eq!(observation.node_id, ingester_ctx.node_id); assert_eq!(observation.status(), IngesterStatus::Ready); - let state_guard = ingester.state.lock_fully().await.unwrap(); - state_guard - .status_tx - .send(IngesterStatus::Decommissioning) - .unwrap(); + let mut state_guard = ingester.state.lock_fully().await.unwrap(); + state_guard.set_status(IngesterStatus::Decommissioning); drop(state_guard); let observation = observation_stream.next().await.unwrap().unwrap(); @@ -2637,15 +2626,13 @@ mod tests { let mut state_guard = ingester.state.lock_fully().await.unwrap(); ingester.check_decommissioning_status(&mut state_guard); - assert_eq!(state_guard.status, IngesterStatus::Ready); - assert_eq!(*ingester.status_rx.borrow(), IngesterStatus::Ready); + assert_eq!(state_guard.status(), IngesterStatus::Ready); - state_guard.status = IngesterStatus::Decommissioning; + state_guard.set_status(IngesterStatus::Decommissioning); ingester.check_decommissioning_status(&mut state_guard); - assert_eq!(state_guard.status, IngesterStatus::Decommissioned); - assert_eq!(*ingester.status_rx.borrow(), IngesterStatus::Decommissioned); + assert_eq!(state_guard.status(), IngesterStatus::Decommissioned); - state_guard.status = IngesterStatus::Decommissioning; + state_guard.set_status(IngesterStatus::Decommissioning); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1)); @@ -2659,14 +2646,13 @@ mod tests { ), ); ingester.check_decommissioning_status(&mut state_guard); - assert_eq!(state_guard.status, IngesterStatus::Decommissioning); + assert_eq!(state_guard.status(), IngesterStatus::Decommissioning); let shard = state_guard.shards.get_mut(&queue_id_01).unwrap(); shard.truncation_position_inclusive = Position::Beginning.as_eof(); ingester.check_decommissioning_status(&mut state_guard); - assert_eq!(state_guard.status, IngesterStatus::Decommissioned); - assert_eq!(*ingester.status_rx.borrow(), IngesterStatus::Decommissioned); + assert_eq!(state_guard.status(), IngesterStatus::Decommissioned); } #[tokio::test] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 2f0105544aa..58eb6901cc8 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -525,7 +525,7 @@ impl ReplicationTask { let mut state_guard = with_lock_metrics!(self.state.lock_fully(), 
"replicate", "write").await?; - if state_guard.status != IngesterStatus::Ready { + if state_guard.status() != IngesterStatus::Ready { replicate_failures.reserve_exact(replicate_request.subrequests.len()); for subrequest in replicate_request.subrequests { @@ -1014,7 +1014,7 @@ mod tests { async fn test_replication_task_happy_path() { let leader_id: NodeId = "test-leader".into(); let follower_id: NodeId = "test-follower".into(); - let (_temp_dir, state, _status_rx) = IngesterState::for_test().await; + let (_temp_dir, state) = IngesterState::for_test().await; let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); let (ack_replication_stream_tx, mut ack_replication_stream) = @@ -1277,7 +1277,7 @@ mod tests { async fn test_replication_task_shard_closed() { let leader_id: NodeId = "test-leader".into(); let follower_id: NodeId = "test-follower".into(); - let (_temp_dir, state, _status_rx) = IngesterState::for_test().await; + let (_temp_dir, state) = IngesterState::for_test().await; let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); let (ack_replication_stream_tx, mut ack_replication_stream) = @@ -1352,7 +1352,7 @@ mod tests { async fn test_replication_task_resource_exhausted() { let leader_id: NodeId = "test-leader".into(); let follower_id: NodeId = "test-follower".into(); - let (_temp_dir, state, _status_rx) = IngesterState::for_test().await; + let (_temp_dir, state) = IngesterState::for_test().await; let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); let (ack_replication_stream_tx, mut ack_replication_stream) = diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index cc6cddf0e3d..bc0b0fa0851 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::fmt; use std::ops::{Deref, DerefMut}; use std::path::Path; use std::sync::{Arc, Weak}; @@ -49,6 +50,7 @@ pub(super) struct IngesterState { // `inner` is a mutex because it's almost always accessed mutably. inner: Arc>, mrecordlog: Arc>>, + pub status_rx: watch::Receiver, } pub(super) struct InnerIngesterState { @@ -58,15 +60,23 @@ pub(super) struct InnerIngesterState { pub replication_streams: FnvHashMap, // Replication tasks running for each replication stream opened with leaders. 
pub replication_tasks: FnvHashMap, - pub status: IngesterStatus, - pub status_tx: watch::Sender, + status: IngesterStatus, + status_tx: watch::Sender, +} + +impl InnerIngesterState { + pub fn status(&self) -> IngesterStatus { + self.status + } + + pub fn set_status(&mut self, status: IngesterStatus) { + self.status = status; + self.status_tx.send(status).expect("channel should be open"); + } } impl IngesterState { - pub fn load( - wal_dir_path: &Path, - rate_limiter_settings: RateLimiterSettings, - ) -> (Self, watch::Receiver) { + fn new() -> Self { let status = IngesterStatus::Initializing; let (status_tx, status_rx) = watch::channel(status); let inner = InnerIngesterState { @@ -79,30 +89,39 @@ impl IngesterState { }; let inner = Arc::new(Mutex::new(inner)); let mrecordlog = Arc::new(RwLock::new(None)); - let state = Self { inner, mrecordlog }; + Self { + inner, + mrecordlog, + status_rx, + } + } + + pub fn load(wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings) -> Self { + let state = Self::new(); let state_clone = state.clone(); let wal_dir_path = wal_dir_path.to_path_buf(); + let init_future = async move { state_clone.init(&wal_dir_path, rate_limiter_settings).await; }; tokio::spawn(init_future); - (state, status_rx) + state } #[cfg(test)] - pub async fn for_test() -> (tempfile::TempDir, Self, watch::Receiver) { + pub async fn for_test() -> (tempfile::TempDir, Self) { let temp_dir = tempfile::tempdir().unwrap(); - let (state, mut status_rx) = - IngesterState::load(temp_dir.path(), RateLimiterSettings::default()); + let mut state = IngesterState::load(temp_dir.path(), RateLimiterSettings::default()); - status_rx + state + .status_rx .wait_for(|status| *status == IngesterStatus::Ready) .await .unwrap(); - (temp_dir, state, status_rx) + (temp_dir, state) } /// Initializes the internal state of the ingester. It loads the local WAL, then lists all its @@ -110,6 +129,8 @@ impl IngesterState { /// corresponding shards are closed and become read-only. 
pub async fn init(&self, wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings) { let mut inner_guard = self.inner.lock().await; + let mut mrecordlog_guard = self.mrecordlog.write().await; + let now = Instant::now(); info!( @@ -132,8 +153,7 @@ impl IngesterState { } Err(error) => { error!("failed to open write-ahead log: {error}"); - inner_guard.status = IngesterStatus::Failed; - let _ = inner_guard.status_tx.send(IngesterStatus::Failed); + inner_guard.set_status(IngesterStatus::Failed); return; } }; @@ -186,35 +206,53 @@ impl IngesterState { if num_deleted_shards > 0 { info!("deleted {num_deleted_shards} empty shard(s)"); } - inner_guard.status = IngesterStatus::Ready; - let _ = inner_guard.status_tx.send(IngesterStatus::Ready); - - self.mrecordlog.write().await.replace(mrecordlog); + mrecordlog_guard.replace(mrecordlog); + inner_guard.set_status(IngesterStatus::Ready); } - pub async fn lock_partially(&self) -> PartiallyLockedIngesterState<'_> { - PartiallyLockedIngesterState { - inner: self.inner.lock().await, + pub async fn lock_partially(&self) -> IngestV2Result> { + if *self.status_rx.borrow() == IngesterStatus::Initializing { + return Err(IngestV2Error::Internal( + "ingester is initializing".to_string(), + )); + } + let inner_guard = self.inner.lock().await; + + if inner_guard.status() == IngesterStatus::Failed { + return Err(IngestV2Error::Internal( + "failed to initialize ingester".to_string(), + )); } + let partial_lock = PartiallyLockedIngesterState { inner: inner_guard }; + Ok(partial_lock) } pub async fn lock_fully(&self) -> IngestV2Result> { + if *self.status_rx.borrow() == IngesterStatus::Initializing { + return Err(IngestV2Error::Internal( + "ingester is initializing".to_string(), + )); + } // We assume that the mrecordlog lock is the most "expensive" one to acquire, so we acquire // it first. - let mrecordlog = self.mrecordlog.write().await; - let inner = self.inner.lock().await; + let mrecordlog_opt_guard = self.mrecordlog.write().await; + let inner_guard = self.inner.lock().await; - if inner.status == IngesterStatus::Initializing { + if inner_guard.status() == IngesterStatus::Failed { return Err(IngestV2Error::Internal( - "ingester is initializing".to_string(), + "failed to initialize ingester".to_string(), )); } - let mrecordlog = RwLockWriteGuard::map(mrecordlog, |mrecordlog| { - mrecordlog + let mrecordlog_guard = RwLockWriteGuard::map(mrecordlog_opt_guard, |mrecordlog_opt| { + mrecordlog_opt .as_mut() .expect("mrecordlog should be initialized") }); - Ok(FullyLockedIngesterState { inner, mrecordlog }) + let full_lock = FullyLockedIngesterState { + inner: inner_guard, + mrecordlog: mrecordlog_guard, + }; + Ok(full_lock) } // Leaks the mrecordlog lock for use in fetch tasks. 
It's safe to do so because fetch tasks @@ -227,6 +265,7 @@ impl IngesterState { WeakIngesterState { inner: Arc::downgrade(&self.inner), mrecordlog: Arc::downgrade(&self.mrecordlog), + status_rx: self.status_rx.clone(), } } } @@ -235,6 +274,12 @@ pub(super) struct PartiallyLockedIngesterState<'a> { pub inner: MutexGuard<'a, InnerIngesterState>, } +impl fmt::Debug for PartiallyLockedIngesterState<'_> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PartiallyLockedIngesterState").finish() + } +} + impl Deref for PartiallyLockedIngesterState<'_> { type Target = InnerIngesterState; @@ -254,6 +299,12 @@ pub(super) struct FullyLockedIngesterState<'a> { pub mrecordlog: RwLockMappedWriteGuard<'a, MultiRecordLog>, } +impl fmt::Debug for FullyLockedIngesterState<'_> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FullyLockedIngesterState").finish() + } +} + impl Deref for FullyLockedIngesterState<'_> { type Target = InnerIngesterState; @@ -332,12 +383,79 @@ impl FullyLockedIngesterState<'_> { pub(super) struct WeakIngesterState { inner: Weak>, mrecordlog: Weak>>, + status_rx: watch::Receiver, } impl WeakIngesterState { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let mrecordlog = self.mrecordlog.upgrade()?; - Some(IngesterState { inner, mrecordlog }) + let status_rx = self.status_rx.clone(); + Some(IngesterState { + inner, + mrecordlog, + status_rx, + }) + } +} + +#[cfg(test)] +mod tests { + use tokio::time::timeout; + + use super::*; + + #[tokio::test] + async fn test_ingester_state_does_not_lock_while_initializing() { + let state = IngesterState::new(); + let inner_guard = state.inner.lock().await; + + assert_eq!(inner_guard.status(), IngesterStatus::Initializing); + assert_eq!(*state.status_rx.borrow(), IngesterStatus::Initializing); + + let error = state.lock_partially().await.unwrap_err().to_string(); + assert!(error.contains("ingester is initializing")); + + let error = state.lock_fully().await.unwrap_err().to_string(); + assert!(error.contains("ingester is initializing")); + } + + #[tokio::test] + async fn test_ingester_state_failed() { + let state = IngesterState::new(); + + state.inner.lock().await.set_status(IngesterStatus::Failed); + + let error = state.lock_partially().await.unwrap_err().to_string(); + assert!(error.to_string().ends_with("failed to initialize ingester")); + + let error = state.lock_fully().await.unwrap_err().to_string(); + assert!(error.contains("failed to initialize ingester")); + } + + #[tokio::test] + async fn test_ingester_state_init() { + let mut state = IngesterState::new(); + + let temp_dir = tempfile::tempdir().unwrap(); + state + .init(temp_dir.path(), RateLimiterSettings::default()) + .await; + + timeout( + Duration::from_millis(100), + state + .status_rx + .wait_for(|status| *status == IngesterStatus::Ready), + ) + .await + .unwrap() + .unwrap(); + + state.lock_partially().await.unwrap(); + + let locked_state = state.lock_fully().await.unwrap(); + assert_eq!(locked_state.status(), IngesterStatus::Ready); + assert_eq!(*locked_state.status_tx.borrow(), IngesterStatus::Ready); } } diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs index 7fbc4b6fb2f..7d443defac7 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs @@ -151,7 +151,7 @@ impl DeleteTaskService { for index_uid in index_uids.difference(&pipeline_index_uids) 
{
            let index_config = index_config_by_index_id
                .remove(index_uid)
-                .expect("Index metadata must be present.");
+                .expect("index metadata should be present");
            if self.spawn_pipeline(index_config, ctx).await.is_err() {
                warn!("failed to spawn delete pipeline for {}", index_uid.index_id);
            }
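
Note (not part of the patch): the sketch below condenses the synchronization pattern this change introduces, for readers who want the idea without walking the full diff. It is a simplified, standalone approximation under assumed names (`SharedState`, `Inner`, `Status`), not Quickwit's actual types: the status is cached behind a tokio `watch` channel, `set_status` keeps the cached field and the channel in sync, and lock acquisition is refused while the state is still initializing or after initialization has failed. In the real patch, initialization runs on a background task spawned by `IngesterState::load`; here the status flip is done inline for brevity.

use std::sync::Arc;

use tokio::sync::{watch, Mutex, MutexGuard};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Status {
    Initializing,
    Ready,
    Failed,
}

struct Inner {
    status: Status,
    status_tx: watch::Sender<Status>,
}

impl Inner {
    fn set_status(&mut self, status: Status) {
        self.status = status;
        // Keep observers (e.g. an observation stream) in sync with the cached field.
        let _ = self.status_tx.send(status);
    }
}

#[derive(Clone)]
struct SharedState {
    inner: Arc<Mutex<Inner>>,
    status_rx: watch::Receiver<Status>,
}

impl SharedState {
    fn new() -> Self {
        let (status_tx, status_rx) = watch::channel(Status::Initializing);
        let inner = Inner {
            status: Status::Initializing,
            status_tx,
        };
        Self {
            inner: Arc::new(Mutex::new(inner)),
            status_rx,
        }
    }

    // Refuses to hand out the lock until initialization has completed,
    // mirroring the checks added to `lock_partially`/`lock_fully` in the patch.
    async fn lock(&self) -> Result<MutexGuard<'_, Inner>, String> {
        if *self.status_rx.borrow() == Status::Initializing {
            return Err("state is initializing".to_string());
        }
        let guard = self.inner.lock().await;
        if guard.status == Status::Failed {
            return Err("failed to initialize state".to_string());
        }
        Ok(guard)
    }
}

#[tokio::main]
async fn main() {
    let state = SharedState::new();
    // While initializing, callers cannot acquire the state lock.
    assert!(state.lock().await.is_err());

    // Once initialization completes (done inline here), the lock is available.
    state.inner.lock().await.set_status(Status::Ready);
    assert!(state.lock().await.is_ok());
}

Keeping the `watch::Receiver` outside the mutex is the design choice that closes the race: callers can observe the initialization status without contending for the state lock, and the status/observation stream can be served directly from the receiver.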