Fix race condition during ingester state initialization (#4557)
guilload authored Feb 9, 2024
1 parent 65c3d3f commit 495ae55
Showing 5 changed files with 186 additions and 80 deletions.
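At a glance, the diff removes the standalone `status_rx` watch receiver that `Ingester` used to hold, moves it into `IngesterState` itself, and makes `lock_partially()` fallible so that callers (the local-shards broadcast task, ping, decommission, and so on) can no longer grab the state before it has finished initializing (presumably replaying the WAL under `wal_dir_path`). The sketch below is a simplified, hypothetical reconstruction of that pattern, not the actual Quickwit implementation: the names `Inner` and `mark_ready`, the `String` error type, and the exact failure condition are illustrative assumptions.

// Hypothetical sketch: the status watch channel is owned by the shared state,
// and partial locks are refused until initialization has completed.
use std::sync::Arc;

use tokio::sync::{watch, RwLock, RwLockWriteGuard};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Decommissioning,
    Decommissioned,
}

struct Inner {
    status: IngesterStatus,
    status_tx: watch::Sender<IngesterStatus>,
    // shards, rate trackers, replication tasks, ... omitted
}

impl Inner {
    fn status(&self) -> IngesterStatus {
        self.status
    }

    // Keeps the cached status and the watch channel in sync in one place.
    fn set_status(&mut self, status: IngesterStatus) {
        self.status = status;
        self.status_tx.send_replace(status);
    }
}

#[derive(Clone)]
struct IngesterState {
    inner: Arc<RwLock<Inner>>,
    status_rx: watch::Receiver<IngesterStatus>,
}

impl IngesterState {
    // A real `load` would also spawn the recovery task and flip the status to
    // `Ready` once it finishes; only the construction is shown here.
    fn new() -> Self {
        let (status_tx, status_rx) = watch::channel(IngesterStatus::Initializing);
        let inner = Inner {
            status: IngesterStatus::Initializing,
            status_tx,
        };
        IngesterState {
            inner: Arc::new(RwLock::new(inner)),
            status_rx,
        }
    }

    // Called by the (omitted) recovery task once initialization completes.
    async fn mark_ready(&self) {
        self.inner.write().await.set_status(IngesterStatus::Ready);
    }

    // Fallible lock: refuses to hand out a guard while still initializing.
    async fn lock_partially(&self) -> Result<RwLockWriteGuard<'_, Inner>, String> {
        let guard = self.inner.write().await;
        if guard.status() == IngesterStatus::Initializing {
            return Err("ingester state is not initialized yet".to_string());
        }
        Ok(guard)
    }
}

#[tokio::main]
async fn main() {
    let state = IngesterState::new();
    // Before initialization completes, partial locks are refused...
    assert!(state.lock_partially().await.is_err());
    // ...and once the recovery task marks the state ready, they succeed.
    state.mark_ready().await;
    assert_eq!(state.lock_partially().await.unwrap().status(), IngesterStatus::Ready);
}

Under this layout the watch receiver travels with the state, so a constructor like `IngesterState::load` only needs to return the state itself, which matches the signature changes visible in ingester.rs below.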
8 changes: 5 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -161,8 +161,10 @@ impl BroadcastLocalShardsTask {

async fn snapshot_local_shards(&self) -> Option<LocalShardsSnapshot> {
let state = self.weak_state.upgrade()?;
let mut state_guard = state.lock_partially().await;

let Ok(mut state_guard) = state.lock_partially().await else {
return Some(LocalShardsSnapshot::default());
};
let mut per_source_shard_infos: BTreeMap<SourceUid, ShardInfos> = BTreeMap::new();

let queue_ids: Vec<(QueueId, ShardState)> = state_guard
@@ -466,7 +468,7 @@ mod tests {
let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
.await
.unwrap();
let (_temp_dir, state, _status_rx) = IngesterState::for_test().await;
let (_temp_dir, state) = IngesterState::for_test().await;
let weak_state = state.weak();
let task = BroadcastLocalShardsTask {
cluster,
@@ -475,7 +477,7 @@
let previous_snapshot = task.snapshot_local_shards().await.unwrap();
assert!(previous_snapshot.per_source_shard_infos.is_empty());

let mut state_guard = state.lock_partially().await;
let mut state_guard = state.lock_partially().await.unwrap();

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
70 changes: 28 additions & 42 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -50,7 +50,6 @@ use quickwit_proto::ingest::ingester::{
};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState};
use quickwit_proto::types::{queue_id, IndexUid, NodeId, Position, QueueId, SourceId};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};

use super::broadcast::BroadcastLocalShardsTask;
Expand Down Expand Up @@ -87,7 +86,6 @@ pub struct Ingester {
memory_capacity: ByteSize,
rate_limiter_settings: RateLimiterSettings,
replication_factor: usize,
status_rx: watch::Receiver<IngesterStatus>,
}

impl fmt::Debug for Ingester {
@@ -109,7 +107,7 @@ impl Ingester {
replication_factor: usize,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().into();
let (state, status_rx) = IngesterState::load(wal_dir_path, rate_limiter_settings);
let state = IngesterState::load(wal_dir_path, rate_limiter_settings);
let ingester = Self {
self_node_id,
ingester_pool,
@@ -118,7 +116,6 @@
memory_capacity,
rate_limiter_settings,
replication_factor,
status_rx,
};
let weak_state = state.weak();
BroadcastLocalShardsTask::spawn(cluster, weak_state);
@@ -128,14 +125,11 @@

/// Checks whether the ingester is fully decommissioned and updates its status accordingly.
fn check_decommissioning_status(&self, state: &mut InnerIngesterState) {
if state.status == IngesterStatus::Decommissioning
&& state.shards.values().all(|shard| shard.is_indexed())
{
state.status = IngesterStatus::Decommissioned;
state
.status_tx
.send(IngesterStatus::Decommissioned)
.expect("channel should be open");
if state.status() != IngesterStatus::Decommissioning {
return;
}
if state.shards.values().all(|shard| shard.is_indexed()) {
state.set_status(IngesterStatus::Decommissioned);
}
}

@@ -308,7 +302,7 @@ impl Ingester {
let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "persist", "write")?;

if state_guard.status != IngesterStatus::Ready {
if state_guard.status() != IngesterStatus::Ready {
persist_failures.reserve_exact(persist_request.subrequests.len());

for subrequest in persist_request.subrequests {
@@ -670,9 +664,9 @@ impl Ingester {
let leader_id: NodeId = open_replication_stream_request.leader_id.into();
let follower_id: NodeId = open_replication_stream_request.follower_id.into();

let mut state_guard = self.state.lock_partially().await;
let mut state_guard = self.state.lock_partially().await?;

if state_guard.status != IngesterStatus::Ready {
if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
}
let Entry::Vacant(entry) = state_guard.replication_tasks.entry(leader_id.clone()) else {
@@ -712,7 +706,7 @@ impl Ingester {
let shard_status_rx = self
.state
.lock_partially()
.await
.await?
.shards
.get(&queue_id)
.ok_or(IngestV2Error::ShardNotFound {
@@ -734,7 +728,7 @@
&mut self,
_open_observation_stream_request: OpenObservationStreamRequest,
) -> IngestV2Result<IngesterServiceStream<ObservationMessage>> {
let status_stream = ServiceStream::from(self.status_rx.clone());
let status_stream = ServiceStream::from(self.state.status_rx.clone());
let self_node_id = self.self_node_id.clone();
let observation_stream = status_stream.map(move |status| {
let observation_message = ObservationMessage {
@@ -753,7 +747,7 @@
let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "init_shards", "write")?;

if state_guard.status != IngesterStatus::Ready {
if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
}

@@ -809,7 +803,7 @@ impl Ingester {
close_shards_request: CloseShardsRequest,
) -> IngestV2Result<CloseShardsResponse> {
let mut state_guard =
with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write");
with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write")?;

for shard_ids in close_shards_request.shards {
for queue_id in shard_ids.queue_ids() {
@@ -823,9 +817,9 @@
}

async fn ping_inner(&mut self, ping_request: PingRequest) -> IngestV2Result<PingResponse> {
let state_guard = self.state.lock_partially().await;
let state_guard = self.state.lock_partially().await?;

if state_guard.status != IngesterStatus::Ready {
if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
}
if ping_request.leader_id != self.self_node_id {
@@ -857,13 +851,13 @@ impl Ingester {
_decommission_request: DecommissionRequest,
) -> IngestV2Result<DecommissionResponse> {
info!("decommissioning ingester");
let mut state_guard = self.state.lock_partially().await;
let mut state_guard = self.state.lock_partially().await?;

for shard in state_guard.shards.values_mut() {
shard.shard_state = ShardState::Closed;
shard.notify_shard_status();
}
state_guard.status = IngesterStatus::Decommissioning;
state_guard.set_status(IngesterStatus::Decommissioning);
self.check_decommissioning_status(&mut state_guard);

Ok(DecommissionResponse {})
@@ -1291,8 +1285,7 @@ mod tests {
.await
.unwrap();

state_guard.status = IngesterStatus::Initializing;
state_guard.status_tx.send(IngesterStatus::Ready).unwrap();
state_guard.set_status(IngesterStatus::Initializing);

drop(state_guard);

@@ -1316,8 +1309,7 @@

state_guard.rate_trackers.contains_key(&queue_id_02);

assert_eq!(state_guard.status, IngesterStatus::Ready);
assert_eq!(*state_guard.status_tx.borrow(), IngesterStatus::Ready);
assert_eq!(state_guard.status(), IngesterStatus::Ready);
}

#[tokio::test]
@@ -2588,7 +2580,7 @@ mod tests {
.await
.unwrap();

let state_guard = ingester.state.lock_partially().await;
let state_guard = ingester.state.lock_partially().await.unwrap();
let shard = state_guard.shards.get(&queue_id).unwrap();
shard.assert_is_closed();

@@ -2614,11 +2606,8 @@
assert_eq!(observation.node_id, ingester_ctx.node_id);
assert_eq!(observation.status(), IngesterStatus::Ready);

let state_guard = ingester.state.lock_fully().await.unwrap();
state_guard
.status_tx
.send(IngesterStatus::Decommissioning)
.unwrap();
let mut state_guard = ingester.state.lock_fully().await.unwrap();
state_guard.set_status(IngesterStatus::Decommissioning);
drop(state_guard);

let observation = observation_stream.next().await.unwrap().unwrap();
@@ -2637,15 +2626,13 @@
let mut state_guard = ingester.state.lock_fully().await.unwrap();

ingester.check_decommissioning_status(&mut state_guard);
assert_eq!(state_guard.status, IngesterStatus::Ready);
assert_eq!(*ingester.status_rx.borrow(), IngesterStatus::Ready);
assert_eq!(state_guard.status(), IngesterStatus::Ready);

state_guard.status = IngesterStatus::Decommissioning;
state_guard.set_status(IngesterStatus::Decommissioning);
ingester.check_decommissioning_status(&mut state_guard);
assert_eq!(state_guard.status, IngesterStatus::Decommissioned);
assert_eq!(*ingester.status_rx.borrow(), IngesterStatus::Decommissioned);
assert_eq!(state_guard.status(), IngesterStatus::Decommissioned);

state_guard.status = IngesterStatus::Decommissioning;
state_guard.set_status(IngesterStatus::Decommissioning);

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
@@ -2659,14 +2646,13 @@
),
);
ingester.check_decommissioning_status(&mut state_guard);
assert_eq!(state_guard.status, IngesterStatus::Decommissioning);
assert_eq!(state_guard.status(), IngesterStatus::Decommissioning);

let shard = state_guard.shards.get_mut(&queue_id_01).unwrap();
shard.truncation_position_inclusive = Position::Beginning.as_eof();

ingester.check_decommissioning_status(&mut state_guard);
assert_eq!(state_guard.status, IngesterStatus::Decommissioned);
assert_eq!(*ingester.status_rx.borrow(), IngesterStatus::Decommissioned);
assert_eq!(state_guard.status(), IngesterStatus::Decommissioned);
}

#[tokio::test]
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -525,7 +525,7 @@ impl ReplicationTask {
let mut state_guard =
with_lock_metrics!(self.state.lock_fully(), "replicate", "write").await?;

if state_guard.status != IngesterStatus::Ready {
if state_guard.status() != IngesterStatus::Ready {
replicate_failures.reserve_exact(replicate_request.subrequests.len());

for subrequest in replicate_request.subrequests {
@@ -1014,7 +1014,7 @@ mod tests {
async fn test_replication_task_happy_path() {
let leader_id: NodeId = "test-leader".into();
let follower_id: NodeId = "test-follower".into();
let (_temp_dir, state, _status_rx) = IngesterState::for_test().await;
let (_temp_dir, state) = IngesterState::for_test().await;
let (syn_replication_stream_tx, syn_replication_stream) =
ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY);
let (ack_replication_stream_tx, mut ack_replication_stream) =
@@ -1277,7 +1277,7 @@ mod tests {
async fn test_replication_task_shard_closed() {
let leader_id: NodeId = "test-leader".into();
let follower_id: NodeId = "test-follower".into();
let (_temp_dir, state, _status_rx) = IngesterState::for_test().await;
let (_temp_dir, state) = IngesterState::for_test().await;
let (syn_replication_stream_tx, syn_replication_stream) =
ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY);
let (ack_replication_stream_tx, mut ack_replication_stream) =
@@ -1352,7 +1352,7 @@ mod tests {
async fn test_replication_task_resource_exhausted() {
let leader_id: NodeId = "test-leader".into();
let follower_id: NodeId = "test-follower".into();
let (_temp_dir, state, _status_rx) = IngesterState::for_test().await;
let (_temp_dir, state) = IngesterState::for_test().await;
let (syn_replication_stream_tx, syn_replication_stream) =
ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY);
let (ack_replication_stream_tx, mut ack_replication_stream) =