Skip to content

Commit

Permalink
added logs
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Dec 26, 2023
1 parent f309022 commit 5de8b26
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
5 changes: 2 additions & 3 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ impl PhysicalIndexingPlan {
}

pub fn normalize(&mut self) {
self.indexing_tasks_per_indexer_id.retain(|_indexer_id, tasks| {
!tasks.is_empty()
});
self.indexing_tasks_per_indexer_id
.retain(|_indexer_id, tasks| !tasks.is_empty());
for tasks in self.indexing_tasks_per_indexer_id.values_mut() {
tasks.sort_by(|left, right| {
left.index_uid
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl IndexerState {
indexing_workbench_opt: &'a mut Option<IndexingWorkbench>,
ctx: &'a ActorContext<Indexer>,
) -> anyhow::Result<&'a mut IndexingWorkbench> {
warn!(pipeline_uid=%self.pipeline_id.pipeline_uid.0, index=self.pipeline_id.index_uid.index_id(), "get-or-create-workbench");
if indexing_workbench_opt.is_none() {
let indexing_workbench = self.create_workbench(ctx).await?;
let commit_timeout_message = CommitTimeout {
Expand Down Expand Up @@ -488,6 +489,7 @@ impl Handler<NewPublishLock> for Indexer {
message: NewPublishLock,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
warn!("new-publish-lock");
let NewPublishLock(publish_lock) = message;
self.indexing_workbench_opt = None;
self.indexer_state.publish_lock = publish_lock;
Expand Down Expand Up @@ -652,7 +654,7 @@ impl Indexer {
}
let num_splits = splits.len() as u64;
let split_ids = splits.iter().map(|split| split.split_id()).join(",");
info!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer");
warn!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, delta=?checkpoint_delta.source_delta, index=?self.indexer_state.pipeline_id.index_uid.index_id(), pipeline_id=%self.indexer_state.pipeline_id.pipeline_uid, "send-to-index-serializer");
ctx.send_message(
&self.index_serializer_mailbox,
IndexedSplitBatchBuilder {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use quickwit_proto::metastore::{
use quickwit_proto::types::ShardId;
use quickwit_storage::{Storage, StorageResolver};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, warn};

use super::MergePlanner;
use crate::actors::doc_processor::DocProcessor;
Expand Down Expand Up @@ -560,6 +560,7 @@ impl Handler<AssignShards> for IndexingPipeline {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.shard_ids = assign_shards_message.0.shard_ids.clone();
warn!(shard_ids=?self.shard_ids, index=self.params.pipeline_id.index_uid.index_id(), pipeline=%self.params.pipeline_id.pipeline_uid.0, "assign shards");
// If the pipeline is running, we forward the message to its source.
// If it is not, it will be respawned soon, and the shards will be assigned afterward.
if let Some(handles) = &mut self.handles_opt {
Expand Down

0 comments on commit 5de8b26

Please sign in to comment.