Skip to content

Commit

Permalink
Add shard pruning endpoint (#5335)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai authored Sep 2, 2024
1 parent 7cfc733 commit 779f020
Show file tree
Hide file tree
Showing 12 changed files with 620 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ use quickwit_proto::metastore::{
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService,
MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse,
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
};

/// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can
Expand Down Expand Up @@ -236,6 +237,14 @@ impl MetastoreService for ControlPlaneMetastore {
self.metastore.delete_shards(request).await
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
// TODO this call should go through the control plane which should apply debounce
self.metastore.prune_shards(request).await
}

// Index Template API

async fn create_index_template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use quickwit_config::{
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest,
DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse,
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse,
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
PruneShardsResponse,
};
use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -653,6 +654,14 @@ impl FileBackedIndex {
.delete_shards(request)
}

pub(crate) fn prune_shards(
&mut self,
request: PruneShardsRequest,
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
self.get_shards_for_source_mut(&request.source_id)?
.prune_shards(request)
}

pub(crate) fn list_shards(
&self,
subrequest: ListShardsSubrequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, DeleteShardsResponse,
EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult,
OpenShardSubrequest, OpenShardSubresponse,
OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, PruneShardsResponse,
};
use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId};
use time::OffsetDateTime;
Expand Down Expand Up @@ -239,6 +239,45 @@ impl Shards {
}
}

pub(super) fn prune_shards(
&mut self,
request: PruneShardsRequest,
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
let initial_shard_count = self.shards.len();

if let Some(max_age) = request.max_age {
self.shards.retain(|_, shard| {
let limit_timestamp = OffsetDateTime::now_utc().unix_timestamp() - max_age as i64;
shard.update_timestamp >= limit_timestamp
});
};
if let Some(max_count) = request.max_count {
let max_count = max_count as usize;
if max_count < self.shards.len() {
let num_to_remove = self.shards.len() - max_count;
let shard_ids_to_delete = self
.shards
.values()
.sorted_by_key(|shard| shard.update_timestamp)
.take(num_to_remove)
.map(|shard| shard.shard_id().clone())
.collect_vec();
for shard_id in shard_ids_to_delete {
self.shards.remove(&shard_id);
}
}
}
let response = PruneShardsResponse {
index_uid: request.index_uid,
source_id: request.source_id,
};
if initial_shard_count > self.shards.len() {
Ok(MutationOccurred::Yes(response))
} else {
Ok(MutationOccurred::No(response))
}
}

pub(super) fn list_shards(
&self,
subrequest: ListShardsSubrequest,
Expand Down Expand Up @@ -594,4 +633,85 @@ mod tests {

assert!(shards.shards.is_empty());
}

#[test]
fn test_prune_shards() {
let index_uid = IndexUid::for_test("test-index", 0);
let source_id = "test-source".to_string();
let mut shards = Shards::empty(index_uid.clone(), source_id.clone());

let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: None,
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::No`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);

let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(50),
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::No`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);

let current_timestamp = OffsetDateTime::now_utc().unix_timestamp();
shards.shards.insert(
ShardId::from(0),
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(0)),
shard_state: ShardState::Open as i32,
publish_position_inclusive: Some(Position::eof(0u64)),
update_timestamp: current_timestamp - 200,
..Default::default()
},
);
shards.shards.insert(
ShardId::from(1),
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
publish_position_inclusive: Some(Position::offset(0u64)),
update_timestamp: current_timestamp - 100,
..Default::default()
},
);

let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(150),
max_count: None,
};
let MutationOccurred::Yes(response) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::Yes`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);

let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(150),
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
panic!("expected `MutationOccurred::No`");
};
assert_eq!(response.index_uid(), &index_uid);
assert_eq!(response.source_id, source_id);
}
}
17 changes: 14 additions & 3 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ use quickwit_proto::metastore::{
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
OpenShardsResponse, PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
};
use quickwit_proto::types::{IndexId, IndexUid};
use quickwit_storage::Storage;
Expand Down Expand Up @@ -892,6 +892,17 @@ impl MetastoreService for FileBackedMetastore {
Ok(response)
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
let index_uid = request.index_uid().clone();
let response = self
.mutate(&index_uid, |index| index.prune_shards(request))
.await?;
Ok(response)
}

async fn list_shards(&self, request: ListShardsRequest) -> MetastoreResult<ListShardsResponse> {
let mut subresponses = Vec::with_capacity(request.subrequests.len());

Expand Down
41 changes: 38 additions & 3 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::collections::HashMap;
use std::fmt::{self, Write};
use std::time::Duration;

use async_trait::async_trait;
use futures::StreamExt;
Expand All @@ -44,9 +45,10 @@ use quickwit_proto::metastore::{
ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
};
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId};
use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType};
Expand Down Expand Up @@ -1486,6 +1488,39 @@ impl MetastoreService for PostgresqlMetastore {
Ok(response)
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
const PRUNE_AGE_SHARDS_QUERY: &str = include_str!("queries/shards/prune_age.sql");
const PRUNE_COUNT_SHARDS_QUERY: &str = include_str!("queries/shards/prune_count.sql");

if let Some(max_age) = request.max_age {
let limit_datetime = OffsetDateTime::now_utc() - Duration::from_secs(max_age as u64);
sqlx::query(PRUNE_AGE_SHARDS_QUERY)
.bind(request.index_uid())
.bind(&request.source_id)
.bind(limit_datetime)
.execute(&self.connection_pool)
.await?;
}

if let Some(max_count) = request.max_count {
sqlx::query(PRUNE_COUNT_SHARDS_QUERY)
.bind(request.index_uid())
.bind(&request.source_id)
.bind(max_count as i64)
.execute(&self.connection_pool)
.await?;
}

let response = PruneShardsResponse {
index_uid: request.index_uid,
source_id: request.source_id,
};
Ok(response)
}

// Index Template API

async fn create_index_template(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DELETE FROM shards
WHERE index_uid = $1
AND source_id = $2
AND update_timestamp < $3
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
WITH recent_shards AS (
SELECT shard_id
FROM shards
WHERE index_uid = $1
AND source_id = $2
ORDER BY update_timestamp DESC
LIMIT $3
)
DELETE FROM shards
WHERE index_uid = $1
AND source_id = $2
AND shard_id NOT IN (SELECT shard_id FROM recent_shards)
6 changes: 6 additions & 0 deletions quickwit/quickwit-metastore/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ macro_rules! metastore_test_suite {

#[tokio::test]
#[serial_test::file_serial]
async fn test_metastore_prune_shards() {
$crate::tests::shard::test_metastore_prune_shards::<$metastore_type>().await;
}

#[tokio::test]
#[serial_test::serial]
async fn test_metastore_apply_checkpoint_delta_v2_single_shard() {
$crate::tests::shard::test_metastore_apply_checkpoint_delta_v2_single_shard::<$metastore_type>().await;
}
Expand Down
Loading

0 comments on commit 779f020

Please sign in to comment.