Skip to content

Commit

Permalink
Report closed shards and unavailable leaders to control plane (#3934)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 25, 2023
1 parent fdc60f8 commit 63a2dca
Show file tree
Hide file tree
Showing 78 changed files with 1,356 additions and 452 deletions.
361 changes: 199 additions & 162 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::SearchResponse;
use quickwit_proto::NodeId;
use quickwit_proto::types::NodeId;
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::collections::BTreeMap;
use chitchat::{ChitchatId, NodeState};
use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator};
use quickwit_common::tower::{make_channel, warmup_channel};
use quickwit_proto::NodeId;
use quickwit_proto::types::NodeId;
use tonic::transport::Channel;
use tracing::{info, warn};

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use chitchat::{
use futures::Stream;
use itertools::Itertools;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::NodeId;
use quickwit_proto::types::NodeId;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, Mutex, RwLock};
use tokio::time::timeout;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use chitchat::transport::UdpTransport;
use chitchat::FailureDetectorConfig;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::NodeId;
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;

pub use crate::change::ClusterChange;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use anyhow::{anyhow, Context};
use chitchat::{ChitchatId, NodeState};
use itertools::Itertools;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::NodeId;
use quickwit_proto::types::NodeId;
use tracing::warn;

use crate::{GenerationId, QuickwitService};
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use quickwit_doc_mapper::{
DefaultDocMapper, DefaultDocMapperBuilder, DocMapper, FieldMappingEntry, Mode, ModeType,
QuickwitJsonOptions, TokenizerEntry,
};
use quickwit_proto::IndexId;
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,9 @@ mod tests {
subrequests: vec![GetOrCreateOpenShardsSubrequest {
index_id: "test-index".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
closed_shards: Vec::new(),
}],
unavailable_ingesters: Vec::new(),
closed_shards: Vec::new(),
unavailable_leaders: Vec::new(),
};
let get_open_shards_response = control_plane_mailbox
.ask_for_res(get_open_shards_request)
Expand Down
87 changes: 75 additions & 12 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ use quickwit_metastore::{
};
use quickwit_proto::control_plane::ControlPlaneResult;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{
EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, MetastoreService,
MetastoreServiceClient,
};
use quickwit_proto::{metastore, IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use serde::Serialize;
use tracing::{error, info};

Expand Down Expand Up @@ -87,7 +88,7 @@ impl ShardTableEntry {
///
/// Upon starts, it loads its entire state from the metastore.
#[derive(Default, Debug)]
pub struct ControlPlaneModel {
pub(crate) struct ControlPlaneModel {
index_uid_table: FnvHashMap<IndexId, IndexUid>,
index_table: FnvHashMap<IndexUid, IndexMetadata>,
shard_table: ShardTable,
Expand Down Expand Up @@ -259,17 +260,31 @@ impl ControlPlaneModel {
.delete_shards(index_uid, source_id, shard_ids);
}

/// Returns an iterator over every shard stored in the shard table, across all table entries.
///
/// Test-only helper. It only reads the table, so a shared borrow of the model is sufficient.
#[cfg(test)]
pub fn shards(&self) -> impl Iterator<Item = &Shard> + '_ {
    self.shard_table
        .table_entries
        .values()
        .flat_map(|table_entry| table_entry.shards.values())
}

/// Returns an iterator yielding a mutable reference to every shard stored in the shard table,
/// across all table entries.
pub fn shards_mut(&mut self) -> impl Iterator<Item = &mut Shard> + '_ {
    let table_entries = self.shard_table.table_entries.values_mut();
    table_entries.flat_map(|entry| entry.shards.values_mut())
}

/// Sets the state of the shards identified by their index UID, source ID, and shard IDs to
/// `Closed`.
#[allow(dead_code)] // Will remove this in a future PR.
pub fn close_shards(
&mut self,
index_uid: &IndexUid,
source_id: &SourceId,
shard_ids: &[ShardId],
) {
) -> Vec<ShardId> {
self.shard_table
.close_shards(index_uid, source_id, shard_ids);
.close_shards(index_uid, source_id, shard_ids)
}

pub fn index_uid(&self, index_id: &str) -> Option<IndexUid> {
Expand Down Expand Up @@ -426,24 +441,29 @@ impl ShardTable {

/// Sets the state of the shards identified by their index UID, source ID, and shard IDs to
/// `Closed`.
#[allow(dead_code)] // Will remove this in a future PR.
pub fn close_shards(
&mut self,
index_uid: &IndexUid,
source_id: &SourceId,
shard_ids: &[ShardId],
) {
) -> Vec<ShardId> {
let mut closed_shard_ids = Vec::new();

let source_uid = SourceUid {
index_uid: index_uid.clone(),
source_id: source_id.clone(),
};
if let Some(table_entry) = self.table_entries.get_mut(&source_uid) {
for shard_id in shard_ids {
if let Some(shard) = table_entry.shards.get_mut(shard_id) {
shard.shard_state = ShardState::Closed as i32;
if !shard.is_closed() {
shard.shard_state = ShardState::Closed as i32;
closed_shard_ids.push(*shard_id);
}
}
}
}
closed_shard_ids
}

/// Removes the shards identified by their index UID, source ID, and shard IDs.
Expand Down Expand Up @@ -623,6 +643,53 @@ mod tests {
assert_eq!(table_entry.next_shard_id, 3);
}

/// Checks that `ShardTable::close_shards` only reports the shards it actually transitioned to
/// `Closed`, and that it leaves the shards of other indexes untouched.
#[test]
fn test_shard_table_close_shards() {
    let index_uid_0: IndexUid = "test-index:0".into();
    let index_uid_1: IndexUid = "test-index:1".into();
    let source_id = "test-source".to_string();

    let mut shard_table = ShardTable::default();

    let shard_01 = Shard {
        index_uid: index_uid_0.clone().into(),
        source_id: source_id.clone(),
        shard_id: 1,
        leader_id: "test-leader-0".to_string(),
        shard_state: ShardState::Open as i32,
        ..Default::default()
    };
    let shard_02 = Shard {
        index_uid: index_uid_0.clone().into(),
        source_id: source_id.clone(),
        shard_id: 2,
        leader_id: "test-leader-0".to_string(),
        shard_state: ShardState::Closed as i32,
        ..Default::default()
    };
    let shard_11 = Shard {
        index_uid: index_uid_1.clone().into(),
        source_id: source_id.clone(),
        shard_id: 1,
        leader_id: "test-leader-0".to_string(),
        shard_state: ShardState::Open as i32,
        ..Default::default()
    };
    shard_table.update_shards(&index_uid_0, &source_id, &[shard_01, shard_02], 3);
    // `shard_11` belongs to the second index: register it under `index_uid_1` so that it does
    // not end up in the table entry of the first index.
    shard_table.update_shards(&index_uid_1, &source_id, &[shard_11], 2);

    // Shard 1 is open, shard 2 is already closed, and shard 3 does not exist: only shard 1 is
    // expected to be reported as newly closed.
    let closed_shard_ids = shard_table.close_shards(&index_uid_0, &source_id, &[1, 2, 3]);
    assert_eq!(closed_shard_ids, &[1]);

    let source_uid_0 = SourceUid {
        index_uid: index_uid_0,
        source_id: source_id.clone(),
    };
    let table_entry = shard_table.table_entries.get(&source_uid_0).unwrap();
    let shards = table_entry.shards();
    assert_eq!(shards[0].shard_state, ShardState::Closed as i32);

    // Closing shards of the first index must not affect the shard of the second index.
    let source_uid_1 = SourceUid {
        index_uid: index_uid_1,
        source_id,
    };
    let table_entry = shard_table.table_entries.get(&source_uid_1).unwrap();
    let shards = table_entry.shards();
    assert_eq!(shards[0].shard_state, ShardState::Open as i32);
}

#[test]
fn test_shard_table_delete_shards() {
let index_uid_0: IndexUid = "test-index:0".into();
Expand Down Expand Up @@ -770,8 +837,4 @@ mod tests {
assert_eq!(shards.len(), 0);
assert_eq!(table_entry.next_shard_id, 1);
}
#[tokio::test]
async fn test_ingest_controller_close_shards() {
// TODO: Write test when the RPC is actually called by ingesters.
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ mod tests {
};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask};
use quickwit_proto::IndexUid;
use quickwit_proto::types::IndexUid;
use rand::seq::SliceRandom;
use serde_json::json;
use tonic::transport::Endpoint;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/indexing_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::NodeId;
use quickwit_proto::types::NodeId;
use serde::Serialize;
use tracing::{debug, error, info, warn};

Expand Down
Loading

0 comments on commit 63a2dca

Please sign in to comment.