diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index c7183acfbd0..80a576219e1 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -35,8 +35,9 @@ use quickwit_proto::metastore::{ ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, - PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, - UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -236,6 +237,14 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.delete_shards(request).await } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> MetastoreResult { + // TODO this call should go through the control plane which should apply debounce + self.metastore.prune_shards(request).await + } + // Index Template API async fn create_index_template( diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index d34a6ea781b..b1bc18d3f71 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -36,7 +36,8 @@ use quickwit_config::{ use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, - MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, + MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, + PruneShardsResponse, }; use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId}; use serde::{Deserialize, Serialize}; @@ -653,6 +654,14 @@ impl FileBackedIndex { .delete_shards(request) } + pub(crate) fn prune_shards( + &mut self, + request: PruneShardsRequest, + ) -> MetastoreResult> { + self.get_shards_for_source_mut(&request.source_id)? + .prune_shards(request) + } + pub(crate) fn list_shards( &self, subrequest: ListShardsSubrequest, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index eb7bb3d013b..ac93f9efc9d 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -26,7 +26,7 @@ use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, DeleteShardsResponse, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, - OpenShardSubrequest, OpenShardSubresponse, + OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, PruneShardsResponse, }; use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId}; use time::OffsetDateTime; @@ -239,6 +239,45 @@ impl Shards { } } + pub(super) fn prune_shards( + &mut self, + request: PruneShardsRequest, + ) -> MetastoreResult> { + let initial_shard_count = self.shards.len(); + + if let Some(max_age) = request.max_age { + self.shards.retain(|_, shard| { + let limit_timestamp = OffsetDateTime::now_utc().unix_timestamp() - max_age as i64; + shard.update_timestamp >= limit_timestamp + }); + }; + if let Some(max_count) = request.max_count { + let max_count = max_count as usize; + if max_count < self.shards.len() { + let num_to_remove = self.shards.len() - max_count; + let shard_ids_to_delete = self + .shards + .values() + .sorted_by_key(|shard| shard.update_timestamp) + .take(num_to_remove) + .map(|shard| shard.shard_id().clone()) + .collect_vec(); + for shard_id in shard_ids_to_delete { + self.shards.remove(&shard_id); + } + } + } + let response = PruneShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + }; + if initial_shard_count > self.shards.len() { + Ok(MutationOccurred::Yes(response)) + } else { + Ok(MutationOccurred::No(response)) + } + } + pub(super) fn list_shards( &self, subrequest: ListShardsSubrequest, @@ -594,4 +633,85 @@ mod tests { assert!(shards.shards.is_empty()); } + + #[test] + fn test_prune_shards() { + let index_uid = IndexUid::for_test("test-index", 0); + let source_id = "test-source".to_string(); + let mut shards = Shards::empty(index_uid.clone(), source_id.clone()); + + let request = PruneShardsRequest { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + max_age: None, + max_count: None, + }; + let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else { + panic!("expected `MutationOccurred::No`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + + let request = PruneShardsRequest { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + max_age: Some(50), + max_count: None, + }; + let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else { + panic!("expected `MutationOccurred::No`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + + let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + shards.shards.insert( + ShardId::from(0), + Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(0)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::eof(0u64)), + update_timestamp: current_timestamp - 200, + ..Default::default() + }, + ); + shards.shards.insert( + ShardId::from(1), + Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(Position::offset(0u64)), + update_timestamp: current_timestamp - 100, + ..Default::default() + }, + ); + + let request = PruneShardsRequest { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + max_age: Some(150), + max_count: None, + }; + let MutationOccurred::Yes(response) = shards.prune_shards(request).unwrap() else { + panic!("expected `MutationOccurred::Yes`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + + let request = PruneShardsRequest { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + max_age: Some(150), + max_count: None, + }; + let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else { + panic!("expected `MutationOccurred::No`"); + }; + assert_eq!(response.index_uid(), &index_uid); + assert_eq!(response.source_id, source_id); + } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 9780cbc386e..673c3a6be37 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -58,9 +58,9 @@ use quickwit_proto::metastore::{ ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, - OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, - UpdateSplitsDeleteOpstampResponse, + OpenShardsResponse, PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, + ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, + UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; @@ -892,6 +892,17 @@ impl MetastoreService for FileBackedMetastore { Ok(response) } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> MetastoreResult { + let index_uid = request.index_uid().clone(); + let response = self + .mutate(&index_uid, |index| index.prune_shards(request)) + .await?; + Ok(response) + } + async fn list_shards(&self, request: ListShardsRequest) -> MetastoreResult { let mut subresponses = Vec::with_capacity(request.subrequests.len()); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 4ed4536f8c9..20064cae706 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::fmt::{self, Write}; +use std::time::Duration; use async_trait::async_trait; use futures::StreamExt; @@ -44,9 +45,10 @@ use quickwit_proto::metastore::{ ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, - OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, - UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, + PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, + ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId}; use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType}; @@ -1486,6 +1488,39 @@ impl MetastoreService for PostgresqlMetastore { Ok(response) } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> MetastoreResult { + const PRUNE_AGE_SHARDS_QUERY: &str = include_str!("queries/shards/prune_age.sql"); + const PRUNE_COUNT_SHARDS_QUERY: &str = include_str!("queries/shards/prune_count.sql"); + + if let Some(max_age) = request.max_age { + let limit_datetime = OffsetDateTime::now_utc() - Duration::from_secs(max_age as u64); + sqlx::query(PRUNE_AGE_SHARDS_QUERY) + .bind(request.index_uid()) + .bind(&request.source_id) + .bind(limit_datetime) + .execute(&self.connection_pool) + .await?; + } + + if let Some(max_count) = request.max_count { + sqlx::query(PRUNE_COUNT_SHARDS_QUERY) + .bind(request.index_uid()) + .bind(&request.source_id) + .bind(max_count as i64) + .execute(&self.connection_pool) + .await?; + } + + let response = PruneShardsResponse { + index_uid: request.index_uid, + source_id: request.source_id, + }; + Ok(response) + } + // Index Template API async fn create_index_template( diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/prune_age.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/prune_age.sql new file mode 100644 index 00000000000..65f2b7bedaf --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/prune_age.sql @@ -0,0 +1,4 @@ +DELETE FROM shards +WHERE index_uid = $1 + AND source_id = $2 + AND update_timestamp < $3 diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/prune_count.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/prune_count.sql new file mode 100644 index 00000000000..93be36bfe38 --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/prune_count.sql @@ -0,0 +1,12 @@ +WITH recent_shards AS ( + SELECT shard_id + FROM shards + WHERE index_uid = $1 + AND source_id = $2 + ORDER BY update_timestamp DESC + LIMIT $3 +) +DELETE FROM shards +WHERE index_uid = $1 + AND source_id = $2 + AND shard_id NOT IN (SELECT shard_id FROM recent_shards) diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 7c4660c9fe5..3e0add028df 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -474,6 +474,12 @@ macro_rules! metastore_test_suite { #[tokio::test] #[serial_test::file_serial] + async fn test_metastore_prune_shards() { + $crate::tests::shard::test_metastore_prune_shards::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::serial] async fn test_metastore_apply_checkpoint_delta_v2_single_shard() { $crate::tests::shard::test_metastore_apply_checkpoint_delta_v2_single_shard::<$metastore_type>().await; } diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index 8b53d3901dd..80549742118 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use async_trait::async_trait; +use itertools::Itertools; use quickwit_common::rand::append_random_suffix; use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_proto::compatibility_shard_update_timestamp; @@ -25,9 +26,10 @@ use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ AcquireShardsRequest, AddSourceRequest, CreateIndexRequest, DeleteShardsRequest, EntityKind, ListShardsRequest, ListShardsSubrequest, MetastoreError, MetastoreService, OpenShardSubrequest, - OpenShardsRequest, PublishSplitsRequest, + OpenShardsRequest, PruneShardsRequest, PublishSplitsRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId, SourceId}; +use time::OffsetDateTime; use super::DefaultForTest; use crate::checkpoint::{IndexCheckpointDelta, PartitionId, SourceCheckpointDelta}; @@ -611,6 +613,130 @@ pub async fn test_metastore_delete_shards< cleanup_index(&mut metastore, test_index.index_uid).await; } +pub async fn test_metastore_prune_shards< + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, +>() { + let mut metastore = MetastoreUnderTest::default_for_test().await; + + let test_index = TestIndex::create_index_with_source( + &mut metastore, + "test-prune-shards", + SourceConfig::ingest_v2(), + ) + .await; + + let now_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + let oldest_shard_age = 10000u32; + + // Create shards with timestamp intervals of 100s starting from + // now_timestamp - oldest_shard_age + let shards = (0..100) + .map(|shard_id| Shard { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_id: Some(ShardId::from(shard_id)), + shard_state: ShardState::Closed as i32, + doc_mapping_uid: Some(DocMappingUid::default()), + publish_position_inclusive: Some(Position::Beginning), + update_timestamp: now_timestamp - oldest_shard_age as i64 + shard_id as i64 * 100, + ..Default::default() + }) + .collect_vec(); + + metastore + .insert_shards(&test_index.index_uid, &test_index.source_id, shards) + .await; + + // noop prune request + { + let prune_index_request = PruneShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + max_age: None, + max_count: None, + }; + let response = metastore.prune_shards(prune_index_request).await.unwrap(); + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + let all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 100); + } + + // delete shards 4 last shards with age limit + { + let prune_index_request = PruneShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + max_age: Some(oldest_shard_age - 350), + max_count: None, + }; + let response = metastore.prune_shards(prune_index_request).await.unwrap(); + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + let mut all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 96); + all_shards.sort_unstable_by_key(|shard| shard.update_timestamp); + assert_eq!(all_shards[0].shard_id(), ShardId::from(4)); + assert_eq!(all_shards[95].shard_id(), ShardId::from(99)); + } + + // delete 6 more shards with count limit + { + let prune_index_request = PruneShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + max_age: None, + max_count: Some(90), + }; + let response = metastore.prune_shards(prune_index_request).await.unwrap(); + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + let mut all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 90); + all_shards.sort_unstable_by_key(|shard| shard.update_timestamp); + assert_eq!(all_shards[0].shard_id(), ShardId::from(10)); + assert_eq!(all_shards[89].shard_id(), ShardId::from(99)); + } + + // age limit is the limiting factor, delete 10 more shards + let prune_index_request = PruneShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + max_age: Some(oldest_shard_age - 2950), + max_count: Some(80), + }; + let response = metastore.prune_shards(prune_index_request).await.unwrap(); + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + let all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 70); + + // count limit is the limiting factor, delete 20 more shards + let prune_index_request = PruneShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + max_age: Some(oldest_shard_age - 4000), + max_count: Some(50), + }; + let response = metastore.prune_shards(prune_index_request).await.unwrap(); + assert_eq!(response.index_uid(), &test_index.index_uid); + assert_eq!(response.source_id, test_index.source_id); + let all_shards = metastore + .list_all_shards(&test_index.index_uid, &test_index.source_id) + .await; + assert_eq!(all_shards.len(), 50); + + cleanup_index(&mut metastore, test_index.index_uid).await; +} + pub async fn test_metastore_apply_checkpoint_delta_v2_single_shard< MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + ReadWriteShardsForTest, >() { diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index b3cd3a7898d..4ce08680e92 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -177,6 +177,9 @@ service MetastoreService { // If the shard did not exist to begin with, the operation is successful and does not return any error. rpc DeleteShards(DeleteShardsRequest) returns (DeleteShardsResponse); + // Deletes outdated shards. This RPC deletes the shards from the metastore. + rpc PruneShards(PruneShardsRequest) returns (PruneShardsResponse); + rpc ListShards(ListShardsRequest) returns (ListShardsResponse); // Index Template API @@ -451,6 +454,20 @@ message DeleteShardsResponse { repeated quickwit.ingest.ShardId failures = 4; } +message PruneShardsRequest { + quickwit.common.IndexUid index_uid = 1; + string source_id = 2; + // The maximum age of the shards to keep, in seconds. + optional uint32 max_age = 5; + // The maximum number of the shards to keep. Delete older shards first. + optional uint32 max_count = 6; +} + +message PruneShardsResponse { + quickwit.common.IndexUid index_uid = 1; + string source_id = 2; +} + message ListShardsRequest { repeated ListShardsSubrequest subrequests = 1; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 1f0f36db21c..9d67734b564 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -411,6 +411,30 @@ pub struct DeleteShardsResponse { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PruneShardsRequest { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, + #[prost(string, tag = "2")] + pub source_id: ::prost::alloc::string::String, + /// The maximum age of the shards to keep, in seconds. + #[prost(uint32, optional, tag = "5")] + pub max_age: ::core::option::Option, + /// The maximum number of the shards to keep. Delete older shards first. + #[prost(uint32, optional, tag = "6")] + pub max_count: ::core::option::Option, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PruneShardsResponse { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, + #[prost(string, tag = "2")] + pub source_id: ::prost::alloc::string::String, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListShardsRequest { #[prost(message, repeated, tag = "1")] pub subrequests: ::prost::alloc::vec::Vec, @@ -732,6 +756,11 @@ impl RpcName for DeleteShardsRequest { "delete_shards" } } +impl RpcName for PruneShardsRequest { + fn rpc_name() -> &'static str { + "prune_shards" + } +} impl RpcName for ListShardsRequest { fn rpc_name() -> &'static str { "list_shards" @@ -900,6 +929,11 @@ pub trait MetastoreService: std::fmt::Debug + Send + Sync + 'static { &self, request: DeleteShardsRequest, ) -> crate::metastore::MetastoreResult; + /// Deletes outdated shards. This RPC deletes the shards from the metastore. + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> crate::metastore::MetastoreResult; async fn list_shards( &self, request: ListShardsRequest, @@ -1161,6 +1195,12 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.0.delete_shards(request).await } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.0.prune_shards(request).await + } async fn list_shards( &self, request: ListShardsRequest, @@ -1355,6 +1395,12 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_shards(request).await } + async fn prune_shards( + &self, + request: super::PruneShardsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.prune_shards(request).await + } async fn list_shards( &self, request: super::ListShardsRequest, @@ -1770,6 +1816,22 @@ impl tower::Service for InnerMetastoreServiceClient { Box::pin(fut) } } +impl tower::Service for InnerMetastoreServiceClient { + type Response = PruneShardsResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: PruneShardsRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.prune_shards(request).await }; + Box::pin(fut) + } +} impl tower::Service for InnerMetastoreServiceClient { type Response = ListShardsResponse; type Error = crate::metastore::MetastoreError; @@ -1986,6 +2048,11 @@ struct MetastoreServiceTowerServiceStack { DeleteShardsResponse, crate::metastore::MetastoreError, >, + prune_shards_svc: quickwit_common::tower::BoxService< + PruneShardsRequest, + PruneShardsResponse, + crate::metastore::MetastoreError, + >, list_shards_svc: quickwit_common::tower::BoxService< ListShardsRequest, ListShardsResponse, @@ -2157,6 +2224,12 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.delete_shards_svc.clone().ready().await?.call(request).await } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> crate::metastore::MetastoreResult { + self.prune_shards_svc.clone().ready().await?.call(request).await + } async fn list_shards( &self, request: ListShardsRequest, @@ -2430,6 +2503,16 @@ type DeleteShardsLayer = quickwit_common::tower::BoxLayer< DeleteShardsResponse, crate::metastore::MetastoreError, >; +type PruneShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + PruneShardsRequest, + PruneShardsResponse, + crate::metastore::MetastoreError, + >, + PruneShardsRequest, + PruneShardsResponse, + crate::metastore::MetastoreError, +>; type ListShardsLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< ListShardsRequest, @@ -2515,6 +2598,7 @@ pub struct MetastoreServiceTowerLayerStack { open_shards_layers: Vec, acquire_shards_layers: Vec, delete_shards_layers: Vec, + prune_shards_layers: Vec, list_shards_layers: Vec, create_index_template_layers: Vec, get_index_template_layers: Vec, @@ -3108,6 +3192,31 @@ impl MetastoreServiceTowerLayerStack { crate::metastore::MetastoreError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + PruneShardsRequest, + PruneShardsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + PruneShardsRequest, + Response = PruneShardsResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< ListShardsRequest, @@ -3313,6 +3422,8 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.delete_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.prune_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.create_index_template_layers @@ -3778,6 +3889,25 @@ impl MetastoreServiceTowerLayerStack { self.delete_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_prune_shards_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + PruneShardsRequest, + PruneShardsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + PruneShardsRequest, + Response = PruneShardsResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.prune_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_list_shards_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -4145,6 +4275,14 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let prune_shards_svc = self + .prune_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let list_shards_svc = self .list_shards_layers .into_iter() @@ -4218,6 +4356,7 @@ impl MetastoreServiceTowerLayerStack { open_shards_svc, acquire_shards_svc, delete_shards_svc, + prune_shards_svc, list_shards_svc, create_index_template_svc, get_index_template_svc, @@ -4450,6 +4589,12 @@ where Error = crate::metastore::MetastoreError, Future = BoxFuture, > + + tower::Service< + PruneShardsRequest, + Response = PruneShardsResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture, + > + tower::Service< ListShardsRequest, Response = ListShardsResponse, @@ -4634,6 +4779,12 @@ where ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> crate::metastore::MetastoreResult { + self.clone().call(request).await + } async fn list_shards( &self, request: ListShardsRequest, @@ -5047,6 +5198,20 @@ where DeleteShardsRequest::rpc_name(), )) } + async fn prune_shards( + &self, + request: PruneShardsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .clone() + .prune_shards(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + PruneShardsRequest::rpc_name(), + )) + } async fn list_shards( &self, request: ListShardsRequest, @@ -5422,6 +5587,17 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn prune_shards( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .prune_shards(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } async fn list_shards( &self, request: tonic::Request, @@ -6299,6 +6475,34 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Deletes outdated shards. This RPC deletes the shards from the metastore. + pub async fn prune_shards( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/PruneShards", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.metastore.MetastoreService", "PruneShards"), + ); + self.inner.unary(req, path, codec).await + } pub async fn list_shards( &mut self, request: impl tonic::IntoRequest, @@ -6658,6 +6862,14 @@ pub mod metastore_service_grpc_server { tonic::Response, tonic::Status, >; + /// Deletes outdated shards. This RPC deletes the shards from the metastore. + async fn prune_shards( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn list_shards( &self, request: tonic::Request, @@ -7884,6 +8096,52 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/PruneShards" => { + #[allow(non_camel_case_types)] + struct PruneShardsSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for PruneShardsSvc { + type Response = super::PruneShardsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).prune_shards(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PruneShardsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.metastore.MetastoreService/ListShards" => { #[allow(non_camel_case_types)] struct ListShardsSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index 9f8df0289ac..ceda0908f03 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -134,6 +134,8 @@ generate_getters! { ListStaleSplitsRequest, MarkSplitsForDeletionRequest, OpenShardSubrequest, + PruneShardsRequest, + PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,