Batch index_metadata calls in indexing service (#4921)
guilload authored May 9, 2024
1 parent a2c1110 commit f3c5528
Showing 27 changed files with 1,689 additions and 438 deletions.
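The commit title describes replacing repeated per-index `index_metadata` metastore lookups in the indexing service with a single batched request; the indexing-service side of that change sits in files not shown on this page. Below is a minimal sketch of the batching idea only, using hypothetical type and method names (`Metastore`, `IndexMetadata`, `indexes_metadata`, `resolve_indexes`), not Quickwit's actual API:

    // Sketch only: these names are illustrative stand-ins, not the types touched by this commit.
    struct IndexMetadata {
        index_id: String,
    }

    trait Metastore {
        // One network round trip per index: O(n) calls when resolving n indexes.
        fn index_metadata(&self, index_id: &str) -> Option<IndexMetadata>;
        // One round trip for the whole set: what batching buys.
        fn indexes_metadata(&self, index_ids: &[String]) -> Vec<IndexMetadata>;
    }

    fn resolve_indexes(metastore: &dyn Metastore, index_ids: &[String]) -> Vec<IndexMetadata> {
        // Before: one `index_metadata` call per index id, e.g.
        // index_ids.iter().filter_map(|id| metastore.index_metadata(id)).collect()
        // After: a single batched call.
        metastore.indexes_metadata(index_ids)
    }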
17 changes: 5 additions & 12 deletions quickwit/quickwit-cluster/src/cluster.rs
@@ -418,15 +418,11 @@ impl Cluster {
/// - value: Number of indexing tasks in the group.
/// Keys present in chitchat state but not in the given `indexing_tasks` are marked for
/// deletion.
- pub async fn update_self_node_indexing_tasks(
- &self,
- indexing_tasks: &[IndexingTask],
- ) -> anyhow::Result<()> {
+ pub async fn update_self_node_indexing_tasks(&self, indexing_tasks: &[IndexingTask]) {
let chitchat = self.chitchat().await;
let mut chitchat_guard = chitchat.lock().await;
let node_state = chitchat_guard.self_node_state();
set_indexing_tasks_in_node_state(indexing_tasks, node_state);
- Ok(())
}

pub async fn chitchat(&self) -> Arc<Mutex<Chitchat>> {
@@ -961,8 +957,7 @@ mod tests {
.await;
cluster2
.update_self_node_indexing_tasks(&[indexing_task1.clone(), indexing_task2.clone()])
- .await
- .unwrap();
+ .await;
cluster1
.wait_for_ready_members(|members| members.len() == 2, Duration::from_secs(30))
.await
@@ -1042,8 +1037,7 @@ mod tests {
.collect_vec();
cluster1
.update_self_node_indexing_tasks(&indexing_tasks)
- .await
- .unwrap();
+ .await;
for cluster in [&cluster2, &cluster3] {
let cluster_clone = cluster.clone();
let indexing_tasks_clone = indexing_tasks.clone();
@@ -1063,7 +1057,7 @@ mod tests {
}

// Mark tasks for deletion.
- cluster1.update_self_node_indexing_tasks(&[]).await.unwrap();
+ cluster1.update_self_node_indexing_tasks(&[]).await;
for cluster in [&cluster2, &cluster3] {
let cluster_clone = cluster.clone();
wait_until_predicate(
@@ -1084,8 +1078,7 @@ mod tests {
// Re-add tasks.
cluster1
.update_self_node_indexing_tasks(&indexing_tasks)
- .await
- .unwrap();
+ .await;
for cluster in [&cluster2, &cluster3] {
let cluster_clone = cluster.clone();
let indexing_tasks_clone = indexing_tasks.clone();
63 changes: 53 additions & 10 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -48,8 +48,8 @@ use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest,
DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest,
- IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient,
- ToggleSourceRequest,
+ IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService,
+ MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest,
};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use serde::Serialize;
@@ -570,11 +570,39 @@ impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
} else {
reply(Ok(response));
}

Ok(())
}
}

+ // This handler is a metastore call proxied through the control plane: we must first forward the
+ // request to the metastore, and then act on the event.
+ #[async_trait]
+ impl Handler<UpdateIndexRequest> for ControlPlane {
+ type Reply = ControlPlaneResult<IndexMetadataResponse>;
+
+ async fn handle(
+ &mut self,
+ request: UpdateIndexRequest,
+ ctx: &ActorContext<Self>,
+ ) -> Result<Self::Reply, ActorExitStatus> {
+ let index_uid: IndexUid = request.index_uid().clone();
+ debug!(%index_uid, "updating index");
+
+ let response = match ctx
+ .protect_future(self.metastore.update_index(request))
+ .await
+ {
+ Ok(response) => response,
+ Err(metastore_error) => {
+ return convert_metastore_error(metastore_error);
+ }
+ };
+ // TODO: Handle doc mapping and/or indexing settings update here.
+ info!(%index_uid, "updated index");
+ Ok(Ok(response))
+ }
+ }

// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
@@ -681,9 +709,9 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
};
info!(%index_uid, source_id, enabled=enable, "toggled source");

- let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;
+ let mutation_occurred = self.model.toggle_source(&index_uid, &source_id, enable)?;

- if mutation_occured {
+ if mutation_occurred {
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
}
Ok(Ok(EmptyResponse {}))
@@ -1020,9 +1048,10 @@ mod tests {
};
use quickwit_proto::ingest::{Shard, ShardPKey, ShardState};
use quickwit_proto::metastore::{
- EntityKind, FindIndexTemplateMatchesResponse, ListIndexesMetadataRequest,
- ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
- MetastoreError, MockMetastoreService, OpenShardSubresponse, OpenShardsResponse, SourceType,
+ DeleteShardsResponse, EntityKind, FindIndexTemplateMatchesResponse,
+ ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest,
+ ListShardsResponse, ListShardsSubresponse, MetastoreError, MockMetastoreService,
+ OpenShardSubresponse, OpenShardsResponse, SourceType,
};
use quickwit_proto::types::Position;
use tokio::sync::Mutex;
@@ -1556,7 +1585,14 @@ mod tests {
assert_eq!(delete_shards_request.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(delete_shards_request.shard_ids, [ShardId::from(17)]);
assert!(!delete_shards_request.force);
- Ok(EmptyResponse {})
+
+ let response = DeleteShardsResponse {
+ index_uid: delete_shards_request.index_uid,
+ source_id: delete_shards_request.source_id,
+ successes: delete_shards_request.shard_ids,
+ failures: Vec::new(),
+ };
+ Ok(response)
},
);

@@ -1776,7 +1812,14 @@ mod tests {
assert_eq!(delete_shards_request.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(delete_shards_request.shard_ids, [ShardId::from(17)]);
assert!(!delete_shards_request.force);
- Ok(EmptyResponse {})
+
+ let response = DeleteShardsResponse {
+ index_uid: delete_shards_request.index_uid,
+ source_id: delete_shards_request.source_id,
+ successes: delete_shards_request.shard_ids,
+ failures: Vec::new(),
+ };
+ Ok(response)
},
);

12 changes: 4 additions & 8 deletions quickwit/quickwit-control-plane/src/tests.rs
@@ -221,8 +221,7 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() {
// `ApplyIndexingPlanRequest`.
cluster
.update_self_node_indexing_tasks(&indexing_tasks)
- .await
- .unwrap();
+ .await;
let scheduler_state = control_plane_mailbox
.ask(Observe)
.await
@@ -237,8 +236,7 @@
// receive a new `ApplyIndexingPlanRequest`.
cluster
.update_self_node_indexing_tasks(&[indexing_tasks[0].clone()])
- .await
- .unwrap();
+ .await;
tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(1.2)).await;
let scheduler_state = control_plane_mailbox
.ask(Observe)
@@ -377,12 +375,10 @@ async fn test_scheduler_scheduling_multiple_indexers() {
assert_eq!(indexing_service_inbox_messages_2.len(), 1);
cluster_indexer_1
.update_self_node_indexing_tasks(&indexing_service_inbox_messages_1[0].indexing_tasks)
- .await
- .unwrap();
+ .await;
cluster_indexer_2
.update_self_node_indexing_tasks(&indexing_service_inbox_messages_2[0].indexing_tasks)
- .await
- .unwrap();
+ .await;

// Wait 2 CONTROL_PLAN_LOOP_INTERVAL again and check the scheduler will not apply the plan
// several times.
(Diffs for the remaining 24 changed files are not shown.)
