Skip to content

Commit

Permalink
Refactoring introducing the Cpu capacity.
Browse files Browse the repository at this point in the history
This new CpuCapacity type is used to both expressed
the available capacity in indexer nodes, and
the load associated on shards.

As a result, the quantity we use for the latter
is semantically shifted from the CPU spent on the
indexer actor, to the estimated CPU cost of an
entire pipeline. (x4)
  • Loading branch information
fulmicoton committed Nov 8, 2023
1 parent e302b9a commit 2ee9fca
Show file tree
Hide file tree
Showing 15 changed files with 399 additions and 182 deletions.
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use quickwit_indexing::models::{
use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::NodeId;
Expand Down Expand Up @@ -938,7 +939,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
enabled_services: HashSet::new(),
gossip_advertise_addr: config.gossip_advertise_addr,
grpc_advertise_addr: config.grpc_advertise_addr,
indexing_capacity: 0,
indexing_cpu_capacity: CpuCapacity::zero(),
indexing_tasks: Vec::new(),
};
let cluster = Cluster::join(
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ pub async fn create_cluster_for_test_with_id(
transport: &dyn Transport,
self_node_readiness: bool,
) -> anyhow::Result<Cluster> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], node_id).into();
let node_id: NodeId = format!("node_{node_id}").into();
let self_node = ClusterMember {
Expand All @@ -527,7 +528,7 @@ pub async fn create_cluster_for_test_with_id(
gossip_advertise_addr,
grpc_advertise_addr: grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
indexing_tasks: Vec::new(),
indexing_capacity: 1_000,
indexing_cpu_capacity: PIPELINE_FULL_CAPACITY,
};
let failure_detector_config = create_failure_detector_config_for_test();
let cluster = Cluster::join(
Expand Down
13 changes: 7 additions & 6 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ use chitchat::transport::UdpTransport;
use chitchat::FailureDetectorConfig;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;

pub use crate::change::ClusterChange;
#[cfg(any(test, feature = "testsuite"))]
pub use crate::cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test};
pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
pub use crate::member::{ClusterMember, INDEXING_CAPACITY_KEY};
pub use crate::member::{ClusterMember, INDEXING_CPU_CAPACITY_KEY};
pub use crate::node::ClusterNode;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -68,10 +69,10 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
let node_id: NodeId = node_config.node_id.clone().into();
let generation_id = GenerationId::now();
let is_ready = false;
let indexing_capacity = if node_config.is_service_enabled(QuickwitService::Indexer) {
node_config.indexer_config.capacity
let indexing_cpu_capacity = if node_config.is_service_enabled(QuickwitService::Indexer) {
node_config.indexer_config.cpu_capacity
} else {
0u32
CpuCapacity::zero()
};
let self_node = ClusterMember {
node_id,
Expand All @@ -81,7 +82,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
gossip_advertise_addr: node_config.gossip_advertise_addr,
grpc_advertise_addr: node_config.grpc_advertise_addr,
indexing_tasks,
indexing_capacity,
indexing_cpu_capacity,
};
let cluster = Cluster::join(
cluster_id,
Expand All @@ -97,7 +98,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
.contains(&QuickwitService::Indexer)
{
cluster
.set_self_key_value(INDEXING_CAPACITY_KEY, indexing_capacity)
.set_self_key_value(INDEXING_CPU_CAPACITY_KEY, indexing_cpu_capacity)
.await;
}
Ok(cluster)
Expand Down
24 changes: 12 additions & 12 deletions quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

use std::collections::HashSet;
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::{anyhow, Context};
use chitchat::{ChitchatId, NodeState};
use itertools::Itertools;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::{error, warn};

Expand All @@ -43,7 +44,7 @@ pub(crate) const READINESS_KEY: &str = "readiness";
pub(crate) const READINESS_VALUE_READY: &str = "READY";
pub(crate) const READINESS_VALUE_NOT_READY: &str = "NOT_READY";

pub const INDEXING_CAPACITY_KEY: &str = "indexing_capacity";
pub const INDEXING_CPU_CAPACITY_KEY: &str = "indexing_cpu_capacity";

pub(crate) trait NodeStateExt {
fn grpc_advertise_addr(&self) -> anyhow::Result<SocketAddr>;
Expand Down Expand Up @@ -93,9 +94,8 @@ pub struct ClusterMember {
/// None if the node is not an indexer or the indexer has not yet started some indexing
/// pipelines.
pub indexing_tasks: Vec<IndexingTask>,
/// Indexing capacity of the node expressed in milli pipeline.
/// An indexer able to index 4 pipelines at full capacity should have a value of 4_000.
pub indexing_capacity: u32,
/// Indexing cpu capacity of the node expressed in milli cpu.
pub indexing_cpu_capacity: CpuCapacity,
pub is_ready: bool,
}

Expand All @@ -115,15 +115,15 @@ impl From<ClusterMember> for ChitchatId {
}
}

fn parse_indexing_capacity(node_state: &NodeState) -> u32 {
let Some(indexing_capacity_str) = node_state.get(INDEXING_CAPACITY_KEY) else {
return 0;
fn parse_indexing_cpu_capacity(node_state: &NodeState) -> CpuCapacity {
let Some(indexing_capacity_str) = node_state.get(INDEXING_CPU_CAPACITY_KEY) else {
return CpuCapacity::zero();
};
if let Ok(indexing_capacity) = indexing_capacity_str.parse::<u32>() {
if let Ok(indexing_capacity) = CpuCapacity::from_str(indexing_capacity_str) {
indexing_capacity
} else {
error!(indexing_capacity=?indexing_capacity_str, "Received an unparseable indexing capacity from node.");
0
CpuCapacity::zero()
}
}

Expand All @@ -147,7 +147,7 @@ pub(crate) fn build_cluster_member(
})?;
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state, &chitchat_id.node_id);
let indexing_capacity = parse_indexing_capacity(node_state);
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
generation_id: chitchat_id.generation_id.into(),
Expand All @@ -156,7 +156,7 @@ pub(crate) fn build_cluster_member(
gossip_advertise_addr: chitchat_id.gossip_advertise_addr,
grpc_advertise_addr,
indexing_tasks,
indexing_capacity,
indexing_cpu_capacity,
};
Ok(member)
}
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use chitchat::{ChitchatId, NodeState};
use quickwit_config::service::QuickwitService;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use tonic::transport::Channel;

use crate::member::build_cluster_member;
Expand All @@ -49,7 +49,7 @@ impl ClusterNode {
enabled_services: member.enabled_services,
grpc_advertise_addr: member.grpc_advertise_addr,
indexing_tasks: member.indexing_tasks,
indexing_capacity: member.indexing_capacity,
indexing_capacity: member.indexing_cpu_capacity,
is_ready: member.is_ready,
is_self_node,
};
Expand Down Expand Up @@ -113,7 +113,7 @@ impl ClusterNode {
&self.inner.indexing_tasks
}

pub fn indexing_capacity(&self) -> u32 {
pub fn indexing_capacity(&self) -> CpuCapacity {
self.inner.indexing_capacity
}

Expand Down Expand Up @@ -154,7 +154,7 @@ struct InnerNode {
enabled_services: HashSet<QuickwitService>,
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
indexing_capacity: u32,
indexing_capacity: CpuCapacity,
is_ready: bool,
is_self_node: bool,
}
66 changes: 53 additions & 13 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use anyhow::{bail, ensure};
use byte_unit::Byte;
use quickwit_common::net::HostAddr;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
use serde::{Deserialize, Serialize};
use tracing::warn;

Expand All @@ -55,8 +56,8 @@ pub struct IndexerConfig {
pub enable_otlp_endpoint: bool,
#[serde(default = "IndexerConfig::default_enable_cooperative_indexing")]
pub enable_cooperative_indexing: bool,
#[serde(default = "IndexerConfig::default_capacity")]
pub capacity: u32,
#[serde(default = "IndexerConfig::default_cpu_capacity")]
pub cpu_capacity: CpuCapacity,
}

impl IndexerConfig {
Expand All @@ -82,8 +83,8 @@ impl IndexerConfig {
/// Default capacity expressed in milli-"number of pipelines".
/// 4_000 means 4 pipeline at full capacity.
// TODO add some validation.
fn default_capacity() -> u32 {
num_cpus::get() as u32 * 1000u32 / 4u32
fn default_cpu_capacity() -> CpuCapacity {
CpuCapacity::one_cpu_thread() * (num_cpus::get() as u32)
}

pub fn default_split_store_max_num_bytes() -> Byte {
Expand All @@ -96,13 +97,14 @@ impl IndexerConfig {

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test() -> anyhow::Result<Self> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
let indexer_config = IndexerConfig {
enable_cooperative_indexing: false,
enable_otlp_endpoint: true,
split_store_max_num_bytes: Byte::from_bytes(1_000_000),
split_store_max_num_splits: 3,
max_concurrent_split_uploads: 4,
capacity: 1_000,
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
};
Ok(indexer_config)
}
Expand All @@ -116,7 +118,7 @@ impl Default for IndexerConfig {
split_store_max_num_bytes: Self::default_split_store_max_num_bytes(),
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
capacity: Self::default_capacity(),
cpu_capacity: Self::default_cpu_capacity(),
}
}
}
Expand Down Expand Up @@ -382,15 +384,53 @@ impl NodeConfig {

#[cfg(test)]
mod tests {
use quickwit_proto::indexing::CpuCapacity;

use crate::IndexerConfig;

#[test]
fn test_default_indexing_capacity() {
let default_indexer_config = IndexerConfig::default();
let capacity = default_indexer_config.capacity;
assert_eq!(capacity % 250, 0);
let quarter_of_pipeline = capacity / 250;
assert!(quarter_of_pipeline > 0);
assert!(quarter_of_pipeline < 64);
fn test_index_config_serialization() {
{
let indexer_config: IndexerConfig = serde_json::from_str(r#"{}"#).unwrap();
assert_eq!(&indexer_config, &IndexerConfig::default());
assert_eq!(
indexer_config.cpu_capacity,
CpuCapacity::from_cpu_millis(8000)
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"cpu_capacity: 1.5"#).unwrap();
assert_eq!(
indexer_config.cpu_capacity,
CpuCapacity::from_cpu_millis(1500)
);
let indexer_config_json = serde_json::to_value(&indexer_config).unwrap();
assert_eq!(
indexer_config_json
.get("cpu_capacity")
.unwrap()
.as_str()
.unwrap(),
"1500m"
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap();
assert_eq!(
indexer_config.cpu_capacity,
CpuCapacity::from_cpu_millis(1500)
);
let indexer_config_json = serde_json::to_value(&indexer_config).unwrap();
assert_eq!(
indexer_config_json
.get("cpu_capacity")
.unwrap()
.as_str()
.unwrap(),
"1500m"
);
}
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ mod tests {
split_store_max_num_bytes: Byte::from_str("1T").unwrap(),
split_store_max_num_splits: 10_000,
max_concurrent_split_uploads: 8,
capacity: IndexerConfig::default_capacity(),
cpu_capacity: IndexerConfig::default_cpu_capacity(),
enable_cooperative_indexing: false,
}
);
Expand Down
Loading

0 comments on commit 2ee9fca

Please sign in to comment.