Solving warn `received an empty publish shard positions update`.
This should remove all occurrences of the warn log `received an empty publish shard positions update`.
We also start logging an info message highlighting the race condition
that was causing this warning.

Closes #4888
fulmicoton committed May 10, 2024
1 parent a20009e commit e843726
Showing 1 changed file with 25 additions and 6 deletions.
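Before the diff itself, here is a minimal, self-contained sketch of the two guards this change relies on: an early return so that an empty positions update is never published, and an info log listing the shards that could not be acquired. It uses plain `String` shard ids, `u64` positions, and the `tracing` / `tracing-subscriber` crates in place of Quickwit's actual `ShardId`, `Position`, and logging setup; it is an illustration under those assumptions, not the project's code.

// Sketch only: simplified stand-ins for Quickwit's types.
use tracing::info;

fn truncate(truncate_up_to_positions: Vec<(String, u64)>) {
    // Guard: publishing an empty update is what triggered the
    // "received an empty publish shard positions update" warning.
    if truncate_up_to_positions.is_empty() {
        return;
    }
    info!(positions=?truncate_up_to_positions, "publishing shard positions update");
    // ... publish the update to the shard positions service ...
}

fn log_missing_shards(requested: &[String], acquired: &[String]) {
    // The control plane may delete shards between building the plan and
    // applying it, so some requested shards may not be acquired.
    if acquired.len() != requested.len() {
        let missing_shards: Vec<&String> = requested
            .iter()
            .filter(|&shard_id| !acquired.contains(shard_id))
            .collect();
        info!(missing_shards=?missing_shards, "failed to acquire all assigned shards");
    }
}

fn main() {
    // Install a subscriber so the info logs are visible (assumes tracing-subscriber).
    tracing_subscriber::fmt::init();
    // No-op: nothing is published for an empty update.
    truncate(Vec::new());
    // Flags "shard-2" as missing.
    log_missing_shards(
        &["shard-1".to_string(), "shard-2".to_string()],
        &["shard-1".to_string()],
    );
}

The actual change applies the same two ideas inside `IngestSource::truncate` and in the shard-assignment path, as shown in the diff below.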
31 changes: 25 additions & 6 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -38,7 +38,8 @@ use quickwit_proto::ingest::ingester::{
 };
 use quickwit_proto::ingest::IngestV2Error;
 use quickwit_proto::metastore::{
-    AcquireShardsRequest, MetastoreService, MetastoreServiceClient, SourceType,
+    AcquireShardsRequest, AcquireShardsResponse, MetastoreService, MetastoreServiceClient,
+    SourceType,
 };
 use quickwit_proto::types::{
     NodeId, PipelineUid, Position, PublishToken, ShardId, SourceId, SourceUid,
@@ -297,6 +298,9 @@ impl IngestSource {
     }

     async fn truncate(&mut self, truncate_up_to_positions: Vec<(ShardId, Position)>) {
+        if truncate_up_to_positions.is_empty() {
+            return;
+        }
         let shard_positions_update = LocalShardPositionsUpdate::new(
             self.client_id.source_uid.clone(),
             truncate_up_to_positions.clone(),
@@ -535,19 +539,35 @@ impl Source for IngestSource {
             .filter(|shard_id| !self.assigned_shards.contains_key(shard_id))
             .collect();

         assert!(!added_shard_ids.is_empty());
         info!(added_shards=?added_shard_ids, "adding shards assignment");

         let acquire_shards_request = AcquireShardsRequest {
             index_uid: Some(self.client_id.source_uid.index_uid.clone()),
             source_id: self.client_id.source_uid.source_id.clone(),
-            shard_ids: added_shard_ids,
+            shard_ids: added_shard_ids.clone(),
             publish_token: self.publish_token.clone(),
         };
-        let acquire_shards_response = ctx
+        let acquire_shards_response: AcquireShardsResponse = ctx
             .protect_future(self.metastore.acquire_shards(acquire_shards_request))
             .await
             .context("failed to acquire shards")?;
+
+        if acquire_shards_response.acquired_shards.len() != added_shard_ids.len() {
+            let missing_shards = added_shard_ids
+                .iter()
+                .filter(|shard_id| {
+                    !acquire_shards_response
+                        .acquired_shards
+                        .iter()
+                        .any(|acquired_shard| acquired_shard.shard_id() == *shard_id)
+                })
+                .collect::<Vec<_>>();
+            // This can happen if the shards have been deleted by the control plane, after building
+            // the plan and before the apply terminated. See #4888.
+            info!(missing_shards=?missing_shards, "failed to acquire all assigned shards");
+        }
+
         let mut truncate_up_to_positions =
             Vec::with_capacity(acquire_shards_response.acquired_shards.len());

@@ -597,6 +617,7 @@ impl Source for IngestSource {
             };
             self.assigned_shards.insert(shard_id, assigned_shard);
         }
+
         self.truncate(truncate_up_to_positions).await;

         Ok(())
@@ -614,9 +635,7 @@ impl Source for IngestSource {
                 (shard_id, position)
             })
             .collect();
-        if !truncate_up_to_positions.is_empty() {
-            self.truncate(truncate_up_to_positions).await;
-        }
+        self.truncate(truncate_up_to_positions).await;
         Ok(())
     }

