From f3c5528cf126600315fd08e8c0a619e6ac78f377 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 9 May 2024 10:28:19 -0600 Subject: [PATCH] Batch `index_metadata` calls in indexing service (#4921) --- quickwit/quickwit-cluster/src/cluster.rs | 17 +- .../src/control_plane.rs | 63 ++- quickwit/quickwit-control-plane/src/tests.rs | 12 +- .../src/actors/indexing_service.rs | 307 +++++++------- quickwit/quickwit-metastore/src/lib.rs | 4 +- .../src/metastore/control_plane_metastore.rs | 54 ++- .../file_backed/file_backed_index/mod.rs | 24 +- .../file_backed/file_backed_index/shards.rs | 124 ++++-- .../src/metastore/file_backed/mod.rs | 156 +++++-- .../src/metastore/index_metadata/mod.rs | 24 +- .../quickwit-metastore/src/metastore/mod.rs | 64 ++- .../src/metastore/postgres/metastore.rs | 346 ++++++++++------ .../postgres/queries/indexes_metadata.sql | 7 + .../postgres/queries/shards/delete.sql | 2 +- .../queries/shards/find_not_deletable.sql | 9 + .../quickwit-metastore/src/tests/index.rs | 110 ++++- quickwit/quickwit-metastore/src/tests/mod.rs | 16 +- .../quickwit-metastore/src/tests/shard.rs | 30 +- quickwit/quickwit-proto/build.rs | 5 +- .../protos/quickwit/control_plane.proto | 4 + .../protos/quickwit/metastore.proto | 46 ++- .../quickwit/quickwit.control_plane.rs | 258 ++++++++++++ .../codegen/quickwit/quickwit.metastore.rs | 385 ++++++++++++++++-- quickwit/quickwit-proto/src/getters.rs | 1 + quickwit/quickwit-proto/src/metastore/mod.rs | 15 - .../quickwit-proto/src/types/index_uid.rs | 23 ++ quickwit/quickwit-proto/src/types/shard_id.rs | 21 + 27 files changed, 1689 insertions(+), 438 deletions(-) create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/indexes_metadata.sql create mode 100644 quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/find_not_deletable.sql diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index a3d83e81573..2b847f12c26 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -418,15 +418,11 @@ impl Cluster { /// - value: Number of indexing tasks in the group. /// Keys present in chitchat state but not in the given `indexing_tasks` are marked for /// deletion. - pub async fn update_self_node_indexing_tasks( - &self, - indexing_tasks: &[IndexingTask], - ) -> anyhow::Result<()> { + pub async fn update_self_node_indexing_tasks(&self, indexing_tasks: &[IndexingTask]) { let chitchat = self.chitchat().await; let mut chitchat_guard = chitchat.lock().await; let node_state = chitchat_guard.self_node_state(); set_indexing_tasks_in_node_state(indexing_tasks, node_state); - Ok(()) } pub async fn chitchat(&self) -> Arc> { @@ -961,8 +957,7 @@ mod tests { .await; cluster2 .update_self_node_indexing_tasks(&[indexing_task1.clone(), indexing_task2.clone()]) - .await - .unwrap(); + .await; cluster1 .wait_for_ready_members(|members| members.len() == 2, Duration::from_secs(30)) .await @@ -1042,8 +1037,7 @@ mod tests { .collect_vec(); cluster1 .update_self_node_indexing_tasks(&indexing_tasks) - .await - .unwrap(); + .await; for cluster in [&cluster2, &cluster3] { let cluster_clone = cluster.clone(); let indexing_tasks_clone = indexing_tasks.clone(); @@ -1063,7 +1057,7 @@ mod tests { } // Mark tasks for deletion. - cluster1.update_self_node_indexing_tasks(&[]).await.unwrap(); + cluster1.update_self_node_indexing_tasks(&[]).await; for cluster in [&cluster2, &cluster3] { let cluster_clone = cluster.clone(); wait_until_predicate( @@ -1084,8 +1078,7 @@ mod tests { // Re-add tasks. cluster1 .update_self_node_indexing_tasks(&indexing_tasks) - .await - .unwrap(); + .await; for cluster in [&cluster2, &cluster3] { let cluster_clone = cluster.clone(); let indexing_tasks_clone = indexing_tasks.clone(); diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 7d65cadccf3..61232a76fed 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -48,8 +48,8 @@ use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest, DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest, - IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - ToggleSourceRequest, + IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService, + MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest, }; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; use serde::Serialize; @@ -570,11 +570,39 @@ impl DeferableReplyHandler for ControlPlane { } else { reply(Ok(response)); } - Ok(()) } } +// This handler is a metastore call proxied through the control plane: we must first forward the +// request to the metastore, and then act on the event. +#[async_trait] +impl Handler for ControlPlane { + type Reply = ControlPlaneResult; + + async fn handle( + &mut self, + request: UpdateIndexRequest, + ctx: &ActorContext, + ) -> Result { + let index_uid: IndexUid = request.index_uid().clone(); + debug!(%index_uid, "updating index"); + + let response = match ctx + .protect_future(self.metastore.update_index(request)) + .await + { + Ok(response) => response, + Err(metastore_error) => { + return convert_metastore_error(metastore_error); + } + }; + // TODO: Handle doc mapping and/or indexing settings update here. + info!(%index_uid, "updated index"); + Ok(Ok(response)) + } +} + // This handler is a metastore call proxied through the control plane: we must first forward the // request to the metastore, and then act on the event. #[async_trait] @@ -681,9 +709,9 @@ impl Handler for ControlPlane { }; info!(%index_uid, source_id, enabled=enable, "toggled source"); - let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?; + let mutation_occurred = self.model.toggle_source(&index_uid, &source_id, enable)?; - if mutation_occured { + if mutation_occurred { let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); } Ok(Ok(EmptyResponse {})) @@ -1020,9 +1048,10 @@ mod tests { }; use quickwit_proto::ingest::{Shard, ShardPKey, ShardState}; use quickwit_proto::metastore::{ - EntityKind, FindIndexTemplateMatchesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse, - MetastoreError, MockMetastoreService, OpenShardSubresponse, OpenShardsResponse, SourceType, + DeleteShardsResponse, EntityKind, FindIndexTemplateMatchesResponse, + ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, + ListShardsResponse, ListShardsSubresponse, MetastoreError, MockMetastoreService, + OpenShardSubresponse, OpenShardsResponse, SourceType, }; use quickwit_proto::types::Position; use tokio::sync::Mutex; @@ -1556,7 +1585,14 @@ mod tests { assert_eq!(delete_shards_request.source_id, INGEST_V2_SOURCE_ID); assert_eq!(delete_shards_request.shard_ids, [ShardId::from(17)]); assert!(!delete_shards_request.force); - Ok(EmptyResponse {}) + + let response = DeleteShardsResponse { + index_uid: delete_shards_request.index_uid, + source_id: delete_shards_request.source_id, + successes: delete_shards_request.shard_ids, + failures: Vec::new(), + }; + Ok(response) }, ); @@ -1776,7 +1812,14 @@ mod tests { assert_eq!(delete_shards_request.source_id, INGEST_V2_SOURCE_ID); assert_eq!(delete_shards_request.shard_ids, [ShardId::from(17)]); assert!(!delete_shards_request.force); - Ok(EmptyResponse {}) + + let response = DeleteShardsResponse { + index_uid: delete_shards_request.index_uid, + source_id: delete_shards_request.source_id, + successes: delete_shards_request.shard_ids, + failures: Vec::new(), + }; + Ok(response) }, ); diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index d5d8c946125..288a84f1a41 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -221,8 +221,7 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { // `ApplyIndexingPlanRequest`. cluster .update_self_node_indexing_tasks(&indexing_tasks) - .await - .unwrap(); + .await; let scheduler_state = control_plane_mailbox .ask(Observe) .await @@ -237,8 +236,7 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { // receive a new `ApplyIndexingPlanRequest`. cluster .update_self_node_indexing_tasks(&[indexing_tasks[0].clone()]) - .await - .unwrap(); + .await; tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(1.2)).await; let scheduler_state = control_plane_mailbox .ask(Observe) @@ -377,12 +375,10 @@ async fn test_scheduler_scheduling_multiple_indexers() { assert_eq!(indexing_service_inbox_messages_2.len(), 1); cluster_indexer_1 .update_self_node_indexing_tasks(&indexing_service_inbox_messages_1[0].indexing_tasks) - .await - .unwrap(); + .await; cluster_indexer_2 .update_self_node_indexing_tasks(&indexing_service_inbox_messages_2[0].indexing_tasks) - .await - .unwrap(); + .await; // Wait 2 CONTROL_PLAN_LOOP_INTERVAL again and check the scheduler will not apply the plan // several times. diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 04dd2124190..1a988f5661f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -24,8 +24,6 @@ use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; -use fnv::FnvHashSet; -use futures::future::try_join_all; use itertools::Itertools; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox, @@ -43,15 +41,19 @@ use quickwit_ingest::{ DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest, QUEUES_DIR_NAME, }; -use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt}; +use quickwit_metastore::{ + IndexMetadata, IndexMetadataResponseExt, IndexesMetadataResponseExt, + ListIndexesMetadataResponseExt, +}; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId, IndexingTask, PipelineMetrics, }; use quickwit_proto::metastore::{ - IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, + IndexMetadataRequest, IndexMetadataSubrequest, IndexesMetadataRequest, + ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, }; -use quickwit_proto::types::{IndexId, IndexUid, PipelineUid}; +use quickwit_proto::types::{IndexId, IndexUid, PipelineUid, ShardId}; use quickwit_storage::StorageResolver; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; @@ -199,15 +201,15 @@ impl IndexingService { }) } - async fn detach_pipeline( + async fn detach_indexing_pipeline( &mut self, - pipeline_uid: PipelineUid, + pipeline_uid: &PipelineUid, ) -> Result, IndexingError> { let pipeline_handle = self .indexing_pipelines - .remove(&pipeline_uid) + .remove(pipeline_uid) .ok_or_else(|| { - let message = format!("indexing pipeline `{pipeline_uid}` not found"); + let message = format!("could not find indexing pipeline `{pipeline_uid}`"); IndexingError::Internal(message) })?; self.counters.num_running_pipelines -= 1; @@ -222,7 +224,7 @@ impl IndexingService { .merge_pipeline_handles .remove(pipeline_id) .ok_or_else(|| { - let message = format!("merge pipeline `{pipeline_id}` not found"); + let message = format!("could not find merge pipeline `{pipeline_id}`"); IndexingError::Internal(message) })?; self.counters.num_running_merge_pipelines -= 1; @@ -231,13 +233,13 @@ impl IndexingService { async fn observe_pipeline( &mut self, - pipeline_uid: PipelineUid, + pipeline_uid: &PipelineUid, ) -> Result, IndexingError> { let pipeline_handle = &self .indexing_pipelines - .get(&pipeline_uid) + .get(pipeline_uid) .ok_or_else(|| { - let message = format!("indexing pipeline `{pipeline_uid}` not found"); + let message = format!("could not find indexing pipeline `{pipeline_uid}`"); IndexingError::Internal(message) })? .handle; @@ -366,20 +368,48 @@ impl IndexingService { } async fn index_metadata( - &self, + &mut self, ctx: &ActorContext, index_id: &str, ) -> Result { - let _protect_guard = ctx.protect_zone(); + let _protected_zone_guard = ctx.protect_zone(); let index_metadata_response = self .metastore - .clone() .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) .await?; let index_metadata = index_metadata_response.deserialize_index_metadata()?; Ok(index_metadata) } + async fn indexes_metadata( + &mut self, + ctx: &ActorContext, + indexing_pipeline_ids: &[IndexingPipelineId], + ) -> Result, IndexingError> { + let index_metadata_subrequests: Vec = indexing_pipeline_ids + .iter() + // Remove duplicate subrequests + .unique_by(|pipeline_id| &pipeline_id.index_uid) + .map(|pipeline_id| IndexMetadataSubrequest { + index_id: None, + index_uid: Some(pipeline_id.index_uid.clone()), + }) + .collect(); + let indexes_metadata_request = IndexesMetadataRequest { + subrequests: index_metadata_subrequests, + }; + let _protected_zone_guard = ctx.protect_zone(); + + let indexes_metadata_response = self + .metastore + .indexes_metadata(indexes_metadata_request) + .await?; + let indexes_metadata = indexes_metadata_response + .deserialize_indexes_metadata() + .await?; + Ok(indexes_metadata) + } + async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> { self.indexing_pipelines .retain(|pipeline_uid, pipeline_handle| { @@ -399,7 +429,7 @@ impl IndexingService { // and are themselves in charge of supervising the pipeline actors. error!( pipeline_uid=%pipeline_uid, - "Indexing pipeline exited with failure. This should never happen." + "indexing pipeline exited with failure: this should never happen, please report" ); self.counters.num_failed_pipelines += 1; self.counters.num_running_pipelines -= 1; @@ -407,27 +437,32 @@ impl IndexingService { } } }); - // Evict and kill merge pipelines that are not needed. - let needed_merge_pipeline_ids: HashSet = self + let merge_pipelines_to_retain: HashSet = self .indexing_pipelines .values() .map(|pipeline_handle| MergePipelineId::from(&pipeline_handle.indexing_pipeline_id)) .collect(); - let current_merge_pipeline_ids: HashSet = - self.merge_pipeline_handles.keys().cloned().collect(); - for merge_pipeline_id_to_shut_down in - current_merge_pipeline_ids.difference(&needed_merge_pipeline_ids) - { + + let merge_pipelines_to_shutdown: Vec = self + .merge_pipeline_handles + .keys() + .filter(|running_merge_pipeline_id| { + !merge_pipelines_to_retain.contains(running_merge_pipeline_id) + }) + .cloned() + .collect(); + + for merge_pipeline_to_shutdown in merge_pipelines_to_shutdown { if let Some((_, merge_pipeline_handle)) = self .merge_pipeline_handles - .remove_entry(merge_pipeline_id_to_shut_down) + .remove_entry(&merge_pipeline_to_shutdown) { // We kill the merge pipeline to avoid waiting a merge operation to finish as it can // be long. info!( - index_id=%merge_pipeline_id_to_shut_down.index_uid.index_id, - source_id=%merge_pipeline_id_to_shut_down.source_id, - "No more indexing pipeline on this index and source, killing merge pipeline." + index_uid=%merge_pipeline_to_shutdown.index_uid, + source_id=%merge_pipeline_to_shutdown.source_id, + "no more indexing pipeline on this index and source, killing merge pipeline" ); merge_pipeline_handle.handle.kill().await; } @@ -438,8 +473,7 @@ impl IndexingService { merge_pipeline_mailbox_handle.handle.state().is_running() }); self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len(); - self.update_cluster_running_indexing_tasks_in_chitchat() - .await; + self.update_chitchat_running_plan().await; let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self .indexing_pipelines @@ -480,48 +514,6 @@ impl IndexingService { Ok(merge_planner_mailbox) } - async fn find_and_shutdown_decommissioned_pipelines(&mut self, tasks: &[IndexingTask]) { - let pipeline_uids_in_plan: FnvHashSet = tasks - .iter() - .map(|indexing_task| indexing_task.pipeline_uid()) - .collect(); - - let pipeline_uids_to_remove: Vec = self - .indexing_pipelines - .keys() - .cloned() - .filter(|pipeline_uid| !pipeline_uids_in_plan.contains(pipeline_uid)) - .collect::>(); - - // Shut down currently running pipelines that are missing in the new plan. - self.shutdown_pipelines(&pipeline_uids_to_remove).await; - } - - async fn find_and_spawn_new_pipelines( - &mut self, - tasks: &[IndexingTask], - ctx: &ActorContext, - ) -> Result, IndexingError> { - let pipeline_ids_to_add: Vec = tasks - .iter() - .filter(|indexing_task| { - let pipeline_uid = indexing_task.pipeline_uid(); - !self.indexing_pipelines.contains_key(&pipeline_uid) - }) - .flat_map(|indexing_task| { - let pipeline_uid = indexing_task.pipeline_uid(); - let index_uid = indexing_task.index_uid().clone(); - Some(IndexingPipelineId { - node_id: self.node_id.clone(), - index_uid, - source_id: indexing_task.source_id.clone(), - pipeline_uid, - }) - }) - .collect(); - self.spawn_pipelines(&pipeline_ids_to_add, ctx).await - } - /// For all Ingest V2 pipelines, assigns the set of shards they should be working on. /// This is done regardless of whether there has been a change in their shard list /// or not. @@ -542,7 +534,7 @@ impl IndexingService { let message = AssignShards(assignment); if let Err(error) = pipeline_handle.mailbox.send_message(message).await { - error!(error=%error, "failed to assign shards to indexing pipeline"); + error!(%error, "failed to assign shards to indexing pipeline"); } } } @@ -550,106 +542,141 @@ impl IndexingService { /// Applies the indexing plan by: /// - Stopping the running pipelines not present in the provided plan. /// - Starting the pipelines that are not running. - /// Note: the indexing is a list of `IndexingTask` and has no ordinal - /// like a pipeline. We assign an ordinal for each `IndexingTask` from - /// [0, n) with n the number of indexing tasks given a (index_id, source_id). async fn apply_indexing_plan( &mut self, tasks: &[IndexingTask], ctx: &ActorContext, ) -> Result<(), IndexingError> { - self.find_and_shutdown_decommissioned_pipelines(tasks).await; - let failed_spawning_pipeline_ids = self.find_and_spawn_new_pipelines(tasks, ctx).await?; + let pipeline_diff = self.compute_pipeline_diff(tasks); + + if !pipeline_diff.pipelines_to_shutdown.is_empty() { + self.shutdown_pipelines(&pipeline_diff.pipelines_to_shutdown) + .await; + } + let mut spawn_pipeline_failures: Vec = Vec::new(); + + if !pipeline_diff.pipelines_to_spawn.is_empty() { + spawn_pipeline_failures = self + .spawn_pipelines(&pipeline_diff.pipelines_to_spawn, ctx) + .await?; + } self.assign_shards_to_pipelines(tasks).await; - self.update_cluster_running_indexing_tasks_in_chitchat() - .await; - if !failed_spawning_pipeline_ids.is_empty() { + self.update_chitchat_running_plan().await; + + if !spawn_pipeline_failures.is_empty() { let message = format!( "failed to spawn indexing pipelines: {:?}", - failed_spawning_pipeline_ids + spawn_pipeline_failures ); return Err(IndexingError::Internal(message)); } Ok(()) } + /// Identifies the pipelines to spawn and shutdown by comparing the scheduled plan with the + /// current running plan. + fn compute_pipeline_diff(&self, tasks: &[IndexingTask]) -> IndexingPipelineDiff { + let mut pipelines_to_spawn: Vec = Vec::new(); + let mut scheduled_pipeline_uids: HashSet = HashSet::with_capacity(tasks.len()); + + for task in tasks { + let pipeline_uid = task.pipeline_uid(); + + if !self.indexing_pipelines.contains_key(&pipeline_uid) { + let pipeline_id = IndexingPipelineId { + node_id: self.node_id.clone(), + index_uid: task.index_uid().clone(), + source_id: task.source_id.clone(), + pipeline_uid, + }; + pipelines_to_spawn.push(pipeline_id); + } + scheduled_pipeline_uids.insert(pipeline_uid); + } + let pipelines_to_shutdown: Vec = self + .indexing_pipelines + .keys() + .filter(|pipeline_uid| !scheduled_pipeline_uids.contains(pipeline_uid)) + .copied() + .collect(); + + IndexingPipelineDiff { + pipelines_to_shutdown, + pipelines_to_spawn, + } + } + /// Spawns the pipelines with supplied ids and returns a list of failed pipelines. async fn spawn_pipelines( &mut self, - added_pipeline_ids: &[IndexingPipelineId], + pipelines_to_spawn: &[IndexingPipelineId], ctx: &ActorContext, ) -> Result, IndexingError> { - // We fetch the new indexes metadata. - let indexes_metadata_futures = added_pipeline_ids - .iter() - // No need to emit two request for the same `index_uid` - .unique_by(|pipeline_id| pipeline_id.index_uid.clone()) - .map(|pipeline_id| self.index_metadata(ctx, &pipeline_id.index_uid.index_id)); - let indexes_metadata = try_join_all(indexes_metadata_futures).await?; - let indexes_metadata_by_index_id: HashMap = indexes_metadata + let indexes_metadata = self.indexes_metadata(ctx, pipelines_to_spawn).await?; + + let per_index_uid_indexes_metadata: HashMap = indexes_metadata .into_iter() .map(|index_metadata| (index_metadata.index_uid.clone(), index_metadata)) .collect(); - let mut failed_spawning_pipeline_ids: Vec = Vec::new(); + let mut spawn_pipeline_failures: Vec = Vec::new(); - // Add new pipelines. - for new_pipeline_id in added_pipeline_ids { + for pipeline_to_spawn in pipelines_to_spawn { if let Some(index_metadata) = - indexes_metadata_by_index_id.get(&new_pipeline_id.index_uid) + per_index_uid_indexes_metadata.get(&pipeline_to_spawn.index_uid) { - if let Some(source_config) = index_metadata.sources.get(&new_pipeline_id.source_id) + if let Some(source_config) = + index_metadata.sources.get(&pipeline_to_spawn.source_id) { if let Err(error) = self .spawn_pipeline_inner( ctx, - new_pipeline_id.clone(), + pipeline_to_spawn.clone(), index_metadata.index_config.clone(), source_config.clone(), ) .await { - error!(pipeline_id=?new_pipeline_id, err=?error, "failed to spawn pipeline"); - failed_spawning_pipeline_ids.push(new_pipeline_id.clone()); + error!(pipeline_id=?pipeline_to_spawn, %error, "failed to spawn pipeline"); + spawn_pipeline_failures.push(pipeline_to_spawn.clone()); } } else { - error!(pipeline_id=?new_pipeline_id, "failed to spawn pipeline: source does not exist"); - failed_spawning_pipeline_ids.push(new_pipeline_id.clone()); + error!(pipeline_id=?pipeline_to_spawn, "failed to spawn pipeline: source not found"); + spawn_pipeline_failures.push(pipeline_to_spawn.clone()); } } else { error!( - "Failed to spawn pipeline: index {} no longer exists.", - &new_pipeline_id.index_uid.to_string() + "failed to spawn pipeline: index `{}` no longer exists", + pipeline_to_spawn.index_uid ); - failed_spawning_pipeline_ids.push(new_pipeline_id.clone()); + spawn_pipeline_failures.push(pipeline_to_spawn.clone()); } } - - Ok(failed_spawning_pipeline_ids) + Ok(spawn_pipeline_failures) } /// Shuts down the pipelines with supplied ids and performs necessary cleanup. - async fn shutdown_pipelines(&mut self, pipeline_uids: &[PipelineUid]) { - let should_gc_ingest_api_queues = pipeline_uids + async fn shutdown_pipelines(&mut self, pipelines_to_shutdown: &[PipelineUid]) { + let should_gc_ingest_api_queues = pipelines_to_shutdown .iter() .flat_map(|pipeline_uid| self.indexing_pipelines.get(pipeline_uid)) .any(|pipeline_handle| { pipeline_handle.indexing_pipeline_id.source_id == INGEST_API_SOURCE_ID }); - for &pipeline_uid_to_remove in pipeline_uids { - match self.detach_pipeline(pipeline_uid_to_remove).await { + for pipeline_to_shutdown in pipelines_to_shutdown { + match self.detach_indexing_pipeline(pipeline_to_shutdown).await { Ok(pipeline_handle) => { - // Killing the pipeline ensure that all pipeline actors will stop. + // Killing the pipeline ensures that all the pipeline actors will stop. pipeline_handle.kill().await; } Err(error) => { // Just log the detach error, it can only come from a missing pipeline in the // `indexing_pipeline_handles`. error!( - pipeline_id=?pipeline_uid_to_remove, - err=?error, - "Detach error.", + pipeline_uid=%pipeline_to_shutdown, + ?error, + "failed to detach indexing pipeline", ); } } @@ -660,42 +687,42 @@ impl IndexingService { if should_gc_ingest_api_queues { if let Err(error) = self.run_ingest_api_queues_gc().await { warn!( - err=?error, - "Ingest API queues garbage collect error.", + %error, + "failed to garbage collect ingest API queues", ); } } } - async fn update_cluster_running_indexing_tasks_in_chitchat(&self) { + /// Broadcasts the current running plan via chitchat. + async fn update_chitchat_running_plan(&self) { let mut indexing_tasks: Vec = self .indexing_pipelines .values() - .map(|handle| IndexingTask { - index_uid: Some(handle.indexing_pipeline_id.index_uid.clone()), - source_id: handle.indexing_pipeline_id.source_id.clone(), - pipeline_uid: Some(handle.indexing_pipeline_id.pipeline_uid), - shard_ids: handle + .map(|pipeline_handle| { + let shard_ids: Vec = pipeline_handle .handle .last_observation() .shard_ids .iter() .cloned() - .collect(), + .collect(); + + IndexingTask { + index_uid: Some(pipeline_handle.indexing_pipeline_id.index_uid.clone()), + source_id: pipeline_handle.indexing_pipeline_id.source_id.clone(), + pipeline_uid: Some(pipeline_handle.indexing_pipeline_id.pipeline_uid), + shard_ids, + } }) .collect(); + + // TODO: Does anybody why we sort the indexing tasks by pipeline_uid here? indexing_tasks.sort_unstable_by_key(|task| task.pipeline_uid); - if let Err(error) = self - .cluster + self.cluster .update_self_node_indexing_tasks(&indexing_tasks) - .await - { - error!( - "Error when updating the cluster state with indexing running tasks: {}", - error - ); - } + .await; } /// Garbage collects ingest API queues of deleted indexes. @@ -763,7 +790,8 @@ impl Handler for IndexingService { msg: ObservePipeline, _ctx: &ActorContext, ) -> Result { - let observation = self.observe_pipeline(msg.pipeline_id.pipeline_uid).await; + let pipeline_uid = msg.pipeline_id.pipeline_uid; + let observation = self.observe_pipeline(&pipeline_uid).await; Ok(observation) } } @@ -777,7 +805,9 @@ impl Handler for IndexingService { msg: DetachIndexingPipeline, _ctx: &ActorContext, ) -> Result { - Ok(self.detach_pipeline(msg.pipeline_id.pipeline_uid).await) + let pipeline_uid = msg.pipeline_id.pipeline_uid; + let detach_pipeline_result = self.detach_indexing_pipeline(&pipeline_uid).await; + Ok(detach_pipeline_result) } } @@ -875,6 +905,11 @@ impl Handler for IndexingService { } } +struct IndexingPipelineDiff { + pipelines_to_shutdown: Vec, + pipelines_to_spawn: Vec, +} + #[cfg(test)] mod tests { use std::num::NonZeroUsize; @@ -1325,7 +1360,7 @@ mod tests { } #[tokio::test] - async fn test_indexing_service_shut_down_merge_pipeline_when_no_indexing_pipeline() { + async fn test_indexing_service_shutdown_merge_pipeline_when_no_indexing_pipeline() { quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) @@ -1457,7 +1492,7 @@ mod tests { } #[tokio::test] - async fn test_indexing_service_does_not_shut_down_pipelines_on_indexing_pipeline_freeze() { + async fn test_indexing_service_does_not_shutdown_pipelines_on_indexing_pipeline_freeze() { quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index db01e9c48b8..4108887fcb9 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -49,8 +49,8 @@ pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_8, Version pub use metastore::postgres::PostgresqlMetastore; pub use metastore::{ file_backed, AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, - IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, - ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, + IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt, + ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, }; diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 5e4928794c2..7f16211c462 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -25,18 +25,18 @@ use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClie use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, - DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteSourceRequest, - DeleteSplitsRequest, DeleteTask, EmptyResponse, FindIndexTemplateMatchesRequest, - FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, - IndexMetadataRequest, IndexMetadataResponse, LastDeleteOpstampRequest, - LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, - ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, - MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, - OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, - UpdateSplitsDeleteOpstampResponse, + DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, + DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, + FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, + GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, + IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, + ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest, + ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, + ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, + ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService, + MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, + PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -86,6 +86,19 @@ impl MetastoreService for ControlPlaneMetastore { Ok(response) } + // Technically, proxying this call via the control plane is not necessary at the moment because + // it does not modify attributes the control plane cares about (retention policy, search + // settings). However, it would be easy to forget to do so when we add the ability to update the + // doc mapping or merge policy of an index, so we've already set up the proxy here since calling + // `update_index` is very infrequent anyway. + async fn update_index( + &mut self, + request: UpdateIndexRequest, + ) -> MetastoreResult { + let response = self.control_plane.update_index(request).await?; + Ok(response) + } + async fn delete_index( &mut self, request: DeleteIndexRequest, @@ -117,14 +130,6 @@ impl MetastoreService for ControlPlaneMetastore { // Other metastore API calls. - async fn update_index( - &mut self, - request: UpdateIndexRequest, - ) -> MetastoreResult { - let response = self.metastore.update_index(request).await?; - Ok(response) - } - async fn index_metadata( &mut self, request: IndexMetadataRequest, @@ -132,6 +137,13 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.index_metadata(request).await } + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> MetastoreResult { + self.metastore.indexes_metadata(request).await + } + async fn list_indexes_metadata( &mut self, request: ListIndexesMetadataRequest, @@ -244,7 +256,7 @@ impl MetastoreService for ControlPlaneMetastore { async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> MetastoreResult { + ) -> MetastoreResult { self.metastore.delete_shards(request).await } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 5f7f91c81e0..5e846d5a933 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -32,9 +32,9 @@ use itertools::Itertools; use quickwit_common::pretty::PrettySample; use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; use quickwit_proto::metastore::{ - AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteTask, - EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, - OpenShardSubrequest, OpenShardSubresponse, + AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, + DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, + MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, }; use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId}; use serde::{Deserialize, Serialize}; @@ -213,18 +213,14 @@ impl FileBackedIndex { &self.metadata } - /// Replaces the search settings in the index config, returning whether a mutation occurred. - pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { - let is_mutation = self.metadata.index_config.search_settings != search_settings; - self.metadata.index_config.search_settings = search_settings; - is_mutation - } - /// Replaces the retention policy in the index config, returning whether a mutation occurred. pub fn set_retention_policy(&mut self, retention_policy_opt: Option) -> bool { - let is_mutation = self.metadata.index_config.retention_policy_opt != retention_policy_opt; - self.metadata.index_config.retention_policy_opt = retention_policy_opt; - is_mutation + self.metadata.set_retention_policy(retention_policy_opt) + } + + /// Replaces the search settings in the index config, returning whether a mutation occurred. + pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { + self.metadata.set_search_settings(search_settings) } /// Stages a single split. @@ -625,7 +621,7 @@ impl FileBackedIndex { pub(crate) fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> MetastoreResult> { + ) -> MetastoreResult> { self.get_shards_for_source_mut(&request.source_id)? .delete_shards(request) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index 0f67db8b143..ba43cbc995d 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -21,10 +21,11 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; +use itertools::Itertools; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ - AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, EntityKind, - ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, + AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, DeleteShardsResponse, + EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, }; use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId}; @@ -135,9 +136,9 @@ impl Shards { entry.insert(shard.clone()); info!( - index_id=%self.index_uid.index_id, + index_uid=%self.index_uid, source_id=%self.source_id, - shard_id=%shard_id, + %shard_id, leader_id=%shard.leader_id, follower_id=?shard.follower_id, "opened shard" @@ -172,14 +173,15 @@ impl Shards { acquired_shards.push(shard.clone()); } else { warn!( - index_id=%self.index_uid.index_id, + index_uid=%self.index_uid, source_id=%self.source_id, - shard_id=%shard_id, + %shard_id, "shard not found" ); } } let response = AcquireShardsResponse { acquired_shards }; + if mutation_occurred { Ok(MutationOccurred::Yes(response)) } else { @@ -190,26 +192,48 @@ impl Shards { pub(super) fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> MetastoreResult> { + ) -> MetastoreResult> { + let mut successes = Vec::with_capacity(request.shard_ids.len()); + let mut failures = Vec::new(); let mut mutation_occurred = false; + for shard_id in request.shard_ids { if let Entry::Occupied(entry) = self.shards.entry(shard_id.clone()) { let shard = entry.get(); if !request.force && !shard.publish_position_inclusive().is_eof() { - warn!("shard `{shard_id}` is not deletable"); + failures.push(shard_id); continue; } info!( - index_id=%self.index_uid.index_id, + index_uid=%self.index_uid, source_id=%self.source_id, - shard_id=%shard_id, + %shard_id, "deleted shard", ); entry.remove(); mutation_occurred = true; } + successes.push(shard_id); + } + if !failures.is_empty() { + warn!( + index_uid=%self.index_uid, + source_id=%self.source_id, + "failed to delete shards `{}`: shards are not fully indexed", + failures.iter().join(", ") + ); + } + let response = DeleteShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + successes, + failures, + }; + if mutation_occurred { + Ok(MutationOccurred::Yes(response)) + } else { + Ok(MutationOccurred::No(response)) } - Ok(MutationOccurred::from(mutation_occurred)) } pub(super) fn list_shards( @@ -311,7 +335,7 @@ mod tests { }; let MutationOccurred::Yes(subresponse) = shards.open_shard(subrequest.clone()).unwrap() else { - panic!("Expected `MutationOccured::Yes`"); + panic!("expected `MutationOccured::Yes`"); }; assert_eq!(subresponse.subrequest_id, 0); @@ -420,7 +444,7 @@ mod tests { let mut shards = Shards::empty(index_uid.clone(), source_id.clone()); let request = AcquireShardsRequest { - index_uid: index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_ids: Vec::new(), publish_token: "test-publish-token".to_string(), @@ -431,7 +455,7 @@ mod tests { assert!(response.acquired_shards.is_empty()); let request = AcquireShardsRequest { - index_uid: index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_ids: vec![ShardId::from(0), ShardId::from(1)], publish_token: "test-publish-token".to_string(), @@ -444,7 +468,7 @@ mod tests { shards.shards.insert( ShardId::from(0), Shard { - index_uid: index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, @@ -454,7 +478,7 @@ mod tests { ); let MutationOccurred::Yes(response) = shards.acquire_shards(request.clone()).unwrap() else { - panic!("Expected `MutationOccured::Yes`"); + panic!("expected `MutationOccured::Yes`"); }; assert_eq!(response.acquired_shards.len(), 1); let acquired_shard = &response.acquired_shards[0]; @@ -477,27 +501,38 @@ mod tests { let mut shards = Shards::empty(index_uid.clone(), source_id.clone()); let request = DeleteShardsRequest { - index_uid: index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_ids: Vec::new(), force: false, }; - let response = shards.delete_shards(request).unwrap(); - assert!(matches!(response, MutationOccurred::No(()))); + let MutationOccurred::No(response) = shards.delete_shards(request).unwrap() else { + panic!("expected `MutationOccured::No`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + assert!(response.successes.is_empty()); + assert!(response.failures.is_empty()); let request = DeleteShardsRequest { - index_uid: index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_ids: vec![ShardId::from(0)], force: false, }; - let response = shards.delete_shards(request.clone()).unwrap(); - assert!(matches!(response, MutationOccurred::No(()))); + let MutationOccurred::No(response) = shards.delete_shards(request).unwrap() else { + panic!("expected `MutationOccured::No`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.successes[0], ShardId::from(0)); + assert!(response.failures.is_empty()); shards.shards.insert( ShardId::from(0), Shard { - index_uid: index_uid.clone().into(), + index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_id: Some(ShardId::from(0)), shard_state: ShardState::Open as i32, @@ -505,8 +540,47 @@ mod tests { ..Default::default() }, ); - let response = shards.delete_shards(request).unwrap(); - assert!(matches!(response, MutationOccurred::Yes(()))); + shards.shards.insert( + ShardId::from(1), + Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::offset(0u64)), + ..Default::default() + }, + ); + let request = DeleteShardsRequest { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_ids: vec![ShardId::from(0), ShardId::from(1)], + force: false, + }; + let MutationOccurred::Yes(response) = shards.delete_shards(request).unwrap() else { + panic!("expected `MutationOccured::Yes`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.successes[0], ShardId::from(0)); + assert_eq!(response.failures.len(), 1); + assert_eq!(response.failures[0], ShardId::from(1)); + + let request = DeleteShardsRequest { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_ids: vec![ShardId::from(1)], + force: true, + }; + let MutationOccurred::Yes(response) = shards.delete_shards(request).unwrap() else { + panic!("expected `MutationOccured::Yes`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.successes[0], ShardId::from(1)); + assert!(response.failures.is_empty()); assert!(shards.shards.is_empty()); } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index a0ec0f51264..58ea6e8302b 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -39,23 +39,27 @@ use std::time::Duration; use async_trait::async_trait; use futures::future::try_join_all; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use itertools::Itertools; use quickwit_common::ServiceStream; use quickwit_config::IndexTemplate; use quickwit_proto::metastore::{ serde_utils, AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, - DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteSourceRequest, - DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest, - FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, - IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, LastDeleteOpstampRequest, - LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, - ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, - MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, - OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, - StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, + DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, + FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, + GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, + IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest, + IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, + ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest, + ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, + ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, + ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, + MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, + OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, + ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid}; @@ -72,8 +76,8 @@ use self::state::MetastoreState; use self::store_operations::{delete_index, index_exists, load_index, put_index}; use super::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt, - ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, - PublishSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, + IndexesMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, + ListSplitsResponseExt, PublishSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, STREAM_SPLITS_CHUNK_SIZE, }; use crate::checkpoint::IndexCheckpointDelta; @@ -335,9 +339,46 @@ impl FileBackedMetastore { Ok(index_mutex) } + async fn index_metadata_inner( + &mut self, + index_id_opt: Option, + index_uid_opt: Option, + ) -> Result, Option)> { + let index_id = if let Some(index_id) = &index_id_opt { + index_id + } else if let Some(index_uid) = &index_uid_opt { + &index_uid.index_id + } else { + let message = "invalid request: neither `index_id` nor `index_uid` is set".to_string(); + let metastore_error = MetastoreError::Internal { + message, + cause: "".to_string(), + }; + return Err((metastore_error, index_id_opt, index_uid_opt)); + }; + let index_metadata = match self + .read_any(index_id, |index| Ok(index.metadata().clone())) + .await + { + Ok(index_metadata) => index_metadata, + Err(metastore_error) => { + return Err((metastore_error, index_id_opt, index_uid_opt)); + } + }; + if let Some(index_uid) = &index_uid_opt { + if index_metadata.index_uid != *index_uid { + let metastore_error = MetastoreError::NotFound(EntityKind::Index { + index_id: index_id.to_string(), + }); + return Err((metastore_error, index_id_opt, index_uid_opt)); + } + } + Ok(index_metadata) + } + /// Returns the list of splits for the given request. /// No error is returned if any of the requested `index_uid` does not exist. - async fn inner_list_splits(&self, request: ListSplitsRequest) -> MetastoreResult> { + async fn list_splits_inner(&self, request: ListSplitsRequest) -> MetastoreResult> { let list_splits_query = request.deserialize_list_splits_query()?; let mut all_splits = Vec::new(); for index_uid in &list_splits_query.index_uids { @@ -461,22 +502,25 @@ impl MetastoreService for FileBackedMetastore { &mut self, request: UpdateIndexRequest, ) -> MetastoreResult { - let search_settings = request.deserialize_search_settings()?; let retention_policy_opt = request.deserialize_retention_policy()?; + let search_settings = request.deserialize_search_settings()?; let index_uid = request.index_uid(); - let metadata = self + let index_metadata = self .mutate(index_uid, |index| { - let search_settings_mutated = index.set_search_settings(search_settings); - let retention_policy_mutated = index.set_retention_policy(retention_policy_opt); - if search_settings_mutated || retention_policy_mutated { - Ok(MutationOccurred::Yes(index.metadata().clone())) + let mut mutation_occurred = index.set_retention_policy(retention_policy_opt); + mutation_occurred |= index.set_search_settings(search_settings); + + let index_metadata = index.metadata().clone(); + + if mutation_occurred { + Ok(MutationOccurred::Yes(index_metadata)) } else { - Ok(MutationOccurred::No(index.metadata().clone())) + Ok(MutationOccurred::No(index_metadata)) } }) .await?; - IndexMetadataResponse::try_from_index_metadata(&metadata) + IndexMetadataResponse::try_from_index_metadata(&index_metadata) } async fn delete_index( @@ -694,7 +738,7 @@ impl MetastoreService for FileBackedMetastore { &mut self, request: ListSplitsRequest, ) -> MetastoreResult> { - let splits = self.inner_list_splits(request).await?; + let splits = self.list_splits_inner(request).await?; let splits_responses: Vec> = splits .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| ListSplitsResponse::try_from_splits(chunk.to_vec())) @@ -715,7 +759,7 @@ impl MetastoreService for FileBackedMetastore { .with_limit(request.num_splits as usize); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?; - let splits = self.inner_list_splits(list_splits_request).await?; + let splits = self.list_splits_inner(list_splits_request).await?; ListSplitsResponse::try_from_splits(splits) } @@ -723,18 +767,57 @@ impl MetastoreService for FileBackedMetastore { &mut self, request: IndexMetadataRequest, ) -> MetastoreResult { - let index_id = request.get_index_id()?; let index_metadata = self - .read_any(&index_id, |index| Ok(index.metadata().clone())) - .await?; - if let Some(index_uid) = &request.index_uid { - if index_metadata.index_uid != *index_uid { - return Err(MetastoreError::NotFound(EntityKind::Index { - index_id: index_id.to_string(), - })); + .index_metadata_inner(request.index_id, request.index_uid) + .await + .map_err(|(metastore_error, _index_id_opt, _index_uid_opt)| metastore_error)?; + let response = IndexMetadataResponse::try_from_index_metadata(&index_metadata)?; + Ok(response) + } + + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> MetastoreResult { + let mut indexes_metadata: Vec = + Vec::with_capacity(request.subrequests.len()); + let mut failures: Vec = Vec::new(); + + let mut index_metadata_futures = FuturesUnordered::new(); + + for subrequest in request.subrequests { + let mut metastore = self.clone(); + let index_metadata_future = async move { + metastore + .index_metadata_inner(subrequest.index_id, subrequest.index_uid) + .await + }; + index_metadata_futures.push(index_metadata_future); + } + while let Some(index_metadata_result) = index_metadata_futures.next().await { + match index_metadata_result { + Ok(index_metadata) => indexes_metadata.push(index_metadata), + Err((MetastoreError::NotFound(_), index_id, index_uid)) => { + let failure = IndexMetadataFailure { + index_id, + index_uid, + reason: IndexMetadataFailureReason::NotFound as i32, + }; + failures.push(failure) + } + // All other errors are considered internal errors. + Err((_metastore_error, index_id, index_uid)) => { + let failure = IndexMetadataFailure { + index_id, + index_uid, + reason: IndexMetadataFailureReason::Internal as i32, + }; + failures.push(failure) + } } } - let response = IndexMetadataResponse::try_from_index_metadata(&index_metadata)?; + let response = + IndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata, failures).await?; Ok(response) } @@ -815,11 +898,12 @@ impl MetastoreService for FileBackedMetastore { async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> MetastoreResult { + ) -> MetastoreResult { let index_uid = request.index_uid().clone(); - self.mutate(&index_uid, |index| index.delete_shards(request)) + let response = self + .mutate(&index_uid, |index| index.delete_shards(request)) .await?; - Ok(EmptyResponse {}) + Ok(response) } async fn list_shards( diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index 501e487fa95..36db32f24e7 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -23,7 +23,9 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use quickwit_common::uri::Uri; -use quickwit_config::{IndexConfig, SourceConfig, TestableForRegression}; +use quickwit_config::{ + IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, TestableForRegression, +}; use quickwit_proto::metastore::{EntityKind, MetastoreError, MetastoreResult}; use quickwit_proto::types::{IndexUid, Position, SourceId}; use serde::{Deserialize, Serialize}; @@ -99,6 +101,26 @@ impl IndexMetadata { &self.index_config().index_uri } + /// Replaces or removes the current retention policy, returning whether a mutation occurred. + pub fn set_retention_policy(&mut self, retention_policy_opt: Option) -> bool { + if self.index_config.retention_policy_opt != retention_policy_opt { + self.index_config.retention_policy_opt = retention_policy_opt; + true + } else { + false + } + } + + /// Replaces or removes the current search settings, returning whether a mutation occurred. + pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { + if self.index_config.search_settings != search_settings { + self.index_config.search_settings = search_settings; + true + } else { + false + } + } + /// Adds a source to the index. Returns an error if the source already exists. pub fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> { match self.sources.entry(source_config.source_id.clone()) { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 3a231f0029c..8fe8c173ca5 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -35,9 +35,10 @@ use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, - IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, - ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, + IndexMetadataFailure, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataResponse, + ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse, MetastoreError, + MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, + PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -255,6 +256,63 @@ impl IndexMetadataResponseExt for IndexMetadataResponse { } } +/// Helper trait to build a [`IndexesMetadataResponse`] and deserialize its payload. +#[async_trait] +pub trait IndexesMetadataResponseExt { + /// Creates a new `IndexesMetadataResponse` from a `Vec` of [`IndexMetadata`]. + async fn try_from_indexes_metadata( + indexes_metadata: Vec, + failures: Vec, + ) -> MetastoreResult; + + /// Deserializes the payload of an `IndexesMetadataResponse` into a `Vec`` of [`IndexMetadata`]. + async fn deserialize_indexes_metadata(self) -> MetastoreResult>; + + /// Creates a new `IndexesMetadataResponse` from a `Vec` of [`IndexMetadata`] synchronously. + #[cfg(any(test, feature = "testsuite"))] + fn for_test( + indexes_metadata: Vec, + failures: Vec, + ) -> IndexesMetadataResponse { + use futures::executor; + + executor::block_on(Self::try_from_indexes_metadata(indexes_metadata, failures)).unwrap() + } +} + +#[async_trait] +impl IndexesMetadataResponseExt for IndexesMetadataResponse { + async fn try_from_indexes_metadata( + indexes_metadata: Vec, + failures: Vec, + ) -> MetastoreResult { + let indexes_metadata_json_zstd = tokio::task::spawn_blocking(move || { + serde_utils::to_json_zstd(&indexes_metadata, 0).map(Bytes::from) + }) + .await + .map_err(|join_error| MetastoreError::Internal { + message: "failed to serialize indexes metadata".to_string(), + cause: join_error.to_string(), + })??; + let response = Self { + indexes_metadata_json_zstd, + failures, + }; + Ok(response) + } + + async fn deserialize_indexes_metadata(self) -> MetastoreResult> { + tokio::task::spawn_blocking(move || { + serde_utils::from_json_zstd(&self.indexes_metadata_json_zstd) + }) + .await + .map_err(|join_error| MetastoreError::Internal { + message: "failed to deserialize indexes metadata".to_string(), + cause: join_error.to_string(), + })? + } +} + /// Helper trait to build a `ListIndexesResponse` and deserialize its payload. #[async_trait] pub trait ListIndexesMetadataResponseExt { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 9202c24cbea..5c668eb51d9 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -21,6 +21,7 @@ use std::fmt::{self, Write}; use async_trait::async_trait; use futures::StreamExt; +use itertools::Itertools; use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; @@ -32,20 +33,22 @@ use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ serde_utils, AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest, - DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteSourceRequest, - DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest, - FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, - IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, LastDeleteOpstampRequest, - LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse, - ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, + DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, + DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind, + FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, + GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, + IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest, + IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, + ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest, + ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, + ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest, + ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, + MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; -use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, SourceId}; +use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId}; use sea_query::{Asterisk, PostgresQueryBuilder, Query}; use sea_query_binder::SqlxBinder; use sqlx::{Acquire, Executor, Postgres, Transaction}; @@ -62,7 +65,9 @@ use crate::checkpoint::{ }; use crate::file_backed::MutationOccurred; use crate::metastore::postgres::utils::split_maturity_timestamp; -use crate::metastore::{PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE}; +use crate::metastore::{ + IndexesMetadataResponseExt, PublishSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, +}; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, @@ -155,7 +160,7 @@ where FOR UPDATE "#, ) - .bind(index_uid.to_string()) + .bind(&index_uid) .fetch_optional(executor) .await?; Ok(index_opt) @@ -201,7 +206,7 @@ async fn try_apply_delta_v2( FOR UPDATE "#, ) - .bind(index_uid.to_string()) + .bind(index_uid) .bind(source_id) .bind(shard_ids) .fetch_all(tx.as_mut()) @@ -255,7 +260,7 @@ async fn try_apply_delta_v2( AND shards.shard_id = new_positions.shard_id "#, ) - .bind(index_uid.to_string()) + .bind(index_uid) .bind(source_id) .bind(shard_ids) .bind(new_positions) @@ -279,10 +284,10 @@ macro_rules! run_with_tx { let op_fut = move || async move { $x }; let op_result: MetastoreResult<_> = op_fut().await; if op_result.is_ok() { - debug!("commit"); + debug!("committing transaction"); tx.commit().await?; } else { - warn!("rollback"); + warn!("rolling transaction back"); tx.rollback().await?; } op_result @@ -324,7 +329,7 @@ where "#, ) .bind(index_metadata_json) - .bind(index_uid.to_string()) + .bind(&index_uid) .execute(tx.as_mut()) .await?; if update_index_res.rows_affected() == 0 { @@ -346,29 +351,12 @@ impl MetastoreService for PostgresqlMetastore { vec![self.uri.clone()] } - #[instrument(skip(self))] - async fn list_indexes_metadata( - &mut self, - request: ListIndexesMetadataRequest, - ) -> MetastoreResult { - let sql = - build_index_id_patterns_sql_query(&request.index_id_patterns).map_err(|error| { - MetastoreError::Internal { - message: "failed to build `list_indexes_metadata` SQL query".to_string(), - cause: error.to_string(), - } - })?; - let pg_indexes = sqlx::query_as::<_, PgIndex>(&sql) - .fetch_all(&self.connection_pool) - .await?; - let indexes_metadata = pg_indexes - .into_iter() - .map(|pg_index| pg_index.index_metadata()) - .collect::>>()?; - let response = - ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata).await?; - Ok(response) - } + // Index API: + // - `create_index` + // - `update_index` + // - `index_metadata` + // - `indexes_metadata` + // - `list_indexes_metadata` #[instrument(skip(self))] async fn create_index( @@ -409,21 +397,139 @@ impl MetastoreService for PostgresqlMetastore { let retention_policy_opt = request.deserialize_retention_policy()?; let search_settings = request.deserialize_search_settings()?; let index_uid: IndexUid = request.index_uid().clone(); - let updated_metadata = run_with_tx!(self.connection_pool, tx, { + let updated_index_metadata = run_with_tx!(self.connection_pool, tx, { mutate_index_metadata::(tx, index_uid, |index_metadata| { - if index_metadata.index_config.search_settings != search_settings - || index_metadata.index_config.retention_policy_opt != retention_policy_opt - { - index_metadata.index_config.search_settings = search_settings; - index_metadata.index_config.retention_policy_opt = retention_policy_opt; - Ok(MutationOccurred::Yes(())) - } else { - Ok(MutationOccurred::No(())) - } + let mut mutation_occurred = + index_metadata.set_retention_policy(retention_policy_opt); + mutation_occurred |= index_metadata.set_search_settings(search_settings); + Ok(MutationOccurred::from(mutation_occurred)) }) .await })?; - IndexMetadataResponse::try_from_index_metadata(&updated_metadata) + IndexMetadataResponse::try_from_index_metadata(&updated_index_metadata) + } + + #[instrument(skip(self))] + async fn index_metadata( + &mut self, + request: IndexMetadataRequest, + ) -> MetastoreResult { + let response = if let Some(index_uid) = &request.index_uid { + index_opt_for_uid(&self.connection_pool, index_uid.clone()).await? + } else if let Some(index_id) = &request.index_id { + index_opt(&self.connection_pool, index_id).await? + } else { + let message = "invalid request: neither `index_id` nor `index_uid` is set".to_string(); + return Err(MetastoreError::Internal { + message, + cause: "".to_string(), + }); + }; + let index_metadata = response + .ok_or(MetastoreError::NotFound(EntityKind::Index { + index_id: request.index_id.expect("`index_id` should be set"), + }))? + .index_metadata()?; + let response = IndexMetadataResponse::try_from_index_metadata(&index_metadata)?; + Ok(response) + } + + #[instrument(skip(self))] + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> MetastoreResult { + const INDEXES_METADATA_QUERY: &str = include_str!("queries/indexes_metadata.sql"); + + let num_subrequests = request.subrequests.len(); + + if num_subrequests == 0 { + return Ok(Default::default()); + } + let mut index_ids: Vec = Vec::new(); + let mut index_uids: Vec = Vec::with_capacity(num_subrequests); + let mut failures: Vec = Vec::new(); + + for subrequest in request.subrequests { + if let Some(index_id) = subrequest.index_id { + index_ids.push(index_id); + } else if let Some(index_uid) = subrequest.index_uid { + index_uids.push(index_uid); + } else { + let failure = IndexMetadataFailure { + index_id: subrequest.index_id, + index_uid: subrequest.index_uid, + reason: IndexMetadataFailureReason::Internal as i32, + }; + failures.push(failure); + } + } + let pg_indexes: Vec = sqlx::query_as::<_, PgIndex>(INDEXES_METADATA_QUERY) + .bind(&index_ids) + .bind(&index_uids) + .fetch_all(&self.connection_pool) + .await?; + + let indexes_metadata: Vec = pg_indexes + .iter() + .map(|pg_index| pg_index.index_metadata()) + .collect::>()?; + + if pg_indexes.len() + failures.len() < num_subrequests { + for index_id in index_ids { + if pg_indexes + .iter() + .all(|pg_index| pg_index.index_id != index_id) + { + let failure = IndexMetadataFailure { + index_id: Some(index_id), + index_uid: None, + reason: IndexMetadataFailureReason::NotFound as i32, + }; + failures.push(failure); + } + } + for index_uid in index_uids { + if pg_indexes + .iter() + .all(|pg_index| pg_index.index_uid != index_uid) + { + let failure = IndexMetadataFailure { + index_id: None, + index_uid: Some(index_uid), + reason: IndexMetadataFailureReason::NotFound as i32, + }; + failures.push(failure); + } + } + } + let response = + IndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata, failures).await?; + Ok(response) + } + + #[instrument(skip(self))] + async fn list_indexes_metadata( + &mut self, + request: ListIndexesMetadataRequest, + ) -> MetastoreResult { + let sql = + build_index_id_patterns_sql_query(&request.index_id_patterns).map_err(|error| { + MetastoreError::Internal { + message: "failed to build `list_indexes_metadata` SQL query".to_string(), + cause: error.to_string(), + } + })?; + let pg_indexes = sqlx::query_as::<_, PgIndex>(&sql) + .fetch_all(&self.connection_pool) + .await?; + let indexes_metadata: Vec = pg_indexes + .into_iter() + .map(|pg_index| pg_index.index_metadata()) + .collect::>()?; + let response = + ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata).await?; + Ok(response) } #[instrument(skip_all, fields(index_id=%request.index_uid()))] @@ -433,7 +539,7 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let index_uid: IndexUid = request.index_uid().clone(); let delete_result = sqlx::query("DELETE FROM indexes WHERE index_uid = $1") - .bind(index_uid.to_string()) + .bind(&index_uid) .execute(&self.connection_pool) .await?; // FIXME: This is not idempotent. @@ -526,7 +632,7 @@ impl MetastoreService for PostgresqlMetastore { .bind(delete_opstamps) .bind(maturity_timestamps) .bind(SplitState::Staged.as_str()) - .bind(index_uid.to_string()) + .bind(&index_uid) .fetch_all(tx.as_mut()) .await .map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?; @@ -682,7 +788,7 @@ impl MetastoreService for PostgresqlMetastore { not_marked_split_ids, ): (i64, i64, Vec, Vec, Vec) = sqlx::query_as(PUBLISH_SPLITS_QUERY) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(index_metadata_json) .bind(staged_split_ids) .bind(replaced_split_ids) @@ -812,7 +918,7 @@ impl MetastoreService for PostgresqlMetastore { "#; let (num_found_splits, num_marked_splits, not_found_split_ids): (i64, i64, Vec) = sqlx::query_as(MARK_SPLITS_FOR_DELETION_QUERY) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(split_ids.clone()) .fetch_one(&self.connection_pool) .await @@ -895,7 +1001,7 @@ impl MetastoreService for PostgresqlMetastore { Vec, Vec, ) = sqlx::query_as(DELETE_SPLITS_QUERY) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(split_ids) .fetch_one(&self.connection_pool) .await @@ -933,32 +1039,6 @@ impl MetastoreService for PostgresqlMetastore { Ok(EmptyResponse {}) } - #[instrument(skip(self))] - async fn index_metadata( - &mut self, - request: IndexMetadataRequest, - ) -> MetastoreResult { - let response = if let Some(index_uid) = &request.index_uid { - index_opt_for_uid(&self.connection_pool, index_uid.clone()).await? - } else if let Some(index_id) = &request.index_id { - index_opt(&self.connection_pool, index_id).await? - } else { - return Err(MetastoreError::Internal { - message: "either `index_id` or `index_uid` must be set".to_string(), - cause: "missing index identifier".to_string(), - }); - }; - let index_metadata = response - .ok_or({ - MetastoreError::NotFound(EntityKind::Index { - index_id: request.get_index_id().expect("index_id is set").to_string(), - }) - })? - .index_metadata()?; - let response = IndexMetadataResponse::try_from_index_metadata(&index_metadata)?; - Ok(response) - } - #[instrument(skip(self))] async fn add_source(&mut self, request: AddSourceRequest) -> MetastoreResult { let source_config = request.deserialize_source_config()?; @@ -1015,7 +1095,7 @@ impl MetastoreService for PostgresqlMetastore { AND source_id = $2 "#, ) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(source_id) .execute(tx.as_mut()) .await?; @@ -1057,7 +1137,7 @@ impl MetastoreService for PostgresqlMetastore { WHERE index_uid = $1 "#, ) - .bind(request.index_uid().to_string()) + .bind(request.index_uid()) .fetch_one(&self.connection_pool) .await .map_err(|error| MetastoreError::Db { @@ -1126,7 +1206,7 @@ impl MetastoreService for PostgresqlMetastore { "#, ) .bind(request.delete_opstamp as i64) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(split_ids) .execute(&self.connection_pool) .await?; @@ -1159,7 +1239,7 @@ impl MetastoreService for PostgresqlMetastore { AND opstamp > $2 "#, ) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(request.opstamp_start as i64) .fetch_all(&self.connection_pool) .await?; @@ -1192,7 +1272,7 @@ impl MetastoreService for PostgresqlMetastore { LIMIT $4 "#, ) - .bind(index_uid.to_string()) + .bind(&index_uid) .bind(request.delete_opstamp as i64) .bind(SplitState::Published.as_str()) .bind(request.num_splits as i64) @@ -1207,6 +1287,7 @@ impl MetastoreService for PostgresqlMetastore { Ok(response) } + // TODO: Issue a single SQL query. async fn open_shards( &mut self, request: OpenShardsRequest, @@ -1233,16 +1314,11 @@ impl MetastoreService for PostgresqlMetastore { if request.shard_ids.is_empty() { return Ok(Default::default()); } - let shard_ids: Vec<&str> = request - .shard_ids - .iter() - .map(|shard_id| shard_id.as_str()) - .collect(); let pg_shards: Vec = sqlx::query_as(ACQUIRE_SHARDS_QUERY) - .bind(request.index_uid().to_string()) + .bind(request.index_uid()) .bind(&request.source_id) - .bind(&shard_ids) - .bind(request.publish_token) + .bind(&request.shard_ids) + .bind(&request.publish_token) .fetch_all(&self.connection_pool) .await?; let acquired_shards = pg_shards @@ -1253,6 +1329,7 @@ impl MetastoreService for PostgresqlMetastore { Ok(response) } + // TODO: Issue a single SQL query. async fn list_shards( &mut self, request: ListShardsRequest, @@ -1292,34 +1369,73 @@ impl MetastoreService for PostgresqlMetastore { async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> MetastoreResult { + ) -> MetastoreResult { const DELETE_SHARDS_QUERY: &str = include_str!("queries/shards/delete.sql"); + const FIND_NOT_DELETABLE_SHARDS_QUERY: &str = + include_str!("queries/shards/find_not_deletable.sql"); + if request.shard_ids.is_empty() { return Ok(Default::default()); } - let shard_ids: Vec<&str> = request - .shard_ids - .iter() - .map(|shard_id| shard_id.as_str()) - .collect(); - let pg_shards: Vec = sqlx::query_as(DELETE_SHARDS_QUERY) - .bind(request.index_uid().to_string()) + let query_result = sqlx::query(DELETE_SHARDS_QUERY) + .bind(request.index_uid()) .bind(&request.source_id) - .bind(&shard_ids) + .bind(&request.shard_ids) .bind(request.force) + .execute(&self.connection_pool) + .await?; + + // Happy path: all shards were deleted. + if request.force || query_result.rows_affected() == request.shard_ids.len() as u64 { + let response = DeleteShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + successes: request.shard_ids, + failures: Vec::new(), + }; + return Ok(response); + } + // Unhappy path: some shards were not deleted because they do not exist or are not fully + // indexed. + let not_deletable_pg_shards: Vec = sqlx::query_as(FIND_NOT_DELETABLE_SHARDS_QUERY) + .bind(request.index_uid()) + .bind(&request.source_id) + .bind(&request.shard_ids) .fetch_all(&self.connection_pool) .await?; - if !request.force - && pg_shards.into_iter().any(|pg_shard| { - let position: Position = pg_shard.publish_position_inclusive.into(); - position.is_eof() - }) - { - let message = "failed to delete shard ``: shard is not fully indexed".to_string(); - return Err(MetastoreError::InvalidArgument { message }); + + if not_deletable_pg_shards.is_empty() { + let response = DeleteShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + successes: request.shard_ids, + failures: Vec::new(), + }; + return Ok(response); } - Ok(EmptyResponse {}) + let failures: Vec = not_deletable_pg_shards + .into_iter() + .map(|pg_shard| pg_shard.shard_id) + .collect(); + warn!( + index_uid=%request.index_uid(), + source_id=%request.source_id, + "failed to delete shards `{}`: shards are not fully indexed", + failures.iter().join(", ") + ); + let successes: Vec = request + .shard_ids + .into_iter() + .filter(|shard_id| !failures.contains(shard_id)) + .collect(); + let response = DeleteShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + successes, + failures, + }; + Ok(response) } // Index Template API @@ -1634,7 +1750,7 @@ mod tests { for shard in shards { sqlx::query(INSERT_SHARD_QUERY) - .bind(index_uid.to_string()) + .bind(index_uid) .bind(source_id) .bind(shard.shard_id().as_str()) .bind(shard.shard_state().as_json_str_name()) @@ -1658,7 +1774,7 @@ mod tests { AND source_id = $2 "#, ) - .bind(index_uid.to_string()) + .bind(index_uid) .bind(source_id.as_str()) .fetch_all(&self.connection_pool) .await diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/indexes_metadata.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/indexes_metadata.sql new file mode 100644 index 00000000000..3c6b8f472d2 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/indexes_metadata.sql @@ -0,0 +1,7 @@ +SELECT + * +FROM + indexes +WHERE + index_id = ANY ($1) + OR index_uid = ANY ($2) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/delete.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/delete.sql index 2fc490fa008..e45808d5623 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/delete.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/delete.sql @@ -2,5 +2,5 @@ DELETE FROM shards WHERE index_uid = $1 AND source_id = $2 AND shard_id = ANY ($3) - AND ($4 = TRUE + AND ($4 OR publish_position_inclusive LIKE '~%') diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/find_not_deletable.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/find_not_deletable.sql new file mode 100644 index 00000000000..893fe596dfb --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/find_not_deletable.sql @@ -0,0 +1,9 @@ +SELECT + * +FROM + shards +WHERE + index_uid = $1 + AND source_id = $2 + AND shard_id = ANY ($3) + AND publish_position_inclusive NOT LIKE '~%' diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index a75ff115297..48dbcd14655 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -33,17 +33,19 @@ use quickwit_config::{ }; use quickwit_doc_mapper::FieldMappingType; use quickwit_proto::metastore::{ - CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, - ListIndexesMetadataRequest, MetastoreError, MetastoreService, StageSplitsRequest, - UpdateIndexRequest, + CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataFailure, + IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, + IndexesMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, + StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::IndexUid; use super::DefaultForTest; use crate::tests::cleanup_index; use crate::{ - CreateIndexRequestExt, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, - MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, UpdateIndexRequestExt, + CreateIndexRequestExt, IndexMetadataResponseExt, IndexesMetadataResponseExt, + ListIndexesMetadataResponseExt, MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, + UpdateIndexRequestExt, }; pub async fn test_metastore_create_index< @@ -306,6 +308,104 @@ pub async fn test_metastore_index_metadata< cleanup_index(&mut metastore, index_uid).await; } +pub async fn test_metastore_indexes_metadata< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let index_id_0 = append_random_suffix("test-indexes-metadata-0"); + let index_uri_0 = format!("ram:///indexes/{index_id_0}"); + let index_config_0 = IndexConfig::for_test(&index_id_0, &index_uri_0); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_0).unwrap(); + let index_uid_0: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let index_id_1 = append_random_suffix("test-indexes-metadata-1"); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let indexes_metadata_request = IndexesMetadataRequest { + subrequests: vec![ + IndexMetadataSubrequest { + index_id: None, + index_uid: None, + }, + IndexMetadataSubrequest { + index_id: Some(index_id_0.clone()), + index_uid: None, + }, + IndexMetadataSubrequest { + index_id: Some("test-indexes-metadata-foo".to_string()), + index_uid: None, + }, + IndexMetadataSubrequest { + index_id: None, + index_uid: Some(index_uid_1.clone()), + }, + IndexMetadataSubrequest { + index_id: None, + index_uid: Some(IndexUid::for_test("test-indexes-metadata-bar", 123)), + }, + ], + }; + let mut indexes_metadata_response = metastore + .indexes_metadata(indexes_metadata_request) + .await + .unwrap(); + + let failures = &mut indexes_metadata_response.failures; + assert_eq!(failures.len(), 3); + + failures.sort_by(|left, right| left.index_id().cmp(right.index_id())); + + let expected_failure_0 = IndexMetadataFailure { + index_id: None, + index_uid: None, + reason: IndexMetadataFailureReason::Internal as i32, + }; + assert_eq!(failures[0], expected_failure_0); + + let expected_failure_1 = IndexMetadataFailure { + index_id: None, + index_uid: Some(IndexUid::for_test("test-indexes-metadata-bar", 123)), + reason: IndexMetadataFailureReason::NotFound as i32, + }; + assert_eq!(failures[1], expected_failure_1); + + let expected_failure_2 = IndexMetadataFailure { + index_id: Some("test-indexes-metadata-foo".to_string()), + index_uid: None, + reason: IndexMetadataFailureReason::NotFound as i32, + }; + assert_eq!(failures[2], expected_failure_2); + + let mut indexes_metadata = indexes_metadata_response + .deserialize_indexes_metadata() + .await + .unwrap(); + assert_eq!(indexes_metadata.len(), 2); + + indexes_metadata.sort_by(|left, right| left.index_id().cmp(right.index_id())); + assert_eq!(indexes_metadata[0].index_id(), index_id_0); + assert_eq!(indexes_metadata[1].index_id(), index_id_1); + + cleanup_index(&mut metastore, index_uid_0).await; + cleanup_index(&mut metastore, index_uid_1).await; +} + pub async fn test_metastore_list_all_indexes< MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 3caf221a569..ecabfe513ac 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -175,8 +175,10 @@ macro_rules! metastore_test_suite { // Index API tests // // - create_index + // - update_index // - index_exists // - index_metadata + // - indexes_metadata // - list_indexes // - delete_index @@ -187,15 +189,15 @@ macro_rules! metastore_test_suite { } #[tokio::test] - async fn test_metastore_update_index() { + async fn test_metastore_create_index_with_sources() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; + $crate::tests::index::test_metastore_create_index_with_sources::<$metastore_type>().await; } #[tokio::test] - async fn test_metastore_create_index_with_sources() { + async fn test_metastore_update_index() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_create_index_with_sources::<$metastore_type>().await; + $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; } #[tokio::test] @@ -219,6 +221,12 @@ macro_rules! metastore_test_suite { $crate::tests::index::test_metastore_index_metadata::<$metastore_type>().await; } + #[tokio::test] + async fn test_metastore_indexes_metadata() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_indexes_metadata::<$metastore_type>().await; + } + #[tokio::test] async fn test_metastore_list_indexes() { let _ = tracing_subscriber::fmt::try_init(); diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index be08ff559cc..0c8b47228fa 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -450,7 +450,20 @@ pub async fn test_metastore_delete_shards< ], force: false, }; - metastore.delete_shards(delete_index_request).await.unwrap(); + let mut response = metastore.delete_shards(delete_index_request).await.unwrap(); + + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + assert_eq!(response.successes.len(), 2); + assert_eq!(response.failures.len(), 2); + + response.successes.sort_unstable(); + assert_eq!(response.successes[0], ShardId::from(3)); + assert_eq!(response.successes[1], ShardId::from(4)); + + response.failures.sort_unstable(); + assert_eq!(response.failures[0], ShardId::from(1)); + assert_eq!(response.failures[1], ShardId::from(2)); let mut all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) @@ -474,11 +487,24 @@ pub async fn test_metastore_delete_shards< ], force: true, }; - metastore.delete_shards(delete_index_request).await.unwrap(); + let mut response = metastore.delete_shards(delete_index_request).await.unwrap(); + + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + + assert_eq!(response.successes.len(), 4); + assert_eq!(response.failures.len(), 0); + + response.successes.sort_unstable(); + assert_eq!(response.successes[0], ShardId::from(1)); + assert_eq!(response.successes[1], ShardId::from(2)); + assert_eq!(response.successes[2], ShardId::from(3)); + assert_eq!(response.successes[3], ShardId::from(4)); let all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) .await; + assert_eq!(all_shards.len(), 0); cleanup_index(&mut metastore, test_index.index_uid).await; diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index 16985111bb5..9bfe67424c1 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -87,7 +87,10 @@ fn main() -> Result<(), Box> { // Metastore service. let mut prost_config = prost_build::Config::default(); prost_config - .bytes(["ListIndexesMetadataResponse.indexes_metadata_json_zstd"]) + .bytes([ + "IndexesMetadataResponse.indexes_metadata_json_zstd", + "ListIndexesMetadataResponse.indexes_metadata_json_zstd", + ]) .extern_path(".quickwit.ingest.ShardId", "crate::types::ShardId") .extern_path(".quickwit.common.IndexUid", "crate::types::IndexUid") .field_attribute("DeleteQuery.index_uid", "#[serde(alias = \"index_id\")]") diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index 2d5468cf0bd..963734cd157 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -32,6 +32,7 @@ service ControlPlaneService { // The following RPCs are forwarded and handled by the metastore: // - `create_index` + // - `update_index` // - `delete_index` // - `add_source` // - `toggle_source` @@ -42,6 +43,9 @@ service ControlPlaneService { // Creates a new index. rpc CreateIndex(quickwit.metastore.CreateIndexRequest) returns (quickwit.metastore.CreateIndexResponse); + // Updates an index. + rpc UpdateIndex(quickwit.metastore.UpdateIndexRequest) returns (quickwit.metastore.IndexMetadataResponse); + // Deletes an index. rpc DeleteIndex(quickwit.metastore.DeleteIndexRequest) returns (quickwit.metastore.EmptyResponse); diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 9c3d7e9a6a2..732f1c39f17 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -102,6 +102,9 @@ service MetastoreService { // Returns the `IndexMetadata` of an index identified by its IndexID or its IndexUID. rpc IndexMetadata(IndexMetadataRequest) returns (IndexMetadataResponse); + // Fetches the metadata of a list of indexes identified by their Index IDs or UIDs. + rpc IndexesMetadata(IndexesMetadataRequest) returns (IndexesMetadataResponse); + // Gets an indexes metadatas. rpc ListIndexesMetadata(ListIndexesMetadataRequest) returns (ListIndexesMetadataResponse); @@ -163,9 +166,9 @@ service MetastoreService { // list of acquired shards along with the positions to index from. rpc AcquireShards(AcquireShardsRequest) returns (AcquireShardsResponse); - // Deletes a set of shards. This RPC deletes the shards from the metastore and the storage. + // Deletes a set of shards. This RPC deletes the shards from the metastore. // If the shard did not exist to begin with, the operation is successful and does not return any error. - rpc DeleteShards(DeleteShardsRequest) returns (EmptyResponse); + rpc DeleteShards(DeleteShardsRequest) returns (DeleteShardsResponse); rpc ListShards(ListShardsRequest) returns (ListShardsResponse); @@ -220,6 +223,7 @@ message ListIndexesMetadataResponse { // Deprecated (v0.9.0), use `indexes_metadata_json_zstd` instead. optional string indexes_metadata_json_opt = 1; // A JSON serialized then ZSTD compressed list of `IndexMetadata`: `Vec | JSON | ZSTD`. + // We don't use `repeated` here to increase the compression rate and ratio. bytes indexes_metadata_json_zstd = 2; } @@ -240,6 +244,34 @@ message IndexMetadataResponse { string index_metadata_serialized_json = 1; } +message IndexesMetadataRequest { + repeated IndexMetadataSubrequest subrequests = 1; +} + +message IndexMetadataSubrequest { + optional string index_id = 1; + optional quickwit.common.IndexUid index_uid = 2; +} + +message IndexesMetadataResponse { + // A JSON serialized then ZSTD compressed list of `IndexMetadata`: `Vec | JSON | ZSTD`. + // We don't use `repeated` here to increase the compression rate and ratio. + bytes indexes_metadata_json_zstd = 1; + repeated IndexMetadataFailure failures = 2; +} + +message IndexMetadataFailure { + optional string index_id = 1; + optional quickwit.common.IndexUid index_uid = 2; + IndexMetadataFailureReason reason = 3; +} + +enum IndexMetadataFailureReason { + INDEX_METADATA_FAILURE_REASON_UNSPECIFIED = 0; + INDEX_METADATA_FAILURE_REASON_NOT_FOUND = 1; + INDEX_METADATA_FAILURE_REASON_INTERNAL = 2; +} + message ListSplitsRequest { // Predicate used to filter splits. // The predicate is expressed as a JSON serialized @@ -398,6 +430,16 @@ message DeleteShardsRequest { bool force = 4; } +message DeleteShardsResponse { + quickwit.common.IndexUid index_uid = 1; + string source_id = 2; + // List of shard IDs that were successfully deleted. + repeated quickwit.ingest.ShardId successes = 3; + // List of shard IDs that could not be deleted because `force` was set to `false` in the request, + // and the shards are not at EOF, i.e., not fully indexed. + repeated quickwit.ingest.ShardId failures = 4; +} + message ListShardsRequest { repeated ListShardsSubrequest subrequests = 1; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 7554977f89d..2fb173e079a 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -137,6 +137,13 @@ pub trait ControlPlaneService: std::fmt::Debug + dyn_clone::DynClone + Send + Sy &mut self, request: super::metastore::CreateIndexRequest, ) -> crate::control_plane::ControlPlaneResult; + /// Updates an index. + async fn update_index( + &mut self, + request: super::metastore::UpdateIndexRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::metastore::IndexMetadataResponse, + >; /// Deletes an index. async fn delete_index( &mut self, @@ -271,6 +278,14 @@ impl ControlPlaneService for ControlPlaneServiceClient { > { self.inner.create_index(request).await } + async fn update_index( + &mut self, + request: super::metastore::UpdateIndexRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::metastore::IndexMetadataResponse, + > { + self.inner.update_index(request).await + } async fn delete_index( &mut self, request: super::metastore::DeleteIndexRequest, @@ -325,6 +340,14 @@ pub mod mock_control_plane_service { > { self.inner.lock().await.create_index(request).await } + async fn update_index( + &mut self, + request: super::super::metastore::UpdateIndexRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::super::metastore::IndexMetadataResponse, + > { + self.inner.lock().await.update_index(request).await + } async fn delete_index( &mut self, request: super::super::metastore::DeleteIndexRequest, @@ -393,6 +416,23 @@ for Box { Box::pin(fut) } } +impl tower::Service +for Box { + type Response = super::metastore::IndexMetadataResponse; + type Error = crate::control_plane::ControlPlaneError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: super::metastore::UpdateIndexRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.update_index(request).await }; + Box::pin(fut) + } +} impl tower::Service for Box { type Response = super::metastore::EmptyResponse; @@ -502,6 +542,11 @@ struct ControlPlaneServiceTowerServiceStack { super::metastore::CreateIndexResponse, crate::control_plane::ControlPlaneError, >, + update_index_svc: quickwit_common::tower::BoxService< + super::metastore::UpdateIndexRequest, + super::metastore::IndexMetadataResponse, + crate::control_plane::ControlPlaneError, + >, delete_index_svc: quickwit_common::tower::BoxService< super::metastore::DeleteIndexRequest, super::metastore::EmptyResponse, @@ -538,6 +583,7 @@ impl Clone for ControlPlaneServiceTowerServiceStack { Self { inner: self.inner.clone(), create_index_svc: self.create_index_svc.clone(), + update_index_svc: self.update_index_svc.clone(), delete_index_svc: self.delete_index_svc.clone(), add_source_svc: self.add_source_svc.clone(), toggle_source_svc: self.toggle_source_svc.clone(), @@ -557,6 +603,14 @@ impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { > { self.create_index_svc.ready().await?.call(request).await } + async fn update_index( + &mut self, + request: super::metastore::UpdateIndexRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::metastore::IndexMetadataResponse, + > { + self.update_index_svc.ready().await?.call(request).await + } async fn delete_index( &mut self, request: super::metastore::DeleteIndexRequest, @@ -604,6 +658,16 @@ type CreateIndexLayer = quickwit_common::tower::BoxLayer< super::metastore::CreateIndexResponse, crate::control_plane::ControlPlaneError, >; +type UpdateIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::UpdateIndexRequest, + super::metastore::IndexMetadataResponse, + crate::control_plane::ControlPlaneError, + >, + super::metastore::UpdateIndexRequest, + super::metastore::IndexMetadataResponse, + crate::control_plane::ControlPlaneError, +>; type DeleteIndexLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< super::metastore::DeleteIndexRequest, @@ -667,6 +731,7 @@ type AdviseResetShardsLayer = quickwit_common::tower::BoxLayer< #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerLayerStack { create_index_layers: Vec, + update_index_layers: Vec, delete_index_layers: Vec, add_source_layers: Vec, toggle_source_layers: Vec, @@ -704,6 +769,33 @@ impl ControlPlaneServiceTowerLayerStack { >>::Service as tower::Service< super::metastore::CreateIndexRequest, >>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::UpdateIndexRequest, + super::metastore::IndexMetadataResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + super::metastore::UpdateIndexRequest, + Response = super::metastore::IndexMetadataResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service< + super::metastore::UpdateIndexRequest, + >>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< super::metastore::DeleteIndexRequest, @@ -867,6 +959,8 @@ impl ControlPlaneServiceTowerLayerStack { { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.update_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.delete_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.add_source_layers @@ -902,6 +996,27 @@ impl ControlPlaneServiceTowerLayerStack { self.create_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_update_index_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::UpdateIndexRequest, + super::metastore::IndexMetadataResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + super::metastore::UpdateIndexRequest, + Response = super::metastore::IndexMetadataResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.update_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_delete_index_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -1089,6 +1204,14 @@ impl ControlPlaneServiceTowerLayerStack { quickwit_common::tower::BoxService::new(boxed_instance.clone()), |svc, layer| layer.layer(svc), ); + let update_index_svc = self + .update_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); let delete_index_svc = self .delete_index_layers .into_iter() @@ -1140,6 +1263,7 @@ impl ControlPlaneServiceTowerLayerStack { let tower_svc_stack = ControlPlaneServiceTowerServiceStack { inner: boxed_instance.clone(), create_index_svc, + update_index_svc, delete_index_svc, add_source_svc, toggle_source_svc, @@ -1231,6 +1355,15 @@ where crate::control_plane::ControlPlaneError, >, > + + tower::Service< + super::metastore::UpdateIndexRequest, + Response = super::metastore::IndexMetadataResponse, + Error = crate::control_plane::ControlPlaneError, + Future = BoxFuture< + super::metastore::IndexMetadataResponse, + crate::control_plane::ControlPlaneError, + >, + > + tower::Service< super::metastore::DeleteIndexRequest, Response = super::metastore::EmptyResponse, @@ -1294,6 +1427,14 @@ where > { self.call(request).await } + async fn update_index( + &mut self, + request: super::metastore::UpdateIndexRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::metastore::IndexMetadataResponse, + > { + self.call(request).await + } async fn delete_index( &mut self, request: super::metastore::DeleteIndexRequest, @@ -1380,6 +1521,21 @@ where super::metastore::CreateIndexRequest::rpc_name(), )) } + async fn update_index( + &mut self, + request: super::metastore::UpdateIndexRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::metastore::IndexMetadataResponse, + > { + self.inner + .update_index(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::UpdateIndexRequest::rpc_name(), + )) + } async fn delete_index( &mut self, request: super::metastore::DeleteIndexRequest, @@ -1485,6 +1641,20 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn update_index( + &self, + request: tonic::Request, + ) -> Result< + tonic::Response, + tonic::Status, + > { + self.inner + .clone() + .update_index(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } async fn delete_index( &self, request: tonic::Request, @@ -1670,6 +1840,37 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Updates an index. + pub async fn update_index( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.control_plane.ControlPlaneService/UpdateIndex", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.control_plane.ControlPlaneService", + "UpdateIndex", + ), + ); + self.inner.unary(req, path, codec).await + } /// Deletes an index. pub async fn delete_index( &mut self, @@ -1878,6 +2079,14 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; + /// Updates an index. + async fn update_index( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Deletes an index. async fn delete_index( &self, @@ -2057,6 +2266,55 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } + "/quickwit.control_plane.ControlPlaneService/UpdateIndex" => { + #[allow(non_camel_case_types)] + struct UpdateIndexSvc(pub Arc); + impl< + T: ControlPlaneServiceGrpc, + > tonic::server::UnaryService< + super::super::metastore::UpdateIndexRequest, + > for UpdateIndexSvc { + type Response = super::super::metastore::IndexMetadataResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::metastore::UpdateIndexRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).update_index(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = UpdateIndexSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.control_plane.ControlPlaneService/DeleteIndex" => { #[allow(non_camel_case_types)] struct DeleteIndexSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index b20f316904f..2970e59ad46 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -51,6 +51,7 @@ pub struct ListIndexesMetadataResponse { ::prost::alloc::string::String, >, /// A JSON serialized then ZSTD compressed list of `IndexMetadata`: `Vec | JSON | ZSTD`. + /// We don't use `repeated` here to increase the compression rate and ratio. #[prost(bytes = "bytes", tag = "2")] pub indexes_metadata_json_zstd: ::prost::bytes::Bytes, } @@ -84,6 +85,44 @@ pub struct IndexMetadataResponse { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexesMetadataRequest { + #[prost(message, repeated, tag = "1")] + pub subrequests: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexMetadataSubrequest { + #[prost(string, optional, tag = "1")] + pub index_id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "2")] + pub index_uid: ::core::option::Option, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexesMetadataResponse { + /// A JSON serialized then ZSTD compressed list of `IndexMetadata`: `Vec | JSON | ZSTD`. + /// We don't use `repeated` here to increase the compression rate and ratio. + #[prost(bytes = "bytes", tag = "1")] + pub indexes_metadata_json_zstd: ::prost::bytes::Bytes, + #[prost(message, repeated, tag = "2")] + pub failures: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexMetadataFailure { + #[prost(string, optional, tag = "1")] + pub index_id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "2")] + pub index_uid: ::core::option::Option, + #[prost(enumeration = "IndexMetadataFailureReason", tag = "3")] + pub reason: i32, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListSplitsRequest { /// Predicate used to filter splits. /// The predicate is expressed as a JSON serialized @@ -348,6 +387,22 @@ pub struct DeleteShardsRequest { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteShardsResponse { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, + #[prost(string, tag = "2")] + pub source_id: ::prost::alloc::string::String, + /// List of shard IDs that were successfully deleted. + #[prost(message, repeated, tag = "3")] + pub successes: ::prost::alloc::vec::Vec, + /// List of shard IDs that could not be deleted because `force` was set to `false` in the request, + /// and the shards are not at EOF, i.e., not fully indexed. + #[prost(message, repeated, tag = "4")] + pub failures: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListShardsRequest { #[prost(message, repeated, tag = "1")] pub subrequests: ::prost::alloc::vec::Vec, @@ -509,6 +564,43 @@ impl SourceType { } } } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum IndexMetadataFailureReason { + Unspecified = 0, + NotFound = 1, + Internal = 2, +} +impl IndexMetadataFailureReason { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + IndexMetadataFailureReason::Unspecified => { + "INDEX_METADATA_FAILURE_REASON_UNSPECIFIED" + } + IndexMetadataFailureReason::NotFound => { + "INDEX_METADATA_FAILURE_REASON_NOT_FOUND" + } + IndexMetadataFailureReason::Internal => { + "INDEX_METADATA_FAILURE_REASON_INTERNAL" + } + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INDEX_METADATA_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified), + "INDEX_METADATA_FAILURE_REASON_NOT_FOUND" => Some(Self::NotFound), + "INDEX_METADATA_FAILURE_REASON_INTERNAL" => Some(Self::Internal), + _ => None, + } + } +} /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; @@ -529,6 +621,11 @@ impl RpcName for IndexMetadataRequest { "index_metadata" } } +impl RpcName for IndexesMetadataRequest { + fn rpc_name() -> &'static str { + "indexes_metadata" + } +} impl RpcName for ListIndexesMetadataRequest { fn rpc_name() -> &'static str { "list_indexes_metadata" @@ -678,6 +775,11 @@ pub trait MetastoreService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync &mut self, request: IndexMetadataRequest, ) -> crate::metastore::MetastoreResult; + /// Fetches the metadata of a list of indexes identified by their Index IDs or UIDs. + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> crate::metastore::MetastoreResult; /// Gets an indexes metadatas. async fn list_indexes_metadata( &mut self, @@ -775,12 +877,12 @@ pub trait MetastoreService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync &mut self, request: AcquireShardsRequest, ) -> crate::metastore::MetastoreResult; - /// Deletes a set of shards. This RPC deletes the shards from the metastore and the storage. + /// Deletes a set of shards. This RPC deletes the shards from the metastore. /// If the shard did not exist to begin with, the operation is successful and does not return any error. async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> crate::metastore::MetastoreResult; + ) -> crate::metastore::MetastoreResult; async fn list_shards( &mut self, request: ListShardsRequest, @@ -925,6 +1027,12 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.index_metadata(request).await } + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.indexes_metadata(request).await + } async fn list_indexes_metadata( &mut self, request: ListIndexesMetadataRequest, @@ -1036,7 +1144,7 @@ impl MetastoreService for MetastoreServiceClient { async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.inner.delete_shards(request).await } async fn list_shards( @@ -1109,6 +1217,12 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.index_metadata(request).await } + async fn indexes_metadata( + &mut self, + request: super::IndexesMetadataRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.indexes_metadata(request).await + } async fn list_indexes_metadata( &mut self, request: super::ListIndexesMetadataRequest, @@ -1224,7 +1338,7 @@ pub mod mock_metastore_service { async fn delete_shards( &mut self, request: super::DeleteShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_shards(request).await } async fn list_shards( @@ -1322,6 +1436,22 @@ impl tower::Service for Box { Box::pin(fut) } } +impl tower::Service for Box { + type Response = IndexesMetadataResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: IndexesMetadataRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.indexes_metadata(request).await }; + Box::pin(fut) + } +} impl tower::Service for Box { type Response = ListIndexesMetadataResponse; type Error = crate::metastore::MetastoreError; @@ -1611,7 +1741,7 @@ impl tower::Service for Box { } } impl tower::Service for Box { - type Response = EmptyResponse; + type Response = DeleteShardsResponse; type Error = crate::metastore::MetastoreError; type Future = BoxFuture; fn poll_ready( @@ -1741,6 +1871,11 @@ struct MetastoreServiceTowerServiceStack { IndexMetadataResponse, crate::metastore::MetastoreError, >, + indexes_metadata_svc: quickwit_common::tower::BoxService< + IndexesMetadataRequest, + IndexesMetadataResponse, + crate::metastore::MetastoreError, + >, list_indexes_metadata_svc: quickwit_common::tower::BoxService< ListIndexesMetadataRequest, ListIndexesMetadataResponse, @@ -1833,7 +1968,7 @@ struct MetastoreServiceTowerServiceStack { >, delete_shards_svc: quickwit_common::tower::BoxService< DeleteShardsRequest, - EmptyResponse, + DeleteShardsResponse, crate::metastore::MetastoreError, >, list_shards_svc: quickwit_common::tower::BoxService< @@ -1874,6 +2009,7 @@ impl Clone for MetastoreServiceTowerServiceStack { create_index_svc: self.create_index_svc.clone(), update_index_svc: self.update_index_svc.clone(), index_metadata_svc: self.index_metadata_svc.clone(), + indexes_metadata_svc: self.indexes_metadata_svc.clone(), list_indexes_metadata_svc: self.list_indexes_metadata_svc.clone(), delete_index_svc: self.delete_index_svc.clone(), list_splits_svc: self.list_splits_svc.clone(), @@ -1926,6 +2062,12 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.index_metadata_svc.ready().await?.call(request).await } + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> crate::metastore::MetastoreResult { + self.indexes_metadata_svc.ready().await?.call(request).await + } async fn list_indexes_metadata( &mut self, request: ListIndexesMetadataRequest, @@ -2037,7 +2179,7 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.delete_shards_svc.ready().await?.call(request).await } async fn list_shards( @@ -2113,6 +2255,16 @@ type IndexMetadataLayer = quickwit_common::tower::BoxLayer< IndexMetadataResponse, crate::metastore::MetastoreError, >; +type IndexesMetadataLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + IndexesMetadataRequest, + IndexesMetadataResponse, + crate::metastore::MetastoreError, + >, + IndexesMetadataRequest, + IndexesMetadataResponse, + crate::metastore::MetastoreError, +>; type ListIndexesMetadataLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< ListIndexesMetadataRequest, @@ -2296,11 +2448,11 @@ type AcquireShardsLayer = quickwit_common::tower::BoxLayer< type DeleteShardsLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< DeleteShardsRequest, - EmptyResponse, + DeleteShardsResponse, crate::metastore::MetastoreError, >, DeleteShardsRequest, - EmptyResponse, + DeleteShardsResponse, crate::metastore::MetastoreError, >; type ListShardsLayer = quickwit_common::tower::BoxLayer< @@ -2368,6 +2520,7 @@ pub struct MetastoreServiceTowerLayerStack { create_index_layers: Vec, update_index_layers: Vec, index_metadata_layers: Vec, + indexes_metadata_layers: Vec, list_indexes_metadata_layers: Vec, delete_index_layers: Vec, list_splits_layers: Vec, @@ -2472,6 +2625,31 @@ impl MetastoreServiceTowerLayerStack { crate::metastore::MetastoreError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + IndexesMetadataRequest, + IndexesMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + IndexesMetadataRequest, + Response = IndexesMetadataResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< ListIndexesMetadataRequest, @@ -2933,25 +3111,25 @@ impl MetastoreServiceTowerLayerStack { L: tower::Layer< quickwit_common::tower::BoxService< DeleteShardsRequest, - EmptyResponse, + DeleteShardsResponse, crate::metastore::MetastoreError, >, > + Clone + Send + Sync + 'static, , >>::Service: tower::Service< DeleteShardsRequest, - Response = EmptyResponse, + Response = DeleteShardsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, <, >>::Service as tower::Service>::Future: Send + 'static, @@ -3120,6 +3298,8 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.index_metadata_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.indexes_metadata_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_indexes_metadata_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.delete_index_layers @@ -3229,6 +3409,25 @@ impl MetastoreServiceTowerLayerStack { self.index_metadata_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_indexes_metadata_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + IndexesMetadataRequest, + IndexesMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + IndexesMetadataRequest, + Response = IndexesMetadataResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.indexes_metadata_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_list_indexes_metadata_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -3590,13 +3789,13 @@ impl MetastoreServiceTowerLayerStack { L: tower::Layer< quickwit_common::tower::BoxService< DeleteShardsRequest, - EmptyResponse, + DeleteShardsResponse, crate::metastore::MetastoreError, >, > + Send + Sync + 'static, L::Service: tower::Service< DeleteShardsRequest, - Response = EmptyResponse, + Response = DeleteShardsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, @@ -3805,6 +4004,14 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(boxed_instance.clone()), |svc, layer| layer.layer(svc), ); + let indexes_metadata_svc = self + .indexes_metadata_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); let list_indexes_metadata_svc = self .list_indexes_metadata_layers .into_iter() @@ -4010,6 +4217,7 @@ impl MetastoreServiceTowerLayerStack { create_index_svc, update_index_svc, index_metadata_svc, + indexes_metadata_svc, list_indexes_metadata_svc, delete_index_svc, list_splits_svc, @@ -4129,6 +4337,12 @@ where Error = crate::metastore::MetastoreError, Future = BoxFuture, > + + tower::Service< + IndexesMetadataRequest, + Response = IndexesMetadataResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture, + > + tower::Service< ListIndexesMetadataRequest, Response = ListIndexesMetadataResponse, @@ -4251,9 +4465,9 @@ where > + tower::Service< DeleteShardsRequest, - Response = EmptyResponse, + Response = DeleteShardsResponse, Error = crate::metastore::MetastoreError, - Future = BoxFuture, + Future = BoxFuture, > + tower::Service< ListShardsRequest, @@ -4319,6 +4533,12 @@ where ) -> crate::metastore::MetastoreResult { self.call(request).await } + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> crate::metastore::MetastoreResult { + self.call(request).await + } async fn list_indexes_metadata( &mut self, request: ListIndexesMetadataRequest, @@ -4430,7 +4650,7 @@ where async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.call(request).await } async fn list_shards( @@ -4555,6 +4775,19 @@ where IndexMetadataRequest::rpc_name(), )) } + async fn indexes_metadata( + &mut self, + request: IndexesMetadataRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .indexes_metadata(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + IndexesMetadataRequest::rpc_name(), + )) + } async fn list_indexes_metadata( &mut self, request: ListIndexesMetadataRequest, @@ -4800,7 +5033,7 @@ where async fn delete_shards( &mut self, request: DeleteShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.inner .delete_shards(request) .await @@ -4952,6 +5185,17 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn indexes_metadata( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .clone() + .indexes_metadata(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } async fn list_indexes_metadata( &self, request: tonic::Request, @@ -5158,7 +5402,7 @@ for MetastoreServiceGrpcServerAdapter { async fn delete_shards( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { self.inner .clone() .delete_shards(request.into_inner()) @@ -5454,6 +5698,37 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Fetches the metadata of a list of indexes identified by their Index IDs or UIDs. + pub async fn indexes_metadata( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/IndexesMetadata", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.metastore.MetastoreService", + "IndexesMetadata", + ), + ); + self.inner.unary(req, path, codec).await + } /// Gets an indexes metadatas. pub async fn list_indexes_metadata( &mut self, @@ -5974,12 +6249,15 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// Deletes a set of shards. This RPC deletes the shards from the metastore and the storage. + /// Deletes a set of shards. This RPC deletes the shards from the metastore. /// If the shard did not exist to begin with, the operation is successful and does not return any error. pub async fn delete_shards( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await @@ -6215,6 +6493,14 @@ pub mod metastore_service_grpc_server { tonic::Response, tonic::Status, >; + /// Fetches the metadata of a list of indexes identified by their Index IDs or UIDs. + async fn indexes_metadata( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Gets an indexes metadatas. async fn list_indexes_metadata( &self, @@ -6339,12 +6625,15 @@ pub mod metastore_service_grpc_server { tonic::Response, tonic::Status, >; - /// Deletes a set of shards. This RPC deletes the shards from the metastore and the storage. + /// Deletes a set of shards. This RPC deletes the shards from the metastore. /// If the shard did not exist to begin with, the operation is successful and does not return any error. async fn delete_shards( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn list_shards( &self, request: tonic::Request, @@ -6651,6 +6940,52 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/IndexesMetadata" => { + #[allow(non_camel_case_types)] + struct IndexesMetadataSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for IndexesMetadataSvc { + type Response = super::IndexesMetadataResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).indexes_metadata(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = IndexesMetadataSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.metastore.MetastoreService/ListIndexesMetadata" => { #[allow(non_camel_case_types)] struct ListIndexesMetadataSvc(pub Arc); @@ -7486,7 +7821,7 @@ pub mod metastore_service_grpc_server { T: MetastoreServiceGrpc, > tonic::server::UnaryService for DeleteShardsSvc { - type Response = super::EmptyResponse; + type Response = super::DeleteShardsResponse; type Future = BoxFuture< tonic::Response, tonic::Status, diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index 7e8416e56d5..0a2c3d88217 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -55,6 +55,7 @@ generate_getters! { DeleteIndexRequest, DeleteQuery, DeleteShardsRequest, + DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest, LastDeleteOpstampRequest, diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index 6fbb7f3535e..b4e5e06cbd9 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -275,21 +275,6 @@ impl IndexMetadataRequest { index_id: None, } } - - /// Returns the index id either from the `index_id` or the `index_uid`. - /// If none of them is set, an error is returned. - pub fn get_index_id(&self) -> MetastoreResult { - if let Some(index_id) = &self.index_id { - Ok(index_id.to_string()) - } else if let Some(index_uid) = &self.index_uid { - Ok(index_uid.index_id.to_string()) - } else { - Err(MetastoreError::Internal { - message: "index_id or index_uid must be set".to_string(), - cause: "".to_string(), - }) - } - } } impl MarkSplitsForDeletionRequest { diff --git a/quickwit/quickwit-proto/src/types/index_uid.rs b/quickwit/quickwit-proto/src/types/index_uid.rs index ff116263aed..748efe571ca 100644 --- a/quickwit/quickwit-proto/src/types/index_uid.rs +++ b/quickwit/quickwit-proto/src/types/index_uid.rs @@ -203,6 +203,29 @@ impl TryFrom for IndexUid { } } +#[cfg(feature = "postgres")] +impl sqlx::Type for IndexUid { + fn type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("VARCHAR") + } +} + +#[cfg(feature = "postgres")] +impl sqlx::Encode<'_, sqlx::Postgres> for IndexUid { + fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { + let _ = sqlx::Encode::::encode(&self.index_id, buf); + let _ = sqlx::Encode::::encode(":", buf); + sqlx::Encode::::encode(&self.incarnation_id.to_string(), buf) + } +} + +#[cfg(feature = "postgres")] +impl sqlx::postgres::PgHasArrayType for IndexUid { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("VARCHAR[]") + } +} + impl PartialEq<(&'static str, u128)> for IndexUid { fn eq(&self, (index_id, incarnation_id): &(&str, u128)) -> bool { self.index_id == *index_id && self.incarnation_id == Ulid::from(*incarnation_id) diff --git a/quickwit/quickwit-proto/src/types/shard_id.rs b/quickwit/quickwit-proto/src/types/shard_id.rs index 525487a4ae9..8a25d19232d 100644 --- a/quickwit/quickwit-proto/src/types/shard_id.rs +++ b/quickwit/quickwit-proto/src/types/shard_id.rs @@ -139,6 +139,27 @@ impl PartialEq for &ShardId { } } +#[cfg(feature = "postgres")] +impl sqlx::Type for ShardId { + fn type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("VARCHAR") + } +} + +#[cfg(feature = "postgres")] +impl sqlx::Encode<'_, sqlx::Postgres> for ShardId { + fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { + sqlx::Encode::::encode(self.as_str(), buf) + } +} + +#[cfg(feature = "postgres")] +impl sqlx::postgres::PgHasArrayType for ShardId { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("VARCHAR[]") + } +} + #[cfg(test)] mod tests {