Batch index_metadata calls in indexing service
guilload committed May 8, 2024
1 parent e16a7f4 commit 8c1ddfb
Showing 25 changed files with 1,559 additions and 306 deletions.
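The core change: instead of issuing one `index_metadata` metastore call per pipeline to spawn, the indexing service now deduplicates index UIDs and fetches all the metadata it needs in a single batched `indexes_metadata` call. Below is a minimal sketch of that pattern, using simplified stand-in types rather than the actual Quickwit metastore API, and synchronous calls for brevity:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct IndexUid(String);

#[derive(Clone, Debug)]
struct IndexMetadata {
    index_uid: IndexUid,
}

struct Metastore;

impl Metastore {
    // Before: one metastore round trip per index.
    fn index_metadata(&self, index_uid: &IndexUid) -> IndexMetadata {
        IndexMetadata {
            index_uid: index_uid.clone(),
        }
    }

    // After: a single round trip fetching the metadata of a whole batch.
    fn indexes_metadata(&self, index_uids: Vec<IndexUid>) -> Vec<IndexMetadata> {
        index_uids
            .into_iter()
            .map(|index_uid| IndexMetadata { index_uid })
            .collect()
    }
}

fn fetch_metadata_batched(
    metastore: &Metastore,
    pipeline_index_uids: &[IndexUid],
) -> HashMap<IndexUid, IndexMetadata> {
    // Deduplicate the index UIDs referenced by the pipelines to spawn...
    let mut unique_uids = pipeline_index_uids.to_vec();
    unique_uids.sort_by(|left, right| left.0.cmp(&right.0));
    unique_uids.dedup();
    // ...then fetch their metadata with one call instead of N.
    metastore
        .indexes_metadata(unique_uids)
        .into_iter()
        .map(|metadata| (metadata.index_uid.clone(), metadata))
        .collect()
}

fn main() {
    let metastore = Metastore;
    let uids = vec![
        IndexUid("logs".to_string()),
        IndexUid("logs".to_string()),
        IndexUid("traces".to_string()),
    ];
    // One call covers both unique indexes, instead of one `index_metadata`
    // call per pipeline.
    let metadata = fetch_metadata_batched(&metastore, &uids);
    assert_eq!(metadata.len(), 2);
    let _single = metastore.index_metadata(&uids[0]);
}
```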
63 changes: 53 additions & 10 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -48,8 +48,8 @@ use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest,
DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest,
IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient,
ToggleSourceRequest,
IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService,
MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest,
};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use serde::Serialize;
@@ -570,11 +570,39 @@ impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
} else {
reply(Ok(response));
}

Ok(())
}
}

// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
impl Handler<UpdateIndexRequest> for ControlPlane {
type Reply = ControlPlaneResult<IndexMetadataResponse>;

async fn handle(
&mut self,
request: UpdateIndexRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid().clone();
debug!(%index_uid, "updating index");

let response = match ctx
.protect_future(self.metastore.update_index(request))
.await
{
Ok(response) => response,
Err(metastore_error) => {
return convert_metastore_error(metastore_error);
}
};
// TODO: Handle doc mapping and/or indexing settings update here.
info!(%index_uid, "updated index");
Ok(Ok(response))
}
}

// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
@@ -681,9 +709,9 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
};
info!(%index_uid, source_id, enabled=enable, "toggled source");

let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;
let mutation_occurred = self.model.toggle_source(&index_uid, &source_id, enable)?;

if mutation_occured {
if mutation_occurred {
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
}
Ok(Ok(EmptyResponse {}))
@@ -1020,9 +1048,10 @@ mod tests {
};
use quickwit_proto::ingest::{Shard, ShardPKey, ShardState};
use quickwit_proto::metastore::{
EntityKind, FindIndexTemplateMatchesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
MetastoreError, MockMetastoreService, OpenShardSubresponse, OpenShardsResponse, SourceType,
DeleteShardsResponse, EntityKind, FindIndexTemplateMatchesResponse,
ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest,
ListShardsResponse, ListShardsSubresponse, MetastoreError, MockMetastoreService,
OpenShardSubresponse, OpenShardsResponse, SourceType,
};
use quickwit_proto::types::Position;
use tokio::sync::Mutex;
@@ -1556,7 +1585,14 @@ mod tests {
assert_eq!(delete_shards_request.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(delete_shards_request.shard_ids, [ShardId::from(17)]);
assert!(!delete_shards_request.force);
Ok(EmptyResponse {})

let response = DeleteShardsResponse {
index_uid: delete_shards_request.index_uid,
source_id: delete_shards_request.source_id,
successes: delete_shards_request.shard_ids,
failures: Vec::new(),
};
Ok(response)
},
);

@@ -1776,7 +1812,14 @@
assert_eq!(delete_shards_request.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(delete_shards_request.shard_ids, [ShardId::from(17)]);
assert!(!delete_shards_request.force);
Ok(EmptyResponse {})

let response = DeleteShardsResponse {
index_uid: delete_shards_request.index_uid,
source_id: delete_shards_request.source_id,
successes: delete_shards_request.shard_ids,
failures: Vec::new(),
};
Ok(response)
},
);

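The new `UpdateIndexRequest` handler above follows the same "proxy then act" pattern as the other proxied metastore calls: forward the request to the metastore, then (eventually) apply the resulting change to the control plane model. A hedged sketch of that shape with hypothetical simplified types; the real handler is an actor `Handler<UpdateIndexRequest>` returning `ControlPlaneResult<IndexMetadataResponse>` inside a protected future:

```rust
struct Metastore;
struct Model;

struct UpdateIndexRequest {
    index_uid: String,
}

struct IndexMetadataResponse {
    index_uid: String,
}

#[derive(Debug)]
struct MetastoreError(String);

impl Metastore {
    fn update_index(
        &self,
        request: UpdateIndexRequest,
    ) -> Result<IndexMetadataResponse, MetastoreError> {
        // Stand-in for the real metastore call.
        Ok(IndexMetadataResponse {
            index_uid: request.index_uid,
        })
    }
}

impl Model {
    fn apply_index_update(&mut self, _response: &IndexMetadataResponse) {
        // Placeholder: per the TODO in the diff, the control plane will later
        // react to doc mapping / indexing settings updates here.
    }
}

fn handle_update_index(
    metastore: &Metastore,
    model: &mut Model,
    request: UpdateIndexRequest,
) -> Result<IndexMetadataResponse, MetastoreError> {
    // 1. Forward the request to the metastore, the source of truth.
    let response = metastore.update_index(request)?;
    // 2. Act on the resulting event in the control plane's local state.
    model.apply_index_update(&response);
    Ok(response)
}

fn main() {
    let metastore = Metastore;
    let mut model = Model;
    let request = UpdateIndexRequest {
        index_uid: "my-index".to_string(),
    };
    let response =
        handle_update_index(&metastore, &mut model, request).expect("update should succeed");
    println!("updated index {}", response.index_uid);
}
```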
74 changes: 50 additions & 24 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -25,7 +25,6 @@ use anyhow::Context;
use anyhow::Context;
use async_trait::async_trait;
use fnv::FnvHashSet;
use futures::future::try_join_all;
use itertools::Itertools;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox,
@@ -43,13 +42,17 @@ use quickwit_ingest::{
DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest,
QUEUES_DIR_NAME,
};
use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt};
use quickwit_metastore::{
IndexMetadata, IndexMetadataResponseExt, IndexesMetadataResponseExt,
ListIndexesMetadataResponseExt,
};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId,
IndexingTask, PipelineMetrics,
};
use quickwit_proto::metastore::{
IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
IndexMetadataRequest, IndexMetadataSubrequest, IndexesMetadataRequest,
ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::{IndexId, IndexUid, PipelineUid};
use quickwit_storage::StorageResolver;
@@ -366,20 +369,48 @@ impl IndexingService {
}

async fn index_metadata(
&self,
&mut self,
ctx: &ActorContext<Self>,
index_id: &str,
) -> Result<IndexMetadata, IndexingError> {
let _protect_guard = ctx.protect_zone();
let _protected_zone_guard = ctx.protect_zone();
let index_metadata_response = self
.metastore
.clone()
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
.await?;
let index_metadata = index_metadata_response.deserialize_index_metadata()?;
Ok(index_metadata)
}

async fn indexes_metadata(
&mut self,
ctx: &ActorContext<Self>,
indexing_pipeline_ids: &[IndexingPipelineId],
) -> Result<Vec<IndexMetadata>, IndexingError> {
let index_metadata_subrequests: Vec<IndexMetadataSubrequest> = indexing_pipeline_ids
.iter()
// Remove duplicate subrequests
.unique_by(|pipeline_id| &pipeline_id.index_uid)
.map(|pipeline_id| IndexMetadataSubrequest {
index_id: None,
index_uid: Some(pipeline_id.index_uid.clone()),
})
.collect();
let indexes_metadata_request = IndexesMetadataRequest {
subrequests: index_metadata_subrequests,
};
let _protected_zone_guard = ctx.protect_zone();

let indexes_metadata_response = self
.metastore
.indexes_metadata(indexes_metadata_request)
.await?;
let indexes_metadata = indexes_metadata_response
.deserialize_indexes_metadata()
.await?;
Ok(indexes_metadata)
}

async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> {
self.indexing_pipelines
.retain(|pipeline_uid, pipeline_handle| {
@@ -399,7 +430,7 @@ impl IndexingService {
// and are themselves in charge of supervising the pipeline actors.
error!(
pipeline_uid=%pipeline_uid,
"Indexing pipeline exited with failure. This should never happen."
"indexing pipeline exited with failure: this should never happen, please report"
);
self.counters.num_failed_pipelines += 1;
self.counters.num_running_pipelines -= 1;
@@ -489,9 +520,9 @@ impl IndexingService {
let pipeline_uids_to_remove: Vec<PipelineUid> = self
.indexing_pipelines
.keys()
.cloned()
.filter(|pipeline_uid| !pipeline_uids_in_plan.contains(pipeline_uid))
.collect::<Vec<_>>();
.cloned()
.collect();

// Shut down currently running pipelines that are missing in the new plan.
self.shutdown_pipelines(&pipeline_uids_to_remove).await;
@@ -508,15 +539,15 @@ impl IndexingService {
let pipeline_uid = indexing_task.pipeline_uid();
!self.indexing_pipelines.contains_key(&pipeline_uid)
})
.flat_map(|indexing_task| {
.map(|indexing_task| {
let pipeline_uid = indexing_task.pipeline_uid();
let index_uid = indexing_task.index_uid().clone();
Some(IndexingPipelineId {
IndexingPipelineId {
node_id: self.node_id.clone(),
index_uid,
source_id: indexing_task.source_id.clone(),
pipeline_uid,
})
}
})
.collect();
self.spawn_pipelines(&pipeline_ids_to_add, ctx).await
@@ -580,13 +611,9 @@ impl IndexingService {
ctx: &ActorContext<Self>,
) -> Result<Vec<IndexingPipelineId>, IndexingError> {
// We fetch the new indexes metadata.
let indexes_metadata_futures = added_pipeline_ids
.iter()
// No need to emit two request for the same `index_uid`
.unique_by(|pipeline_id| pipeline_id.index_uid.clone())
.map(|pipeline_id| self.index_metadata(ctx, &pipeline_id.index_uid.index_id));
let indexes_metadata = try_join_all(indexes_metadata_futures).await?;
let indexes_metadata_by_index_id: HashMap<IndexUid, IndexMetadata> = indexes_metadata
let indexes_metadata = self.indexes_metadata(ctx, added_pipeline_ids).await?;

let per_index_uid_indexes_metadata: HashMap<IndexUid, IndexMetadata> = indexes_metadata
.into_iter()
.map(|index_metadata| (index_metadata.index_uid.clone(), index_metadata))
.collect();
@@ -596,7 +623,7 @@
// Add new pipelines.
for new_pipeline_id in added_pipeline_ids {
if let Some(index_metadata) =
indexes_metadata_by_index_id.get(&new_pipeline_id.index_uid)
per_index_uid_indexes_metadata.get(&new_pipeline_id.index_uid)
{
if let Some(source_config) = index_metadata.sources.get(&new_pipeline_id.source_id)
{
@@ -618,13 +645,12 @@
}
} else {
error!(
"Failed to spawn pipeline: index {} no longer exists.",
&new_pipeline_id.index_uid.to_string()
"failed to spawn pipeline: index `{}` no longer exists",
new_pipeline_id.index_uid
);
failed_spawning_pipeline_ids.push(new_pipeline_id.clone());
}
}

Ok(failed_spawning_pipeline_ids)
}

@@ -640,7 +666,7 @@
for &pipeline_uid_to_remove in pipeline_uids {
match self.detach_pipeline(pipeline_uid_to_remove).await {
Ok(pipeline_handle) => {
// Killing the pipeline ensure that all pipeline actors will stop.
// Killing the pipeline ensures that all the pipeline actors will stop.
pipeline_handle.kill().await;
}
Err(error) => {
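The new `indexes_metadata` helper above deduplicates subrequests with itertools' `unique_by`, since several pipelines (one per source and pipeline ordinal) can target the same index. A standalone illustration of that deduplication, with `Pipeline` as a made-up stand-in for `IndexingPipelineId` (assumes `itertools` is available as a dependency):

```rust
use itertools::Itertools;

#[derive(Debug)]
struct Pipeline {
    index_uid: String,
    source_id: String,
}

fn main() {
    let pipelines = vec![
        Pipeline { index_uid: "logs:01".into(), source_id: "kafka-a".into() },
        Pipeline { index_uid: "logs:01".into(), source_id: "kafka-b".into() },
        Pipeline { index_uid: "traces:02".into(), source_id: "kafka-a".into() },
    ];
    // Keep only the first pipeline seen for each index UID, so the metastore
    // receives one metadata subrequest per index rather than one per pipeline.
    let unique_index_uids: Vec<&str> = pipelines
        .iter()
        .unique_by(|pipeline| pipeline.index_uid.clone())
        .map(|pipeline| pipeline.index_uid.as_str())
        .collect();
    assert_eq!(unique_index_uids, ["logs:01", "traces:02"]);
    println!("{unique_index_uids:?} from {} pipelines", pipelines.len());
}
```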
4 changes: 2 additions & 2 deletions quickwit/quickwit-metastore/src/lib.rs
@@ -49,8 +49,8 @@ pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_8, Version
pub use metastore::postgres::PostgresqlMetastore;
pub use metastore::{
file_backed, AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata,
IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery,
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
UpdateIndexRequestExt,
};
@@ -25,18 +25,18 @@ use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClie
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteSourceRequest,
DeleteSplitsRequest, DeleteTask, EmptyResponse, FindIndexTemplateMatchesRequest,
FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataRequest, IndexMetadataResponse, LastDeleteOpstampRequest,
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult,
MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest,
OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse,
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest,
GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest,
IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse,
ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest,
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService,
MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse,
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
};

/// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can
Expand Down Expand Up @@ -86,6 +86,19 @@ impl MetastoreService for ControlPlaneMetastore {
Ok(response)
}

// Technically, proxying this call via the control plane is not necessary at the moment because
// it does not modify attributes the control plane cares about (retention policy, search
// settings). However, it would be easy to forget to do so when we add the ability to update the
// doc mapping or merge policy of an index, so we've already set up the proxy here since calling
// `update_index` is very infrequent anyway.
async fn update_index(
&mut self,
request: UpdateIndexRequest,
) -> MetastoreResult<IndexMetadataResponse> {
let response = self.control_plane.update_index(request).await?;
Ok(response)
}

async fn delete_index(
&mut self,
request: DeleteIndexRequest,
@@ -117,21 +130,20 @@

// Other metastore API calls.

async fn update_index(
&mut self,
request: UpdateIndexRequest,
) -> MetastoreResult<IndexMetadataResponse> {
let response = self.metastore.update_index(request).await?;
Ok(response)
}

async fn index_metadata(
&mut self,
request: IndexMetadataRequest,
) -> MetastoreResult<IndexMetadataResponse> {
self.metastore.index_metadata(request).await
}

async fn indexes_metadata(
&mut self,
request: IndexesMetadataRequest,
) -> MetastoreResult<IndexesMetadataResponse> {
self.metastore.indexes_metadata(request).await
}

async fn list_indexes_metadata(
&mut self,
request: ListIndexesMetadataRequest,
@@ -244,7 +256,7 @@ impl MetastoreService for ControlPlaneMetastore {
async fn delete_shards(
&mut self,
request: DeleteShardsRequest,
) -> MetastoreResult<EmptyResponse> {
) -> MetastoreResult<DeleteShardsResponse> {
self.metastore.delete_shards(request).await
}

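The last hunk above reflects that the `delete_shards` RPC now returns a `DeleteShardsResponse` instead of an `EmptyResponse`. A hypothetical sketch of how a caller might inspect the richer response; the struct mirrors the fields used in the test diff (`index_uid`, `source_id`, `successes`, `failures`) with simplified stand-in types:

```rust
#[derive(Debug)]
struct ShardId(u64);

#[derive(Debug)]
struct DeleteShardsResponse {
    index_uid: String,
    source_id: String,
    successes: Vec<ShardId>,
    failures: Vec<ShardId>,
}

fn main() {
    // Example response for a single successfully deleted shard.
    let response = DeleteShardsResponse {
        index_uid: "my-index".to_string(),
        source_id: "my-source".to_string(),
        successes: vec![ShardId(17)],
        failures: Vec::new(),
    };
    if response.failures.is_empty() {
        println!(
            "deleted {} shard(s) from {}/{}",
            response.successes.len(),
            response.index_uid,
            response.source_id
        );
    } else {
        // Unlike the previous EmptyResponse, the caller can now see which
        // shard deletions failed and log or retry them.
        eprintln!("failed to delete shards: {:?}", response.failures);
    }
}
```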