From 17e911c5710035c1899c08ed785299fd0e63332a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 18 Nov 2023 14:40:30 +0900 Subject: [PATCH] Plugging the shard cleanup to the control plane. Closes #4056 --- .../src/control_plane.rs | 67 +++++++++++++++++-- .../src/control_plane_model.rs | 1 - quickwit/quickwit-indexing/src/lib.rs | 4 +- .../src/models/shard_positions.rs | 7 +- .../src/source/ingest/mod.rs | 2 +- .../file_backed_index/shards.rs | 24 +++---- quickwit/quickwit-serve/src/lib.rs | 38 +++++++++-- 7 files changed, 114 insertions(+), 29 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index f04c9548c85..ac3e9c69828 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -21,6 +21,7 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; +use fnv::FnvHashSet; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe, }; @@ -34,10 +35,11 @@ use quickwit_proto::control_plane::{ use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::metastore::{ serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest, - CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse, MetastoreError, - MetastoreService, MetastoreServiceClient, ToggleSourceRequest, + CreateIndexResponse, DeleteIndexRequest, DeleteShardsRequest, DeleteShardsSubrequest, + DeleteSourceRequest, EmptyResponse, MetastoreError, MetastoreService, MetastoreServiceClient, + ToggleSourceRequest, }; -use quickwit_proto::types::{IndexUid, NodeId}; +use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid}; use serde::Serialize; use tracing::error; @@ -136,15 +138,67 @@ impl Actor for ControlPlane { } } +impl ControlPlane { + async fn delete_shards( + &mut self, + source_uid: &SourceUid, + shards: &[ShardId], + ) -> anyhow::Result<()> { + let delete_shard_sub_request = DeleteShardsSubrequest { + index_uid: source_uid.index_uid.to_string(), + source_id: source_uid.source_id.to_string(), + shard_ids: shards.to_vec(), + }; + let delete_shard_request = DeleteShardsRequest { + subrequests: vec![delete_shard_sub_request], + force: false, + }; + // We use a tiny bit different strategy here than for other handlers + // All metastore errors end up fail/respawn the control plane. + // + // This is because deleting shards is done in reaction to an event + // and we do not really have the freedom to return an error to a caller like for other + // calls: there is no caller. + self.metastore + .delete_shards(delete_shard_request) + .await + .context("failed to delete shards in metastore")?; + self.model + .delete_shards(&source_uid.index_uid, &source_uid.source_id, shards); + self.indexing_scheduler + .schedule_indexing_plan_if_needed(&self.model); + Ok(()) + } +} + #[async_trait] impl Handler for ControlPlane { type Reply = (); async fn handle( &mut self, - shard_positions: ShardPositionsUpdate, - ctx: &ActorContext, + shard_positions_update: ShardPositionsUpdate, + _ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + let known_shard_ids: FnvHashSet = self + .model + .list_shards(&shard_positions_update.source_uid) + .into_iter() + .collect(); + // let's identify the shard that have reached EOF but have not yet been removed. + let shard_ids_to_close: Vec = shard_positions_update + .shard_positions + .into_iter() + .filter(|(shard_id, position)| { + (position == &Position::Eof) && known_shard_ids.contains(shard_id) + }) + .map(|(shard_id, _position)| shard_id) + .collect(); + if shard_ids_to_close.is_empty() { + return Ok(()); + } + self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close[..]) + .await?; Ok(()) } } @@ -246,6 +300,9 @@ impl Handler for ControlPlane { self.model.add_index(index_metadata); + self.indexing_scheduler + .schedule_indexing_plan_if_needed(&self.model); + let response = CreateIndexResponse { index_uid: index_uid.into(), }; diff --git a/quickwit/quickwit-control-plane/src/control_plane_model.rs b/quickwit/quickwit-control-plane/src/control_plane_model.rs index d96fdcb76ae..bef904f1cb7 100644 --- a/quickwit/quickwit-control-plane/src/control_plane_model.rs +++ b/quickwit/quickwit-control-plane/src/control_plane_model.rs @@ -276,7 +276,6 @@ impl ControlPlaneModel { } /// Removes the shards identified by their index UID, source ID, and shard IDs. - #[allow(dead_code)] // Will remove this in a future PR. pub fn delete_shards( &mut self, index_uid: &IndexUid, diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index f4ccb5c60ff..47ef1d0e188 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -34,7 +34,7 @@ pub use crate::actors::{ Sequencer, SplitsUpdateMailbox, }; pub use crate::controlled_directory::ControlledDirectory; -use crate::models::{IndexingStatistics, ShardPositionsService}; +use crate::models::IndexingStatistics; pub use crate::split_store::{get_tantivy_directory_from_split_bundle, IndexingSplitStore}; pub mod actors; @@ -77,8 +77,6 @@ pub async fn start_indexing_service( ) -> anyhow::Result> { info!("starting indexer service"); - ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone()); - // Spawn indexing service. let indexing_service = IndexingService::new( config.node_id.clone(), diff --git a/quickwit/quickwit-indexing/src/models/shard_positions.rs b/quickwit/quickwit-indexing/src/models/shard_positions.rs index 93c2095221c..3a481e7699f 100644 --- a/quickwit/quickwit-indexing/src/models/shard_positions.rs +++ b/quickwit/quickwit-indexing/src/models/shard_positions.rs @@ -127,7 +127,7 @@ impl Actor for ShardPositionsService { return; } }; - if let Err(_) = mailbox.try_send_message(shard_positions) { + if mailbox.try_send_message(shard_positions).is_err() { error!("failed to send shard positions to the shard positions service"); } }) @@ -144,7 +144,10 @@ impl ShardPositionsService { spawn_ctx.spawn_builder().spawn(shard_positions_service); event_broker .subscribe_fn::(move |update| { - if let Err(_) = shard_positions_service_mailbox.try_send_message(update) { + if shard_positions_service_mailbox + .try_send_message(update) + .is_err() + { error!("failed to send update to shard positions service"); } }) diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 090c2d0e4ad..7d10fa128da 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -794,7 +794,7 @@ mod tests { let expected_local_update = LocalShardPositionsUpdate::new( SourceUid { index_uid: IndexUid::parse("test-index:0").unwrap(), - source_id: "test_source".to_string(), + source_id: "test-source".to_string(), }, vec![(1, Position::Eof)], ); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs index 175e0db1ec0..d2786b67008 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs @@ -212,23 +212,21 @@ impl Shards { force: bool, ) -> MetastoreResult> { let mut mutation_occurred = false; - for shard_id in subrequest.shard_ids { if let Entry::Occupied(entry) = self.shards.entry(shard_id) { let shard = entry.get(); - if force || shard.publish_position_inclusive() == Position::Eof { - mutation_occurred = true; - info!( - index_id=%self.index_uid.index_id(), - source_id=%self.source_id, - shard_id=%shard.shard_id, - "deleted shard", - ); - entry.remove(); - continue; + if !force && shard.publish_position_inclusive() != Position::Eof { + let message = format!("shard `{shard_id}` is not deletable"); + return Err(MetastoreError::InvalidArgument { message }); } - let message = format!("shard `{shard_id}` is not deletable"); - return Err(MetastoreError::InvalidArgument { message }); + info!( + index_id=%self.index_uid.index_id(), + source_id=%self.source_id, + shard_id=%shard.shard_id, + "deleted shard", + ); + entry.remove(); + mutation_occurred = true; } } Ok(MutationOccurred::from(mutation_occurred)) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index bf4c22591e9..9325eb63423 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -66,6 +66,7 @@ use quickwit_control_plane::control_plane::ControlPlane; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; use quickwit_indexing::actors::IndexingService; +use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ start_ingest_api_service, wait_for_ingester_decommission, GetMemoryCapacity, IngestApiService, @@ -77,7 +78,7 @@ use quickwit_metastore::{ }; use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService}; use quickwit_proto::control_plane::ControlPlaneServiceClient; -use quickwit_proto::indexing::IndexingServiceClient; +use quickwit_proto::indexing::{IndexingServiceClient, ShardPositionsUpdate}; use quickwit_proto::ingest::ingester::IngesterServiceClient; use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_proto::metastore::{ @@ -210,6 +211,7 @@ async fn start_control_plane_if_needed( universe: &Universe, indexer_pool: &IndexerPool, ingester_pool: &IngesterPool, + event_broker: &EventBroker, ) -> anyhow::Result { if node_config.is_service_enabled(QuickwitService::ControlPlane) { check_cluster_configuration( @@ -220,7 +222,7 @@ async fn start_control_plane_if_needed( .await?; let cluster_id = cluster.cluster_id().to_string(); - let self_node_id = cluster.self_node_id().to_string(); + let self_node_id: NodeId = cluster.self_node_id().to_string().into(); let replication_factor = node_config .ingest_api_config @@ -235,6 +237,7 @@ async fn start_control_plane_if_needed( ingester_pool.clone(), metastore_client.clone(), replication_factor, + event_broker, ) .await?; Ok(ControlPlaneServiceClient::from_mailbox( @@ -318,9 +321,21 @@ pub async fn serve_quickwit( &universe, &indexer_pool, &ingester_pool, + &event_broker, ) .await?; + // If one of the two following service is enabled, we need to enable the shard position service: + // - the control plane: as it is in charge of cleaning up shard reach eof. + // - the indexer: as it hosts ingesters, and ingesters use the shard positions to truncate + // their the queue associated to shards in mrecordlog, and publish indexers' progress to + // chitchat. + if node_config.is_service_enabled(QuickwitService::Indexer) + || node_config.is_service_enabled(QuickwitService::ControlPlane) + { + ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone()); + } + // Set up the "control plane proxy" for the metastore. let metastore_through_control_plane = MetastoreServiceClient::new(ControlPlaneMetastore::new( control_plane_service.clone(), @@ -676,16 +691,17 @@ async fn setup_searcher( Ok((search_job_placer, search_service)) } +#[allow(clippy::too_many_arguments)] async fn setup_control_plane( universe: &Universe, cluster_id: String, - self_node_id: String, + self_node_id: NodeId, indexer_pool: IndexerPool, ingester_pool: IngesterPool, metastore: MetastoreServiceClient, replication_factor: usize, + event_broker: &EventBroker, ) -> anyhow::Result> { - let self_node_id: NodeId = self_node_id.into(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( universe, cluster_id, @@ -695,6 +711,20 @@ async fn setup_control_plane( metastore, replication_factor, ); + let weak_control_plane_mailbox = control_plane_mailbox.downgrade(); + event_broker + .subscribe_fn::(move |shard_positions_update| { + let Some(control_plane_mailbox) = weak_control_plane_mailbox.upgrade() else { + return; + }; + if control_plane_mailbox + .try_send_message(shard_positions_update) + .is_err() + { + error!("failed to send shard positions update to control plane"); + } + }) + .forever(); Ok(control_plane_mailbox) }