From dab1b60b8d1b13dc1aa1ece12d7b3046dcb83f87 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 21 Feb 2024 15:05:57 +0900 Subject: [PATCH] moar logging --- quickwit/Cargo.toml | 2 +- quickwit/quickwit-indexing/src/models/shard_positions.rs | 4 +++- quickwit/quickwit-ingest/src/ingest_v2/ingester.rs | 6 ++++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index abf45e56cc8..40db009b020 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -153,7 +153,7 @@ matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" -mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "2914cad" } +mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "87862afe93" } new_string_template = "1.4.0" nom = "7.1.3" num_cpus = "1" diff --git a/quickwit/quickwit-indexing/src/models/shard_positions.rs b/quickwit/quickwit-indexing/src/models/shard_positions.rs index e1c41b72131..23e95f4829b 100644 --- a/quickwit/quickwit-indexing/src/models/shard_positions.rs +++ b/quickwit/quickwit-indexing/src/models/shard_positions.rs @@ -28,7 +28,7 @@ use quickwit_cluster::{Cluster, ListenerHandle}; use quickwit_common::pubsub::{Event, EventBroker}; use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::types::{Position, ShardId, SourceUid}; -use tracing::{error, warn}; +use tracing::{error, info, warn}; /// Prefix used in chitchat to publish the shard positions. const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:"; @@ -176,7 +176,9 @@ impl Handler for ShardPositionsService { source_uid, shard_positions, } = update; + info!(shard_positions=?shard_positions, "cluster position update"); let updated_shard_positions = self.apply_update(&source_uid, shard_positions); + info!(updated_shard_positions=?updated_shard_positions, "cluster position update"); if !updated_shard_positions.is_empty() { self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions); } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 1245dae3316..187f5795329 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -40,6 +40,7 @@ use quickwit_common::{rate_limited_warn, ServiceStream}; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient, }; + use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::ingest::ingester::{ AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest, @@ -1096,12 +1097,15 @@ impl IngesterService for Ingester { #[async_trait] impl EventSubscriber for WeakIngesterState { async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) { + info!(positions_update=?shard_positions_update.updated_shard_positions, "received shard position update"); let Some(state) = self.upgrade() else { + warn!("ingester state update failed"); return; }; let Ok(mut state_guard) = with_lock_metrics!(state.lock_fully().await, "gc_shards", "write") else { + error!("failed to lock the ingester state"); return; }; let index_uid = shard_positions_update.source_uid.index_uid; @@ -1111,8 +1115,10 @@ impl EventSubscriber for WeakIngesterState { let queue_id = queue_id(&index_uid, &source_id, &shard_id); if shard_position.is_eof() { + info!(shard=queue_id, "deleting shard"); state_guard.delete_shard(&queue_id).await; } else { + info!(shard=queue_id, shard_position=%shard_position, "truncating shard"); state_guard.truncate_shard(&queue_id, &shard_position).await; } }