From 5de8b26b1cc79f71a75d3312533d0646cd512922 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 26 Dec 2023 08:23:56 +0100 Subject: [PATCH] added logs --- quickwit/quickwit-control-plane/src/indexing_plan.rs | 5 ++--- quickwit/quickwit-indexing/src/actors/indexer.rs | 4 +++- quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs | 3 ++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index 7638cb26c05..ea4e121b271 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -65,9 +65,8 @@ impl PhysicalIndexingPlan { } pub fn normalize(&mut self) { - self.indexing_tasks_per_indexer_id.retain(|_indexer_id, tasks| { - !tasks.is_empty() - }); + self.indexing_tasks_per_indexer_id + .retain(|_indexer_id, tasks| !tasks.is_empty()); for tasks in self.indexing_tasks_per_indexer_id.values_mut() { tasks.sort_by(|left, right| { left.index_uid diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 62c8b9db056..1a09358ebda 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -246,6 +246,7 @@ impl IndexerState { indexing_workbench_opt: &'a mut Option, ctx: &'a ActorContext, ) -> anyhow::Result<&'a mut IndexingWorkbench> { + warn!(pipeline_uid=%self.pipeline_id.pipeline_uid.0, index=self.pipeline_id.index_uid.index_id(), "get-or-create-workbench"); if indexing_workbench_opt.is_none() { let indexing_workbench = self.create_workbench(ctx).await?; let commit_timeout_message = CommitTimeout { @@ -488,6 +489,7 @@ impl Handler for Indexer { message: NewPublishLock, _ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + warn!("new-publish-lock"); let NewPublishLock(publish_lock) = message; self.indexing_workbench_opt = None; self.indexer_state.publish_lock = publish_lock; @@ -652,7 +654,7 @@ impl Indexer { } let num_splits = splits.len() as u64; let split_ids = splits.iter().map(|split| split.split_id()).join(","); - info!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer"); + warn!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, delta=?checkpoint_delta.source_delta, index=?self.indexer_state.pipeline_id.index_uid.index_id(), pipeline_id=%self.indexer_state.pipeline_id.pipeline_uid, "send-to-index-serializer"); ctx.send_message( &self.index_serializer_mailbox, IndexedSplitBatchBuilder { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index b594e6ced36..e7d172fade2 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -41,7 +41,7 @@ use quickwit_proto::metastore::{ use quickwit_proto::types::ShardId; use quickwit_storage::{Storage, StorageResolver}; use tokio::sync::Semaphore; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, warn}; use super::MergePlanner; use crate::actors::doc_processor::DocProcessor; @@ -560,6 +560,7 @@ impl Handler for IndexingPipeline { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { self.shard_ids = assign_shards_message.0.shard_ids.clone(); + warn!(shard_ids=?self.shard_ids, index=self.params.pipeline_id.index_uid.index_id(), pipeline=%self.params.pipeline_id.pipeline_uid.0, "assign shards"); // If the pipeline is running, we forward the message to its source. // If it is not, it will be respawned soon, and the shards will be assigned afterward. if let Some(handles) = &mut self.handles_opt {