Skip to content

Commit

Permalink
adding more logs to the control plane.
Browse files Browse the repository at this point in the history
These logs are related to the removal of shards.
  • Loading branch information
fulmicoton committed Mar 26, 2024
1 parent 6b3e021 commit bc64037
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 2 deletions.
3 changes: 3 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl ControlPlane {
ingester_pool: IngesterPool,
metastore: MetastoreServiceClient,
) -> (Mailbox<Self>, ActorHandle<Supervisor<Self>>) {
info!("starting control plane");
universe.spawn_builder().supervise_fn(move || {
let cluster_id = cluster_config.cluster_id.clone();
let replication_factor = cluster_config.replication_factor;
Expand Down Expand Up @@ -260,6 +261,7 @@ impl ControlPlane {
shard_ids: &[ShardId],
progress: &Progress,
) -> anyhow::Result<()> {
info!(shard_ids=?shard_ids, "delete shards");
let delete_shards_request = DeleteShardsRequest {
index_uid: Some(source_uid.index_uid.clone()),
source_id: source_uid.source_id.clone(),
Expand Down Expand Up @@ -367,6 +369,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
Some(shard.publish_position_inclusive().max(&position).clone());
if position.is_eof() {
// identify shards that have reached EOF but have not yet been removed.
info!(shard_id=%shard_id, position=?position, "received eof shard via gossip");
shard_ids_to_close.push(shard_id);
}
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl ControlPlaneModel {

/// Deletes the given shards of a source from the control plane model.
///
/// Emits an info-level log carrying the source UID and the shard IDs, then
/// delegates the actual removal to the shard table.
pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) {
    info!(
        %source_uid,
        ?shard_ids,
        "removing shards from model"
    );
    let shard_table = &mut self.shard_table;
    shard_table.delete_shards(source_uid, shard_ids);
}

Expand Down
13 changes: 11 additions & 2 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quickwit_common::tower::ConstantRate;
use quickwit_ingest::{RateMibPerSec, ShardInfo, ShardInfos};
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid};
use tracing::{error, warn};
use tracing::{error, info, warn};

/// Limits the number of shards that can be opened for scaling up a source to 5 per minute.
const SCALING_UP_RATE_LIMITER_SETTINGS: RateLimiterSettings = RateLimiterSettings {
Expand Down Expand Up @@ -124,6 +124,7 @@ pub(crate) struct ShardTable {
// Removes the shards from the ingester_shards map.
//
// This function is used to maintain the shard table invariant.
// Logs an error if the shard was not found in the ingester_shards map.
fn remove_shard_from_ingesters_internal(
source_uid: &SourceUid,
shard: &Shard,
Expand All @@ -134,7 +135,13 @@ fn remove_shard_from_ingesters_internal(
.get_mut(&node)
.expect("shard table reached inconsistent state");
let shard_ids = ingester_shards.get_mut(source_uid).unwrap();
shard_ids.remove(shard.shard_id());
let shard_was_removed = shard_ids.remove(shard.shard_id());
if !shard_was_removed {
error!(
"shard table has reached an inconsistent state. shard {shard:?} was removed from \
the shard table but was apparently not in the ingester_shards map."
);
}
}
}

Expand Down Expand Up @@ -470,6 +477,8 @@ impl ShardTable {
shard_entry.set_shard_state(ShardState::Closed);
closed_shard_ids.push(shard_id.clone());
}
} else {
info!(shard=%shard_id, "ignoring attempt to close shard: it is unknown (probably because it has been deleted)");
}
}
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ impl FullyLockedIngesterState<'_> {
/// Deletes and truncates the shards as directed by the `advise_reset_shards_response` returned
/// by the control plane.
pub async fn reset_shards(&mut self, advise_reset_shards_response: &AdviseResetShardsResponse) {
info!("reset shards");
for shard_ids in &advise_reset_shards_response.shards_to_delete {
for queue_id in shard_ids.queue_ids() {
self.delete_shard(&queue_id).await;
Expand Down

0 comments on commit bc64037

Please sign in to comment.