Skip to content

Commit

Permalink
Adding more logs to diagnostic shard truncation problems. (#4635)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Feb 27, 2024
1 parent c81467d commit 9c82b7a
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "2914cad" }
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "bc6a998" }
new_string_template = "1.4.0"
nom = "7.1.3"
num_cpus = "1"
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_cluster::{Cluster, ListenerHandle};
use quickwit_common::pubsub::{Event, EventBroker};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::types::{Position, ShardId, SourceUid};
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

/// Prefix used in chitchat to publish the shard positions.
const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:";
Expand Down Expand Up @@ -223,6 +223,7 @@ impl Handler<ClusterShardPositionsUpdate> for ShardPositionsService {
shard_positions,
} = update;
let updated_shard_positions = self.apply_update(&source_uid, shard_positions);
debug!(updated_shard_positions=?updated_shard_positions, "cluster position update");
if !updated_shard_positions.is_empty() {
self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions);
}
Expand Down Expand Up @@ -276,6 +277,7 @@ impl ShardPositionsService {
source_uid: SourceUid,
shard_positions: Vec<(ShardId, Position)>,
) {
debug!(shard_positions=?shard_positions, "shard positions updates");
self.event_broker.publish(ShardPositionsUpdate {
source_uid,
updated_shard_positions: shard_positions,
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,22 +1098,25 @@ impl IngesterService for Ingester {
impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
let Some(state) = self.upgrade() else {
warn!("ingester state update failed");
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
else {
error!("failed to lock the ingester state");
return;
};
let index_uid = shard_positions_update.source_uid.index_uid;
let source_id = shard_positions_update.source_uid.source_id;

for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
let queue_id = queue_id(&index_uid, &source_id, &shard_id);

if shard_position.is_eof() {
info!(shard = queue_id, "deleting shard");
state_guard.delete_shard(&queue_id).await;
} else {
info!(shard=queue_id, shard_position=%shard_position, "truncating shard");
state_guard.truncate_shard(&queue_id, &shard_position).await;
}
}
Expand Down

0 comments on commit 9c82b7a

Please sign in to comment.