Issue/4469 limit concurrent merges (#4473)
* Introducing the merge scheduler to schedule merge operations in a global
manner.

Closes #4469 #4471
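
The MergeSchedulerService actor lives in quickwit_indexing::actors; its implementation is among the changed files not rendered on this page. Purely as a sketch of the general technique the commit message describes (funneling every merge operation through one global pool of permits), a scheduler could be built on a counted semaphore. All names and signatures below are illustrative, not Quickwit's actual API:

use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Illustrative global merge scheduler: at most `merge_concurrency` merge
// (or delete) operations can hold a permit, hence run, at any instant.
pub struct MergeScheduler {
    permits: Arc<Semaphore>,
}

impl MergeScheduler {
    pub fn new(merge_concurrency: usize) -> Self {
        MergeScheduler {
            permits: Arc::new(Semaphore::new(merge_concurrency)),
        }
    }

    // Waits until a merge slot frees up. Dropping the returned permit
    // releases the slot to the next waiting merge.
    pub async fn acquire_merge_permit(&self) -> OwnedSemaphorePermit {
        self.permits
            .clone()
            .acquire_owned()
            .await
            .expect("the merge semaphore is never closed")
    }
}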
fulmicoton authored Jan 30, 2024
1 parent a3cad8f commit 179ca81
Showing 39 changed files with 873 additions and 565 deletions.
17 changes: 12 additions & 5 deletions quickwit/quickwit-actors/src/mailbox.rs
@@ -340,13 +340,20 @@ impl<A: Actor> Inbox<A> {
self.rx.try_recv()
}

-    pub async fn recv_typed_message<M: 'static>(&self) -> Option<M> {
-        while let Ok(mut envelope) = self.rx.recv().await {
-            if let Some(msg) = envelope.message_typed() {
-                return Some(msg);
-            }
-        }
-        None
-    }
+    #[cfg(any(test, feature = "testsuite"))]
+    pub async fn recv_typed_message<M: 'static>(&self) -> Result<M, RecvError> {
+        loop {
+            match self.rx.recv().await {
+                Ok(mut envelope) => {
+                    if let Some(msg) = envelope.message_typed() {
+                        return Ok(msg);
+                    }
+                }
+                Err(err) => {
+                    return Err(err);
+                }
+            }
+        }
+    }

/// Destroys the inbox and returns the list of pending messages or commands
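With the cfg gate, this method is now compiled only for tests and the testsuite feature, and callers receive a Result that separates a received message from a channel failure (RecvError), which the old Option conflated with exhaustion. A hypothetical call site, where `inbox` is any test Inbox<A> and `MyMsg` is an illustrative message type:

// Illustrative only: MyMsg and the surrounding test setup are assumed.
let my_msg = inbox
    .recv_typed_message::<MyMsg>()
    .await
    .expect("actor channel closed (RecvError)");
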
47 changes: 47 additions & 0 deletions quickwit/quickwit-actors/src/tests.rs
@@ -23,6 +23,7 @@ use std::ops::Mul;
use std::time::Duration;

use async_trait::async_trait;
use quickwit_common::new_coolid;
use serde::Serialize;

use crate::observation::ObservationType;
@@ -725,3 +726,49 @@ async fn test_unsync_actor_message() {

universe.assert_quit().await;
}

struct FakeActorService {
    // We use a cool id so the test can check that we get the same instance twice.
cool_id: String,
}

#[derive(Debug)]
struct GetCoolId;

impl Actor for FakeActorService {
type ObservableState = ();

fn observable_state(&self) {}
}

#[async_trait]
impl Handler<GetCoolId> for FakeActorService {
type Reply = String;

async fn handle(
&mut self,
_: GetCoolId,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(self.cool_id.clone())
}
}

impl Default for FakeActorService {
fn default() -> Self {
FakeActorService {
cool_id: new_coolid("fake-actor"),
}
}
}

#[tokio::test]
async fn test_get_or_spawn() {
let universe = Universe::new();
let mailbox1: Mailbox<FakeActorService> = universe.get_or_spawn_one();
let id1 = mailbox1.ask(GetCoolId).await.unwrap();
let mailbox2: Mailbox<FakeActorService> = universe.get_or_spawn_one();
let id2 = mailbox2.ask(GetCoolId).await.unwrap();
assert_eq!(id1, id2);
universe.assert_quit().await;
}
10 changes: 10 additions & 0 deletions quickwit/quickwit-actors/src/universe.rs
@@ -89,6 +89,16 @@ impl Universe {
self.spawn_ctx.registry.get_one::<A>()
}

pub fn get_or_spawn_one<A: Actor + Default>(&self) -> Mailbox<A> {
if let Some(actor_mailbox) = self.spawn_ctx.registry.get_one::<A>() {
actor_mailbox
} else {
let actor_default = A::default();
let (mailbox, _handler) = self.spawn_builder().spawn(actor_default);
mailbox
}
}

pub async fn observe(&self, timeout: Duration) -> Vec<ActorObservation> {
self.spawn_ctx.registry.observe(timeout).await
}
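get_or_spawn_one gives Default-constructible singleton actors a lazy, shared entry point: the first call spawns the actor, and every later call returns a mailbox to the same instance, as test_get_or_spawn in quickwit-actors/src/tests.rs above verifies. A minimal sketch of the call pattern, reusing the MergeSchedulerService actor this commit threads through the codebase:

// Both mailboxes address the same actor instance: the first call spawns
// it with Default::default(), the second finds it in the registry.
let universe = Universe::new();
let first: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let second: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
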
16 changes: 10 additions & 6 deletions quickwit/quickwit-cli/src/tool.rs
@@ -29,7 +29,7 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
-use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
+use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe};
use quickwit_cluster::{ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::runtimes::RuntimesConfig;
@@ -40,7 +40,9 @@ use quickwit_config::{
VecSourceParams, CLI_INGEST_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
-use quickwit_indexing::actors::{IndexingService, MergePipeline, MergePipelineId};
+use quickwit_indexing::actors::{
+    IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService,
+};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
@@ -451,6 +453,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
+    let universe = Universe::new();
+    let merge_scheduler_service_mailbox = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id.clone(),
config.data_dir_path.clone(),
Expand All @@ -459,12 +463,12 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
+        merge_scheduler_service_mailbox,
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
)
.await?;
-    let universe = Universe::new();
let (indexing_server_mailbox, indexing_server_handle) =
universe.spawn_builder().spawn(indexing_server);
let pipeline_id = indexing_server_mailbox
@@ -580,10 +584,9 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
+    let indexer_config = IndexerConfig::default();
    let universe = Universe::new();
-    let indexer_config = IndexerConfig {
-        ..Default::default()
-    };
+    let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id,
config.data_dir_path,
@@ -592,6 +595,7 @@
cluster,
metastore,
None,
+        merge_scheduler_service,
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
@@ -48,7 +48,8 @@
"split_store_max_num_bytes": "1T",
"split_store_max_num_splits": 10000,
"max_concurrent_split_uploads": 8,
"max_merge_write_throughput": "100mb"
"max_merge_write_throughput": "100mb",
"merge_concurrency": 2
},
"ingest_api": {
"replication_factor": 2
@@ -39,6 +39,7 @@ split_store_max_num_bytes = "1T"
split_store_max_num_splits = 10_000
max_concurrent_split_uploads = 8
max_merge_write_throughput = "100mb"
merge_concurrency = 2

[ingest_api]
replication_factor = 2
@@ -43,6 +43,7 @@ indexer:
split_store_max_num_splits: 10000
max_concurrent_split_uploads: 8
max_merge_write_throughput: 100mb
merge_concurrency: 2

ingest_api:
replication_factor: 2
27 changes: 27 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
@@ -96,6 +96,10 @@ pub struct IndexerConfig {
/// does not starve indexing itself (as it is a latency sensitive operation).
#[serde(default)]
pub max_merge_write_throughput: Option<ByteSize>,
/// Maximum number of merge or delete operations that can be executed concurrently
/// (defaults to num_cpus / 2).
#[serde(default = "IndexerConfig::default_merge_concurrency")]
pub merge_concurrency: NonZeroUsize,
/// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry
/// Protocol (OTLP).
#[serde(default = "IndexerConfig::default_enable_otlp_endpoint")]
@@ -134,6 +138,10 @@ impl IndexerConfig {
1_000
}

pub fn default_merge_concurrency() -> NonZeroUsize {
NonZeroUsize::new(num_cpus::get() / 2).unwrap_or(NonZeroUsize::new(1).unwrap())
}

fn default_cpu_capacity() -> CpuCapacity {
CpuCapacity::one_cpu_thread() * (num_cpus::get() as u32)
}
@@ -149,6 +157,7 @@ impl IndexerConfig {
max_concurrent_split_uploads: 4,
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
max_merge_write_throughput: None,
merge_concurrency: NonZeroUsize::new(3).unwrap(),
};
Ok(indexer_config)
}
@@ -163,6 +172,7 @@ impl Default for IndexerConfig {
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
cpu_capacity: Self::default_cpu_capacity(),
merge_concurrency: Self::default_merge_concurrency(),
max_merge_write_throughput: None,
}
}
@@ -476,6 +486,23 @@ mod tests {
"1500m"
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"merge_concurrency: 5"#).unwrap();
assert_eq!(
indexer_config.merge_concurrency,
NonZeroUsize::new(5).unwrap()
);
let indexer_config_json = serde_json::to_value(&indexer_config).unwrap();
assert_eq!(
indexer_config_json
.get("merge_concurrency")
.unwrap()
.as_u64()
.unwrap(),
5
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap();
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
@@ -455,7 +455,7 @@ pub fn node_config_for_test() -> NodeConfig {
mod tests {
use std::env;
use std::net::Ipv4Addr;
-use std::num::NonZeroU64;
+use std::num::{NonZeroU64, NonZeroUsize};
use std::path::Path;

use bytesize::ByteSize;
@@ -554,6 +554,7 @@ mod tests {
split_store_max_num_bytes: ByteSize::tb(1),
split_store_max_num_splits: 10_000,
max_concurrent_split_uploads: 8,
merge_concurrency: NonZeroUsize::new(2).unwrap(),
cpu_capacity: IndexerConfig::default_cpu_capacity(),
enable_cooperative_indexing: false,
max_merge_write_throughput: Some(ByteSize::mb(100)),
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/index_serializer.rs
@@ -95,7 +95,7 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
checkpoint_delta_opt: batch_builder.checkpoint_delta_opt,
publish_lock: batch_builder.publish_lock,
publish_token_opt: batch_builder.publish_token_opt,
-            merge_operation_opt: None,
+            merge_task_opt: None,
batch_parent_span: batch_builder.batch_parent_span,
};
ctx.send_message(&self.packager_mailbox, indexed_split_batch)
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -898,6 +898,7 @@ mod tests {
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
event_broker: Default::default(),
};
let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx());
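The merge pipeline params gain a merge_scheduler_service mailbox, obtained through universe.get_or_spawn_one() so that every pipeline on the node shares a single scheduler. The pipeline/scheduler handshake itself is not visible in the rendered hunks; continuing the hypothetical MergeScheduler sketch under the commit message above, the pipeline side might reduce to:

// Hypothetical: a merge executes only while holding a permit from the
// shared scheduler, capping node-wide merge concurrency.
async fn run_gated_merge(scheduler: &MergeScheduler) {
    let _permit = scheduler.acquire_merge_permit().await;
    // ... perform the merge; the slot is released when `_permit` drops ...
}
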
12 changes: 11 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -57,7 +57,7 @@ use tokio::sync::Semaphore;
use tracing::{debug, error, info, warn};

use super::merge_pipeline::{MergePipeline, MergePipelineParams};
-use super::MergePlanner;
+use super::{MergePlanner, MergeSchedulerService};
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
@@ -121,6 +121,7 @@ pub struct IndexingService {
cluster: Cluster,
metastore: MetastoreServiceClient,
ingest_api_service_opt: Option<Mailbox<IngestApiService>>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
ingester_pool: IngesterPool,
storage_resolver: StorageResolver,
indexing_pipelines: HashMap<PipelineUid, PipelineHandle>,
@@ -154,6 +155,7 @@ impl IndexingService {
cluster: Cluster,
metastore: MetastoreServiceClient,
ingest_api_service_opt: Option<Mailbox<IngestApiService>>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
ingester_pool: IngesterPool,
storage_resolver: StorageResolver,
event_broker: EventBroker,
@@ -182,6 +184,7 @@
cluster,
metastore,
ingest_api_service_opt,
merge_scheduler_service,
ingester_pool,
storage_resolver,
local_split_store: Arc::new(local_split_store),
@@ -297,6 +300,7 @@ impl IndexingService {
indexing_directory: indexing_directory.clone(),
metastore: self.metastore.clone(),
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
@@ -893,6 +897,7 @@ mod tests {
init_ingest_api(universe, &queues_dir_path, &IngestApiConfig::default())
.await
.unwrap();
let merge_scheduler_mailbox: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
"test-node".to_string(),
data_dir_path.to_path_buf(),
@@ -901,6 +906,7 @@ mod tests {
cluster,
metastore,
Some(ingest_api_service),
merge_scheduler_mailbox,
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
@@ -1345,6 +1351,7 @@ mod tests {
init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default())
.await
.unwrap();
let merge_scheduler_service = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
"test-node".to_string(),
data_dir_path,
@@ -1353,6 +1360,7 @@ mod tests {
cluster.clone(),
metastore.clone(),
Some(ingest_api_service),
merge_scheduler_service,
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
@@ -1548,6 +1556,7 @@ mod tests {
let indexer_config = IndexerConfig::for_test().unwrap();
let num_blocking_threads = 1;
let storage_resolver = StorageResolver::unconfigured();
let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let mut indexing_server = IndexingService::new(
"test-ingest-api-gc-node".to_string(),
data_dir_path,
@@ -1556,6 +1565,7 @@ mod tests {
cluster.clone(),
metastore.clone(),
Some(ingest_api_service.clone()),
merge_scheduler_service,
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),