Plugging the shard cleanup into the control plane.
Closes #4056
fulmicoton committed Nov 21, 2023
1 parent 8d6e731 commit 17e911c
Showing 7 changed files with 114 additions and 29 deletions.
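
At a glance, the commit wires up the following flow: ingesters publish shard position updates on the event broker, the control plane receives them as ShardPositionsUpdate events, and shards that have reached EOF are deleted from the metastore and from the control plane's model. A minimal sketch of that flow, using an mpsc channel as a stand-in for the event broker and the actor mailbox (the Position, ShardId, and ShardPositionsUpdate types below are simplified stand-ins, not the actual Quickwit definitions):

use std::sync::mpsc;

#[derive(Debug, Clone, PartialEq)]
enum Position {
    Offset(u64),
    Eof,
}

type ShardId = u64;

struct ShardPositionsUpdate {
    shard_positions: Vec<(ShardId, Position)>,
}

fn main() {
    // The event-broker subscription forwards updates to the control plane's mailbox.
    let (mailbox, inbox) = mpsc::channel::<ShardPositionsUpdate>();

    // An ingester reports that shard 1 reached EOF while shard 2 is mid-stream.
    mailbox
        .send(ShardPositionsUpdate {
            shard_positions: vec![(1, Position::Eof), (2, Position::Offset(42))],
        })
        .unwrap();

    // The control plane keeps only the shards that reached EOF and would then
    // delete them from the metastore and from its model.
    let update = inbox.recv().unwrap();
    let shard_ids_to_close: Vec<ShardId> = update
        .shard_positions
        .into_iter()
        .filter(|(_shard_id, position)| *position == Position::Eof)
        .map(|(shard_id, _position)| shard_id)
        .collect();
    assert_eq!(shard_ids_to_close, vec![1]);
}
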
67 changes: 62 additions & 5 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -21,6 +21,7 @@ use std::time::Duration;

 use anyhow::Context;
 use async_trait::async_trait;
+use fnv::FnvHashSet;
 use quickwit_actors::{
     Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe,
 };
@@ -34,10 +35,11 @@ use quickwit_proto::control_plane::{
 use quickwit_proto::indexing::ShardPositionsUpdate;
 use quickwit_proto::metastore::{
     serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest,
-    CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse, MetastoreError,
-    MetastoreService, MetastoreServiceClient, ToggleSourceRequest,
+    CreateIndexResponse, DeleteIndexRequest, DeleteShardsRequest, DeleteShardsSubrequest,
+    DeleteSourceRequest, EmptyResponse, MetastoreError, MetastoreService, MetastoreServiceClient,
+    ToggleSourceRequest,
 };
-use quickwit_proto::types::{IndexUid, NodeId};
+use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid};
 use serde::Serialize;
 use tracing::error;

@@ -136,15 +138,67 @@ impl Actor for ControlPlane {
     }
 }

+impl ControlPlane {
+    async fn delete_shards(
+        &mut self,
+        source_uid: &SourceUid,
+        shards: &[ShardId],
+    ) -> anyhow::Result<()> {
+        let delete_shard_sub_request = DeleteShardsSubrequest {
+            index_uid: source_uid.index_uid.to_string(),
+            source_id: source_uid.source_id.to_string(),
+            shard_ids: shards.to_vec(),
+        };
+        let delete_shard_request = DeleteShardsRequest {
+            subrequests: vec![delete_shard_sub_request],
+            force: false,
+        };
+        // We use a slightly different strategy here than in the other handlers:
+        // all metastore errors end up failing/respawning the control plane.
+        //
+        // This is because deleting shards is done in reaction to an event, and
+        // we do not really have the freedom to return an error to a caller as
+        // in other calls: there is no caller.
+        self.metastore
+            .delete_shards(delete_shard_request)
+            .await
+            .context("failed to delete shards in metastore")?;
+        self.model
+            .delete_shards(&source_uid.index_uid, &source_uid.source_id, shards);
+        self.indexing_scheduler
+            .schedule_indexing_plan_if_needed(&self.model);
+        Ok(())
+    }
+}
+
 #[async_trait]
 impl Handler<ShardPositionsUpdate> for ControlPlane {
     type Reply = ();

     async fn handle(
         &mut self,
-        shard_positions: ShardPositionsUpdate,
-        ctx: &ActorContext<Self>,
+        shard_positions_update: ShardPositionsUpdate,
+        _ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
+        let known_shard_ids: FnvHashSet<ShardId> = self
+            .model
+            .list_shards(&shard_positions_update.source_uid)
+            .into_iter()
+            .collect();
+        // Let's identify the shards that have reached EOF but have not yet been removed.
+        let shard_ids_to_close: Vec<ShardId> = shard_positions_update
+            .shard_positions
+            .into_iter()
+            .filter(|(shard_id, position)| {
+                (position == &Position::Eof) && known_shard_ids.contains(shard_id)
+            })
+            .map(|(shard_id, _position)| shard_id)
+            .collect();
+        if shard_ids_to_close.is_empty() {
+            return Ok(());
+        }
+        self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close[..])
+            .await?;
         Ok(())
     }
 }
@@ -246,6 +300,9 @@ impl Handler<CreateIndexRequest> for ControlPlane {

         self.model.add_index(index_metadata);

+        self.indexing_scheduler
+            .schedule_indexing_plan_if_needed(&self.model);
+
         let response = CreateIndexResponse {
             index_uid: index_uid.into(),
         };
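
The delete_shards helper above propagates any metastore error with ?, which makes the handler exit the actor so the supervisor can respawn the control plane. A minimal sketch of that strategy, assuming the anyhow crate and using hypothetical stand-ins for the actor types (the real quickwit_actors API differs):

use anyhow::Context;

// Stand-in for quickwit_actors::ActorExitStatus: any error exits the actor.
#[derive(Debug)]
struct ActorExitStatus(anyhow::Error);

impl From<anyhow::Error> for ActorExitStatus {
    fn from(error: anyhow::Error) -> Self {
        ActorExitStatus(error)
    }
}

struct ControlPlane;

impl ControlPlane {
    // Pretend the metastore call failed.
    fn delete_shards(&mut self, _shard_ids: &[u64]) -> anyhow::Result<()> {
        Err(anyhow::anyhow!("metastore unavailable"))
            .context("failed to delete shards in metastore")
    }

    // The update came from an event, so there is no caller to report the error
    // to: propagating it exits the actor, and its supervisor respawns it.
    fn handle_update(&mut self, shard_ids_to_close: Vec<u64>) -> Result<(), ActorExitStatus> {
        if shard_ids_to_close.is_empty() {
            return Ok(());
        }
        self.delete_shards(&shard_ids_to_close)?;
        Ok(())
    }
}

fn main() {
    let mut control_plane = ControlPlane;
    // The metastore error surfaces as an actor exit status.
    assert!(control_plane.handle_update(vec![1]).is_err());
}
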
1 change: 0 additions & 1 deletion quickwit/quickwit-control-plane/src/control_plane_model.rs
@@ -276,7 +276,6 @@ impl ControlPlaneModel {
     }

     /// Removes the shards identified by their index UID, source ID, and shard IDs.
-    #[allow(dead_code)] // Will remove this in a future PR.
     pub fn delete_shards(
         &mut self,
         index_uid: &IndexUid,
4 changes: 1 addition & 3 deletions quickwit/quickwit-indexing/src/lib.rs
@@ -34,7 +34,7 @@ pub use crate::actors::{
     Sequencer, SplitsUpdateMailbox,
 };
 pub use crate::controlled_directory::ControlledDirectory;
-use crate::models::{IndexingStatistics, ShardPositionsService};
+use crate::models::IndexingStatistics;
 pub use crate::split_store::{get_tantivy_directory_from_split_bundle, IndexingSplitStore};

 pub mod actors;
@@ -77,8 +77,6 @@ pub async fn start_indexing_service(
 ) -> anyhow::Result<Mailbox<IndexingService>> {
     info!("starting indexer service");

-    ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone());
-
     // Spawn indexing service.
     let indexing_service = IndexingService::new(
         config.node_id.clone(),
7 changes: 5 additions & 2 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
@@ -127,7 +127,7 @@ impl Actor for ShardPositionsService {
                         return;
                     }
                 };
-                if let Err(_) = mailbox.try_send_message(shard_positions) {
+                if mailbox.try_send_message(shard_positions).is_err() {
                     error!("failed to send shard positions to the shard positions service");
                 }
             })
@@ -144,7 +144,10 @@ impl ShardPositionsService {
         spawn_ctx.spawn_builder().spawn(shard_positions_service);
         event_broker
             .subscribe_fn::<LocalShardPositionsUpdate>(move |update| {
-                if let Err(_) = shard_positions_service_mailbox.try_send_message(update) {
+                if shard_positions_service_mailbox
+                    .try_send_message(update)
+                    .is_err()
+                {
                     error!("failed to send update to shard positions service");
                 }
             })
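
Both changes in this file are the same mechanical cleanup: when the error value is discarded, clippy's redundant_pattern_matching lint prefers .is_err() over if let Err(_) = …. A trivial sketch of the two equivalent forms:

fn main() {
    let result: Result<(), &str> = Err("mailbox full");

    // Before: pattern-matches only to discard the error.
    if let Err(_) = result {
        eprintln!("failed to send message");
    }

    // After: same behavior, stated directly.
    if result.is_err() {
        eprintln!("failed to send message");
    }
}
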
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -794,7 +794,7 @@ mod tests {
         let expected_local_update = LocalShardPositionsUpdate::new(
             SourceUid {
                 index_uid: IndexUid::parse("test-index:0").unwrap(),
-                source_id: "test_source".to_string(),
+                source_id: "test-source".to_string(),
             },
             vec![(1, Position::Eof)],
         );
24 changes: 11 additions & 13 deletions quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs
@@ -212,23 +212,21 @@ impl Shards {
         force: bool,
     ) -> MetastoreResult<MutationOccurred<()>> {
         let mut mutation_occurred = false;
-
         for shard_id in subrequest.shard_ids {
             if let Entry::Occupied(entry) = self.shards.entry(shard_id) {
                 let shard = entry.get();
-                if force || shard.publish_position_inclusive() == Position::Eof {
-                    mutation_occurred = true;
-                    info!(
-                        index_id=%self.index_uid.index_id(),
-                        source_id=%self.source_id,
-                        shard_id=%shard.shard_id,
-                        "deleted shard",
-                    );
-                    entry.remove();
-                    continue;
+                if !force && shard.publish_position_inclusive() != Position::Eof {
+                    let message = format!("shard `{shard_id}` is not deletable");
+                    return Err(MetastoreError::InvalidArgument { message });
                 }
-                let message = format!("shard `{shard_id}` is not deletable");
-                return Err(MetastoreError::InvalidArgument { message });
+                info!(
+                    index_id=%self.index_uid.index_id(),
+                    source_id=%self.source_id,
+                    shard_id=%shard.shard_id,
+                    "deleted shard",
+                );
+                entry.remove();
+                mutation_occurred = true;
             }
         }
         Ok(MutationOccurred::from(mutation_occurred))
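
The rewrite inverts the condition into a guard clause: the non-deletable case returns early, so the happy path (log, remove, record the mutation) reads straight down instead of ending in a continue. A self-contained sketch of the same shape, with a std HashMap of shard positions standing in for the real shards map (simplified types, not the metastore's):

use std::collections::hash_map::{Entry, HashMap};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Position {
    Offset(u64),
    Eof,
}

fn delete_shards(
    shards: &mut HashMap<u64, Position>,
    shard_ids: Vec<u64>,
    force: bool,
) -> Result<bool, String> {
    let mut mutation_occurred = false;
    for shard_id in shard_ids {
        if let Entry::Occupied(entry) = shards.entry(shard_id) {
            // Guard clause: bail out on the non-deletable case first.
            if !force && *entry.get() != Position::Eof {
                return Err(format!("shard `{shard_id}` is not deletable"));
            }
            entry.remove();
            mutation_occurred = true;
        }
    }
    Ok(mutation_occurred)
}

fn main() {
    let mut shards = HashMap::from([(1, Position::Eof), (2, Position::Offset(10))]);
    // Shard 1 reached EOF: deletable without force.
    assert_eq!(delete_shards(&mut shards, vec![1], false), Ok(true));
    // Shard 2 is mid-stream: deletable only with force.
    assert!(delete_shards(&mut shards, vec![2], false).is_err());
    assert_eq!(delete_shards(&mut shards, vec![2], true), Ok(true));
}
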
38 changes: 34 additions & 4 deletions quickwit/quickwit-serve/src/lib.rs
@@ -66,6 +66,7 @@ use quickwit_control_plane::control_plane::ControlPlane;
 use quickwit_control_plane::{IndexerNodeInfo, IndexerPool};
 use quickwit_index_management::{IndexService as IndexManager, IndexServiceError};
 use quickwit_indexing::actors::IndexingService;
+use quickwit_indexing::models::ShardPositionsService;
 use quickwit_indexing::start_indexing_service;
 use quickwit_ingest::{
     start_ingest_api_service, wait_for_ingester_decommission, GetMemoryCapacity, IngestApiService,
@@ -77,7 +78,7 @@ use quickwit_metastore::{
 };
 use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService};
 use quickwit_proto::control_plane::ControlPlaneServiceClient;
-use quickwit_proto::indexing::IndexingServiceClient;
+use quickwit_proto::indexing::{IndexingServiceClient, ShardPositionsUpdate};
 use quickwit_proto::ingest::ingester::IngesterServiceClient;
 use quickwit_proto::ingest::router::IngestRouterServiceClient;
 use quickwit_proto::metastore::{
@@ -210,6 +211,7 @@ async fn start_control_plane_if_needed(
     universe: &Universe,
     indexer_pool: &IndexerPool,
     ingester_pool: &IngesterPool,
+    event_broker: &EventBroker,
 ) -> anyhow::Result<ControlPlaneServiceClient> {
     if node_config.is_service_enabled(QuickwitService::ControlPlane) {
         check_cluster_configuration(
@@ -220,7 +222,7 @@
         .await?;

         let cluster_id = cluster.cluster_id().to_string();
-        let self_node_id = cluster.self_node_id().to_string();
+        let self_node_id: NodeId = cluster.self_node_id().to_string().into();

         let replication_factor = node_config
             .ingest_api_config
@@ -235,6 +237,7 @@
             ingester_pool.clone(),
             metastore_client.clone(),
             replication_factor,
+            event_broker,
         )
         .await?;
         Ok(ControlPlaneServiceClient::from_mailbox(
@@ -318,9 +321,21 @@ pub async fn serve_quickwit(
         &universe,
         &indexer_pool,
         &ingester_pool,
+        &event_broker,
     )
     .await?;

+    // If one of the two following services is enabled, we need to start the shard
+    // positions service:
+    // - the control plane: as it is in charge of cleaning up shards that have reached EOF.
+    // - the indexer: as it hosts ingesters, and ingesters use the shard positions to truncate
+    //   the queues associated with shards in mrecordlog, and to publish the indexers' progress to chitchat.
+    if node_config.is_service_enabled(QuickwitService::Indexer)
+        || node_config.is_service_enabled(QuickwitService::ControlPlane)
+    {
+        ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone());
+    }
+
     // Set up the "control plane proxy" for the metastore.
     let metastore_through_control_plane = MetastoreServiceClient::new(ControlPlaneMetastore::new(
         control_plane_service.clone(),
@@ -676,16 +691,17 @@ async fn setup_searcher(
     Ok((search_job_placer, search_service))
 }

+#[allow(clippy::too_many_arguments)]
 async fn setup_control_plane(
     universe: &Universe,
     cluster_id: String,
-    self_node_id: String,
+    self_node_id: NodeId,
     indexer_pool: IndexerPool,
     ingester_pool: IngesterPool,
     metastore: MetastoreServiceClient,
     replication_factor: usize,
+    event_broker: &EventBroker,
 ) -> anyhow::Result<Mailbox<ControlPlane>> {
-    let self_node_id: NodeId = self_node_id.into();
     let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
         universe,
         cluster_id,
@@ -695,6 +711,20 @@
         metastore,
         replication_factor,
     );
+    let weak_control_plane_mailbox = control_plane_mailbox.downgrade();
+    event_broker
+        .subscribe_fn::<ShardPositionsUpdate>(move |shard_positions_update| {
+            let Some(control_plane_mailbox) = weak_control_plane_mailbox.upgrade() else {
+                return;
+            };
+            if control_plane_mailbox
+                .try_send_message(shard_positions_update)
+                .is_err()
+            {
+                error!("failed to send shard positions update to control plane");
+            }
+        })
+        .forever();
     Ok(control_plane_mailbox)
 }

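
Note the subscription in setup_control_plane holds a downgraded (weak) mailbox, so the event-broker callback does not keep the control plane's mailbox alive once the actor is gone. A minimal sketch of the upgrade-or-return pattern, using std's Arc/Weak and an mpsc channel as stand-ins for the weak mailbox (not the actual quickwit_actors types):

use std::sync::{mpsc, Arc, Weak};

// Stand-in for an actor mailbox.
struct Mailbox(mpsc::Sender<String>);

fn subscribe(weak_mailbox: Weak<Mailbox>) -> impl Fn(String) {
    move |event: String| {
        // If the control plane has shut down, silently drop the event.
        let Some(mailbox) = weak_mailbox.upgrade() else {
            return;
        };
        if mailbox.0.send(event).is_err() {
            eprintln!("failed to send shard positions update to control plane");
        }
    }
}

fn main() {
    let (sender, inbox) = mpsc::channel();
    let mailbox = Arc::new(Mailbox(sender));
    let callback = subscribe(Arc::downgrade(&mailbox));

    callback("shard 1 reached EOF".to_string());
    assert_eq!(inbox.recv().unwrap(), "shard 1 reached EOF");

    // Once the control plane is dropped, the callback becomes a no-op.
    drop(mailbox);
    callback("late event".to_string());
}
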
