From 87a3e5e1692b5930b31e8be7907a77dad686e6e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Mon, 10 Jul 2023 01:51:44 +0900 Subject: [PATCH] Clean. --- quickwit/quickwit-metastore-utils/Cargo.toml | 2 +- .../src/metastore/file_backed_metastore/mod.rs | 4 ++-- quickwit/quickwit-metastore/src/metastore/mod.rs | 3 +++ .../quickwit-metastore/src/metastore/postgresql_metastore.rs | 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-metastore-utils/Cargo.toml b/quickwit/quickwit-metastore-utils/Cargo.toml index 1595cc83306..2661752d907 100644 --- a/quickwit/quickwit-metastore-utils/Cargo.toml +++ b/quickwit/quickwit-metastore-utils/Cargo.toml @@ -20,4 +20,4 @@ quickwit-common = { workspace = true } quickwit-proto = { workspace = true } serde = "1" structopt = "0.3" -tokio = {workspace = true} +tokio = { workspace = true} diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index f5bca5f778e..86632828753 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -48,6 +48,7 @@ use self::store_operations::{ check_indexes_states_exist, delete_index, fetch_index, fetch_or_init_indexes_states, index_exists, put_index, put_indexes_states, }; +use super::STREAM_SPLITS_CHUNK_SIZE; use crate::checkpoint::IndexCheckpointDelta; use crate::{ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, MetastoreResult, Split, @@ -553,14 +554,13 @@ impl Metastore for FileBackedMetastore { .await } - /// Stream splits async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { let splits = self.list_splits(query).await?; let chunks = splits - .chunks(100) + .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| Ok(chunk.to_vec())) .collect_vec(); let stream_ok = futures::stream::iter(chunks.into_iter()); diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 92030bbedcb..ae272fc5ee6 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -44,6 +44,9 @@ use time::OffsetDateTime; use crate::checkpoint::IndexCheckpointDelta; use crate::{MetastoreError, MetastoreResult, Split, SplitMetadata, SplitState}; +/// Splits batch size returned by the stream splits API +const STREAM_SPLITS_CHUNK_SIZE: usize = 100; + /// Metastore meant to manage Quickwit's indexes, their splits and delete tasks. /// /// I. Index and splits management. diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 156f4b725b2..a5e9021a9ae 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -47,6 +47,7 @@ use tokio_stream::Stream; use tracing::log::LevelFilter; use tracing::{debug, error, info, instrument, warn}; +use super::STREAM_SPLITS_CHUNK_SIZE; use crate::checkpoint::IndexCheckpointDelta; use crate::metastore::instrumented_metastore::InstrumentedMetastore; use crate::metastore::postgresql_model::{ @@ -786,7 +787,7 @@ impl Metastore for PostgresqlMetastore { } Err(error) => Err(MetastoreError::from(error)), }) - .chunks(100) + .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| { chunk .into_iter()