Skip to content

Commit

Permalink
Clean.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jul 20, 2023
1 parent 3e241b8 commit 87a3e5e
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ quickwit-common = { workspace = true }
quickwit-proto = { workspace = true }
serde = "1"
structopt = "0.3"
tokio = {workspace = true}
tokio = { workspace = true}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use self::store_operations::{
check_indexes_states_exist, delete_index, fetch_index, fetch_or_init_indexes_states,
index_exists, put_index, put_indexes_states,
};
use super::STREAM_SPLITS_CHUNK_SIZE;
use crate::checkpoint::IndexCheckpointDelta;
use crate::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, MetastoreResult, Split,
Expand Down Expand Up @@ -553,14 +554,13 @@ impl Metastore for FileBackedMetastore {
.await
}

/// Stream splits
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
let splits = self.list_splits(query).await?;
let chunks = splits
.chunks(100)
.chunks(STREAM_SPLITS_CHUNK_SIZE)
.map(|chunk| Ok(chunk.to_vec()))
.collect_vec();
let stream_ok = futures::stream::iter(chunks.into_iter());
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ use time::OffsetDateTime;
use crate::checkpoint::IndexCheckpointDelta;
use crate::{MetastoreError, MetastoreResult, Split, SplitMetadata, SplitState};

/// Splits batch size returned by the stream splits API
const STREAM_SPLITS_CHUNK_SIZE: usize = 100;

/// Metastore meant to manage Quickwit's indexes, their splits and delete tasks.
///
/// I. Index and splits management.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use tokio_stream::Stream;
use tracing::log::LevelFilter;
use tracing::{debug, error, info, instrument, warn};

use super::STREAM_SPLITS_CHUNK_SIZE;
use crate::checkpoint::IndexCheckpointDelta;
use crate::metastore::instrumented_metastore::InstrumentedMetastore;
use crate::metastore::postgresql_model::{
Expand Down Expand Up @@ -786,7 +787,7 @@ impl Metastore for PostgresqlMetastore {
}
Err(error) => Err(MetastoreError::from(error)),
})
.chunks(100)
.chunks(STREAM_SPLITS_CHUNK_SIZE)
.map(|chunk| {
chunk
.into_iter()
Expand Down

0 comments on commit 87a3e5e

Please sign in to comment.