Skip to content

Commit

Permalink
moar logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 21, 2024
1 parent a8ef8f3 commit dab1b60
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "2914cad" }
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "87862afe93" }
new_string_template = "1.4.0"
nom = "7.1.3"
num_cpus = "1"
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quickwit_cluster::{Cluster, ListenerHandle};
use quickwit_common::pubsub::{Event, EventBroker};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::types::{Position, ShardId, SourceUid};
use tracing::{error, warn};
use tracing::{error, info, warn};

/// Prefix used in chitchat to publish the shard positions.
const SHARD_POSITIONS_PREFIX: &str = "indexer.shard_positions:";
Expand Down Expand Up @@ -176,7 +176,9 @@ impl Handler<ClusterShardPositionsUpdate> for ShardPositionsService {
source_uid,
shard_positions,
} = update;
info!(shard_positions=?shard_positions, "cluster position update");
let updated_shard_positions = self.apply_update(&source_uid, shard_positions);
info!(updated_shard_positions=?updated_shard_positions, "cluster position update");
if !updated_shard_positions.is_empty() {
self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions);
}
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
};

use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{
AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest,
Expand Down Expand Up @@ -1096,12 +1097,15 @@ impl IngesterService for Ingester {
#[async_trait]
impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
info!(positions_update=?shard_positions_update.updated_shard_positions, "received shard position update");
let Some(state) = self.upgrade() else {
warn!("ingester state update failed");
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
else {
error!("failed to lock the ingester state");
return;
};
let index_uid = shard_positions_update.source_uid.index_uid;
Expand All @@ -1111,8 +1115,10 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
let queue_id = queue_id(&index_uid, &source_id, &shard_id);

if shard_position.is_eof() {
info!(shard=queue_id, "deleting shard");
state_guard.delete_shard(&queue_id).await;
} else {
info!(shard=queue_id, shard_position=%shard_position, "truncating shard");
state_guard.truncate_shard(&queue_id, &shard_position).await;
}
}
Expand Down

0 comments on commit dab1b60

Please sign in to comment.