diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6a91de1b6f8..95647618dc7 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5389,24 +5389,6 @@ dependencies = [ ] [[package]] -<<<<<<< HEAD -======= -name = "quickwit-metastore-utils" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "futures", - "quickwit-common", - "quickwit-proto", - "serde", - "serde_json", - "structopt", - "tokio", -] - -[[package]] ->>>>>>> 476dce104 (POC) name = "quickwit-opentelemetry" version = "0.6.2" dependencies = [ diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index b570ec774ee..c3a62d02c10 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -46,6 +46,8 @@ arc-swap = "1.6" assert-json-diff = "2" async-speed-limit = "0.4" async-trait = "0.1" +async-compression = { version = "0.4.1", features = ["gzip", "futures-io"] } +atty = "0.2" backoff = { version = "0.4", features = ["tokio"] } base64 = "0.21" byte-unit = { version = "4", default-features = false, features = ["serde", "std"] } @@ -138,6 +140,7 @@ regex = "1.9.1" reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls", + "stream", ] } rust-embed = "6.8.1" serde = { version = "1.0", features = ["derive", "rc"] } diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index 3a95b02930e..1cff9734d07 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -33,20 +33,16 @@ pub struct ServiceStream { inner: BoxStream, } -impl fmt::Debug for ServiceStream -where T: 'static -impl ServiceStream { - pub fn new(inner: BoxStream>) -> Self { +impl ServiceStream +where T: Send + 'static +{ + pub fn new(inner: BoxStream) -> Self { Self { inner } } } -impl Unpin for ServiceStream {} - -impl ServiceStream -where - T: Send + 'static, - E: Send + 'static, +impl fmt::Debug for ServiceStream +where T: Send + 'static { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "ServiceStream<{:?}>", TypeId::of::()) diff --git a/quickwit/quickwit-metastore-utils/Cargo.toml b/quickwit/quickwit-metastore-utils/Cargo.toml index 1595cc83306..2661752d907 100644 --- a/quickwit/quickwit-metastore-utils/Cargo.toml +++ b/quickwit/quickwit-metastore-utils/Cargo.toml @@ -20,4 +20,4 @@ quickwit-common = { workspace = true } quickwit-proto = { workspace = true } serde = "1" structopt = "0.3" -tokio = {workspace = true} +tokio = { workspace = true} diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index f5bca5f778e..86632828753 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -48,6 +48,7 @@ use self::store_operations::{ check_indexes_states_exist, delete_index, fetch_index, fetch_or_init_indexes_states, index_exists, put_index, put_indexes_states, }; +use super::STREAM_SPLITS_CHUNK_SIZE; use crate::checkpoint::IndexCheckpointDelta; use crate::{ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, MetastoreResult, Split, @@ -553,14 +554,13 @@ impl Metastore for FileBackedMetastore { .await } - /// Stream splits async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { let splits = self.list_splits(query).await?; let chunks = splits - .chunks(100) + .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| Ok(chunk.to_vec())) .collect_vec(); let stream_ok = futures::stream::iter(chunks.into_iter()); diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 92030bbedcb..ae272fc5ee6 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -44,6 +44,9 @@ use time::OffsetDateTime; use crate::checkpoint::IndexCheckpointDelta; use crate::{MetastoreError, MetastoreResult, Split, SplitMetadata, SplitState}; +/// Splits batch size returned by the stream splits API +const STREAM_SPLITS_CHUNK_SIZE: usize = 100; + /// Metastore meant to manage Quickwit's indexes, their splits and delete tasks. /// /// I. Index and splits management. diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 156f4b725b2..e1c01b477dc 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -20,8 +20,8 @@ use std::collections::HashMap; use std::fmt::{Display, Write}; use std::ops::Bound; -use std::str::FromStr; use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -47,6 +47,7 @@ use tokio_stream::Stream; use tracing::log::LevelFilter; use tracing::{debug, error, info, instrument, warn}; +use super::STREAM_SPLITS_CHUNK_SIZE; use crate::checkpoint::IndexCheckpointDelta; use crate::metastore::instrumented_metastore::InstrumentedMetastore; use crate::metastore::postgresql_model::{ @@ -786,7 +787,7 @@ impl Metastore for PostgresqlMetastore { } Err(error) => Err(MetastoreError::from(error)), }) - .chunks(100) + .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| { chunk .into_iter() diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 2aa1a34d6bf..030a2111be5 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -42,7 +42,7 @@ service MetastoreService { rpc list_splits(ListSplitsRequest) returns (ListSplitsResponse); /// Stream splits from index. - rpc StreamSplits(ListSplitsRequest) returns (stream ListSplitsResponse); + rpc stream_splits(ListSplitsRequest) returns (stream ListSplitsResponse); // Stages several splits. rpc stage_splits(StageSplitsRequest) returns (SplitResponse); diff --git a/quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs index e8ba3f7f31c..9775fc6e4f0 100644 --- a/quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs @@ -543,14 +543,14 @@ pub mod metastore_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit_metastore_api.MetastoreApiService/StreamSplits", + "/quickwit.metastore.MetastoreService/stream_splits", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "quickwit_metastore_api.MetastoreApiService", - "StreamSplits", + "quickwit.metastore.MetastoreService", + "stream_splits", ), ); self.inner.server_streaming(req, path, codec).await @@ -985,8 +985,8 @@ pub mod metastore_service_server { tonic::Response, tonic::Status, >; - /// Server streaming response type for the StreamSplits method. - type StreamSplitsStream: futures_core::Stream< + /// Server streaming response type for the stream_splits method. + type stream_splitsStream: futures_core::Stream< Item = std::result::Result, > + Send @@ -996,7 +996,7 @@ pub mod metastore_service_server { &self, request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// Stages several splits. @@ -1430,15 +1430,15 @@ pub mod metastore_service_server { }; Box::pin(fut) } - "/quickwit.metastore.MetastoreService/StreamSplits" => { + "/quickwit.metastore.MetastoreService/stream_splits" => { #[allow(non_camel_case_types)] - struct StreamSplitsSvc(pub Arc); + struct stream_splitsSvc(pub Arc); impl< - T: MetastoreApiService, + T: MetastoreService, > tonic::server::ServerStreamingService - for StreamSplitsSvc { + for stream_splitsSvc { type Response = super::ListSplitsResponse; - type ResponseStream = T::StreamSplitsStream; + type ResponseStream = T::stream_splitsStream; type Future = BoxFuture< tonic::Response, tonic::Status, @@ -1461,7 +1461,7 @@ pub mod metastore_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = StreamSplitsSvc(inner); + let method = stream_splitsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config(