Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add api endpoint to get some control-plan internal info #4339

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ use quickwit_config::SourceConfig;
use quickwit_ingest::{IngesterPool, LocalShardsUpdate};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::{
ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse,
ControlPlaneError, ControlPlaneResult, GetDebugStateRequest, GetDebugStateResponse,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, PhysicalIndexingPlanEntry,
ShardTableEntry,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -179,6 +180,37 @@ impl ControlPlane {
.schedule_indexing_plan_if_needed(&self.model);
Ok(())
}

fn debug_state(&self) -> GetDebugStateResponse {
let shard_table = self
.model
.all_shards_with_source()
.map(|(source, shards)| ShardTableEntry {
source_id: source.to_string(),
shards: shards
.map(|shard_entry| shard_entry.shard.clone())
.collect(),
})
.collect();
let physical_index_plan = self
.indexing_scheduler
.observable_state()
.last_applied_physical_plan
.map(|plan| {
plan.indexing_tasks_per_indexer()
.iter()
.map(|(node_id, tasks)| PhysicalIndexingPlanEntry {
node_id: node_id.clone(),
tasks: tasks.clone(),
})
.collect()
})
.unwrap_or_default();
GetDebugStateResponse {
shard_table,
physical_index_plan,
}
}
}

#[async_trait]
Expand Down Expand Up @@ -528,6 +560,19 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
}
}

#[async_trait]
impl Handler<GetDebugStateRequest> for ControlPlane {
type Reply = ControlPlaneResult<GetDebugStateResponse>;

async fn handle(
&mut self,
_: GetDebugStateRequest,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(Ok(self.debug_state()))
}
}

#[derive(Clone)]
pub struct ControlPlaneEventSubscriber(WeakMailbox<ControlPlane>);

Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ impl ControlPlaneModel {
self.shard_table.all_shards()
}

pub(crate) fn all_shards_with_source(
&self,
) -> impl Iterator<Item = (&SourceUid, impl Iterator<Item = &ShardEntry>)> + '_ {
self.shard_table.all_shards_with_source()
}

pub fn list_shards_for_node(
&self,
ingester: &NodeId,
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ impl ShardTable {
.flat_map(|table_entry| table_entry.shard_entries.values())
}

pub(crate) fn all_shards_with_source(
&self,
) -> impl Iterator<Item = (&SourceUid, impl Iterator<Item = &ShardEntry>)> + '_ {
self.table_entries
.iter()
.map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
}

pub(crate) fn all_shards_mut(&mut self) -> impl Iterator<Item = &mut ShardEntry> + '_ {
self.table_entries
.values_mut()
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ syntax = "proto3";

package quickwit.control_plane;

import "quickwit/indexing.proto";
import "quickwit/ingest.proto";
import "quickwit/metastore.proto";

Expand Down Expand Up @@ -59,6 +60,9 @@ service ControlPlaneService {
// Returns the list of open shards for one or several sources. If the control plane is not able to find any
// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
rpc GetOrCreateOpenShards(GetOrCreateOpenShardsRequest) returns (GetOrCreateOpenShardsResponse);

// Return some innerstate of the control plane meant to assist debugging.
rpc GetDebugState(GetDebugStateRequest) returns (GetDebugStateResponse);
}

// Shard API
Expand Down Expand Up @@ -99,3 +103,21 @@ message GetOrCreateOpenShardsFailure {
string source_id = 3;
GetOrCreateOpenShardsFailureReason reason = 4;
}

message GetDebugStateRequest {
}

message GetDebugStateResponse {
repeated ShardTableEntry shard_table = 1;
repeated PhysicalIndexingPlanEntry physical_index_plan = 2;
}

message ShardTableEntry {
string source_id = 1;
repeated quickwit.ingest.Shard shards = 2;
}

message PhysicalIndexingPlanEntry {
string node_id = 1;
repeated quickwit.indexing.IndexingTask tasks = 2;
}
Loading