From bc6403737bc604a859e21c981e54c05520f310db Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 26 Mar 2024 15:00:59 +0900 Subject: [PATCH] adding more logs to the control plane. These logs are related to the removal of shards. --- .../quickwit-control-plane/src/control_plane.rs | 3 +++ quickwit/quickwit-control-plane/src/model/mod.rs | 1 + .../quickwit-control-plane/src/model/shard_table.rs | 13 +++++++++++-- quickwit/quickwit-ingest/src/ingest_v2/state.rs | 1 + 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 6cc379241fb..43995831b10 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -113,6 +113,7 @@ impl ControlPlane { ingester_pool: IngesterPool, metastore: MetastoreServiceClient, ) -> (Mailbox, ActorHandle>) { + info!("starting control plane"); universe.spawn_builder().supervise_fn(move || { let cluster_id = cluster_config.cluster_id.clone(); let replication_factor = cluster_config.replication_factor; @@ -260,6 +261,7 @@ impl ControlPlane { shard_ids: &[ShardId], progress: &Progress, ) -> anyhow::Result<()> { + info!(shard_ids=?shard_ids, "delete shards"); let delete_shards_request = DeleteShardsRequest { index_uid: Some(source_uid.index_uid.clone()), source_id: source_uid.source_id.clone(), @@ -367,6 +369,7 @@ impl Handler for ControlPlane { Some(shard.publish_position_inclusive().max(&position).clone()); if position.is_eof() { // identify shards that have reached EOF but have not yet been removed. + info!(shard_id=%shard_id, position=?position, "received eof shard via gossip"); shard_ids_to_close.push(shard_id); } } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 145fd8dfda0..bfd4f9ae55a 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -344,6 +344,7 @@ impl ControlPlaneModel { /// Removes the shards identified by their index UID, source ID, and shard IDs. pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) { + info!(source_uid=%source_uid, shard_ids=?shard_ids, "removing shards from model"); self.shard_table.delete_shards(source_uid, shard_ids); } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 42092b82d74..a6783ff9717 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -28,7 +28,7 @@ use quickwit_common::tower::ConstantRate; use quickwit_ingest::{RateMibPerSec, ShardInfo, ShardInfos}; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; -use tracing::{error, warn}; +use tracing::{error, info, warn}; /// Limits the number of shards that can be opened for scaling up a source to 5 per minute. const SCALING_UP_RATE_LIMITER_SETTINGS: RateLimiterSettings = RateLimiterSettings { @@ -124,6 +124,7 @@ pub(crate) struct ShardTable { // Removes the shards from the ingester_shards map. // // This function is used to maintain the shard table invariant. +// Logs an error if the shard was not found in the ingester_shards map. fn remove_shard_from_ingesters_internal( source_uid: &SourceUid, shard: &Shard, @@ -134,7 +135,13 @@ fn remove_shard_from_ingesters_internal( .get_mut(&node) .expect("shard table reached inconsistent state"); let shard_ids = ingester_shards.get_mut(source_uid).unwrap(); - shard_ids.remove(shard.shard_id()); + let shard_was_removed = shard_ids.remove(shard.shard_id()); + if !shard_was_removed { + error!( + "shard table has reached an inconsistent state. shard {shard:?} was removed from \ + the shard table but was apparently not in the ingester_shards map." + ); + } } } @@ -470,6 +477,8 @@ impl ShardTable { shard_entry.set_shard_state(ShardState::Closed); closed_shard_ids.push(shard_id.clone()); } + } else { + info!(shard=%shard_id, "ignoring attempt to close shard: it is unknown (probably because it has been deleted)"); } } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 769760dccf0..36a2e1786a2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -388,6 +388,7 @@ impl FullyLockedIngesterState<'_> { /// Deletes and truncates the shards as directed by the `advise_reset_shards_response` returned /// by the control plane. pub async fn reset_shards(&mut self, advise_reset_shards_response: &AdviseResetShardsResponse) { + info!("reset shards"); for shard_ids in &advise_reset_shards_response.shards_to_delete { for queue_id in shard_ids.queue_ids() { self.delete_shard(&queue_id).await;