Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposing the node capacity via chitchat #4083

Merged
merged 3 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ base64 = "0.21"
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = "0.6"
chitchat = "0.7"
chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] }
clap = { version = "4.4.1", features = ["env", "string"] }
colored = "2.0.0"
Expand Down
19 changes: 11 additions & 8 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use quickwit_indexing::models::{
use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::NodeId;
Expand Down Expand Up @@ -931,15 +932,16 @@ impl ThroughputCalculator {

async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
let node_id: NodeId = config.node_id.clone().into();
let self_node = ClusterMember::new(
let self_node = ClusterMember {
node_id,
quickwit_cluster::GenerationId::now(),
false,
HashSet::new(),
config.gossip_advertise_addr,
config.grpc_advertise_addr,
Vec::new(),
);
generation_id: quickwit_cluster::GenerationId::now(),
is_ready: false,
enabled_services: HashSet::new(),
gossip_advertise_addr: config.gossip_advertise_addr,
grpc_advertise_addr: config.grpc_advertise_addr,
indexing_cpu_capacity: CpuCapacity::zero(),
indexing_tasks: Vec::new(),
};
let cluster = Cluster::join(
config.cluster_id.clone(),
self_node,
Expand All @@ -949,5 +951,6 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
&ChannelTransport::default(),
)
.await?;

Ok(cluster)
}
20 changes: 11 additions & 9 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -229,7 +229,7 @@ impl Cluster {
}

/// Sets a key-value pair on the cluster node's state.
pub async fn set_self_key_value<K: Into<String>, V: Into<String>>(&self, key: K, value: V) {
pub async fn set_self_key_value(&self, key: impl Display, value: impl Display) {
self.chitchat()
.await
.lock()
Expand Down Expand Up @@ -517,17 +517,19 @@ pub async fn create_cluster_for_test_with_id(
transport: &dyn Transport,
self_node_readiness: bool,
) -> anyhow::Result<Cluster> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], node_id).into();
let node_id: NodeId = format!("node_{node_id}").into();
let self_node = ClusterMember::new(
let self_node = ClusterMember {
node_id,
crate::GenerationId(1),
self_node_readiness,
enabled_services.clone(),
generation_id: crate::GenerationId(1),
is_ready: self_node_readiness,
enabled_services: enabled_services.clone(),
gossip_advertise_addr,
grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
Vec::new(),
);
grpc_advertise_addr: grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
indexing_tasks: Vec::new(),
indexing_cpu_capacity: PIPELINE_FULL_CAPACITY,
};
let failure_detector_config = create_failure_detector_config_for_test();
let cluster = Cluster::join(
cluster_id,
Expand Down
27 changes: 21 additions & 6 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ use chitchat::transport::UdpTransport;
use chitchat::FailureDetectorConfig;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;

pub use crate::change::ClusterChange;
#[cfg(any(test, feature = "testsuite"))]
pub use crate::cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test};
pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
pub use crate::member::ClusterMember;
pub use crate::member::{ClusterMember, INDEXING_CPU_CAPACITY_KEY};
pub use crate::node::ClusterNode;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -68,15 +69,21 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
let node_id: NodeId = node_config.node_id.clone().into();
let generation_id = GenerationId::now();
let is_ready = false;
let self_node = ClusterMember::new(
let indexing_cpu_capacity = if node_config.is_service_enabled(QuickwitService::Indexer) {
node_config.indexer_config.cpu_capacity
} else {
CpuCapacity::zero()
};
let self_node = ClusterMember {
node_id,
generation_id,
is_ready,
node_config.enabled_services.clone(),
node_config.gossip_advertise_addr,
node_config.grpc_advertise_addr,
enabled_services: node_config.enabled_services.clone(),
gossip_advertise_addr: node_config.gossip_advertise_addr,
grpc_advertise_addr: node_config.grpc_advertise_addr,
indexing_tasks,
);
indexing_cpu_capacity,
};
let cluster = Cluster::join(
cluster_id,
self_node,
Expand All @@ -86,5 +93,13 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
&UdpTransport,
)
.await?;
if node_config
.enabled_services
.contains(&QuickwitService::Indexer)
{
cluster
.set_self_key_value(INDEXING_CPU_CAPACITY_KEY, indexing_cpu_capacity)
.await;
}
Ok(cluster)
}
53 changes: 26 additions & 27 deletions quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

use std::collections::HashSet;
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::{anyhow, Context};
use chitchat::{ChitchatId, NodeState};
use itertools::Itertools;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::warn;
use tracing::{error, warn};

use crate::{GenerationId, QuickwitService};

Expand All @@ -43,6 +44,8 @@ pub(crate) const READINESS_KEY: &str = "readiness";
pub(crate) const READINESS_VALUE_READY: &str = "READY";
pub(crate) const READINESS_VALUE_NOT_READY: &str = "NOT_READY";

pub const INDEXING_CPU_CAPACITY_KEY: &str = "indexing_cpu_capacity";

pub(crate) trait NodeStateExt {
fn grpc_advertise_addr(&self) -> anyhow::Result<SocketAddr>;

Expand Down Expand Up @@ -91,30 +94,12 @@ pub struct ClusterMember {
/// None if the node is not an indexer or the indexer has not yet started some indexing
/// pipelines.
pub indexing_tasks: Vec<IndexingTask>,
/// Indexing cpu capacity of the node expressed in milli cpu.
pub indexing_cpu_capacity: CpuCapacity,
pub is_ready: bool,
}

impl ClusterMember {
pub fn new(
node_id: NodeId,
generation_id: GenerationId,
is_ready: bool,
enabled_services: HashSet<QuickwitService>,
gossip_advertise_addr: SocketAddr,
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
) -> Self {
Self {
node_id,
generation_id,
is_ready,
enabled_services,
gossip_advertise_addr,
grpc_advertise_addr,
indexing_tasks,
}
}

pub fn chitchat_id(&self) -> ChitchatId {
ChitchatId::new(
self.node_id.clone().into(),
Expand All @@ -130,6 +115,18 @@ impl From<ClusterMember> for ChitchatId {
}
}

fn parse_indexing_cpu_capacity(node_state: &NodeState) -> CpuCapacity {
let Some(indexing_capacity_str) = node_state.get(INDEXING_CPU_CAPACITY_KEY) else {
return CpuCapacity::zero();
};
if let Ok(indexing_capacity) = CpuCapacity::from_str(indexing_capacity_str) {
indexing_capacity
} else {
error!(indexing_capacity=?indexing_capacity_str, "Received an unparseable indexing capacity from node.");
CpuCapacity::zero()
}
}

// Builds a cluster member from a [`NodeState`].
pub(crate) fn build_cluster_member(
chitchat_id: ChitchatId,
Expand All @@ -150,15 +147,17 @@ pub(crate) fn build_cluster_member(
})?;
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
let indexing_tasks = parse_indexing_tasks(node_state, &chitchat_id.node_id);
let member = ClusterMember::new(
chitchat_id.node_id.into(),
chitchat_id.generation_id.into(),
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
let member = ClusterMember {
node_id: chitchat_id.node_id.into(),
generation_id: chitchat_id.generation_id.into(),
is_ready,
enabled_services,
chitchat_id.gossip_advertise_addr,
gossip_advertise_addr: chitchat_id.gossip_advertise_addr,
grpc_advertise_addr,
indexing_tasks,
);
indexing_cpu_capacity,
};
Ok(member)
}

Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use chitchat::{ChitchatId, NodeState};
use quickwit_config::service::QuickwitService;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use tonic::transport::Channel;

use crate::member::build_cluster_member;
Expand All @@ -49,6 +49,7 @@ impl ClusterNode {
enabled_services: member.enabled_services,
grpc_advertise_addr: member.grpc_advertise_addr,
indexing_tasks: member.indexing_tasks,
indexing_capacity: member.indexing_cpu_capacity,
is_ready: member.is_ready,
is_self_node,
};
Expand Down Expand Up @@ -112,6 +113,10 @@ impl ClusterNode {
&self.inner.indexing_tasks
}

pub fn indexing_capacity(&self) -> CpuCapacity {
self.inner.indexing_capacity
}

pub fn is_ready(&self) -> bool {
self.inner.is_ready
}
Expand Down Expand Up @@ -149,6 +154,7 @@ struct InnerNode {
enabled_services: HashSet<QuickwitService>,
grpc_advertise_addr: SocketAddr,
indexing_tasks: Vec<IndexingTask>,
indexing_capacity: CpuCapacity,
is_ready: bool,
is_self_node: bool,
}
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ humantime = { workspace = true }
itertools = { workspace = true }
json_comments = { workspace = true }
new_string_template = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ mod tests {
}

#[test]
fn test_index_config_default_values() {
fn test_indexer_config_default_values() {
let default_index_root_uri = Uri::for_test("s3://defaultbucket/");
{
let index_config_filepath = get_index_config_filepath("minimal-hdfs-logs.yaml");
Expand Down
Loading