diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index d55781c91c9..96829c75b31 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -554,7 +554,7 @@ impl Metastore for FileBackedMetastore { } /// Stream splits - async fn splits( + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { diff --git a/quickwit/quickwit-metastore/src/metastore/grpc_metastore/grpc_adapter.rs b/quickwit/quickwit-metastore/src/metastore/grpc_metastore/grpc_adapter.rs index 529427a36cd..49af4c06b74 100644 --- a/quickwit/quickwit-metastore/src/metastore/grpc_metastore/grpc_adapter.rs +++ b/quickwit/quickwit-metastore/src/metastore/grpc_metastore/grpc_adapter.rs @@ -181,13 +181,12 @@ impl grpc::MetastoreApiService for GrpcMetastoreAdapter { Ok(tonic::Response::new(list_splits_reply)) } - type SplitsStream = ServiceStream; - /// Stream splits. + type StreamSplitsStream = ServiceStream; #[instrument(skip(self, request))] - async fn splits( + async fn stream_splits( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let list_splits_request = request.into_inner(); let query: ListSplitsQuery = serde_json::from_str(&list_splits_request.filter_json) @@ -195,7 +194,7 @@ impl grpc::MetastoreApiService for GrpcMetastoreAdapter { struct_name: "ListSplitsQuery".to_string(), message: error.to_string(), })?; - let stream_response = self.0.splits(query).await?; + let stream_response = self.0.stream_splits(query).await?; let splits_response_stream = stream_response .map(|result| match result { Ok(splits) => { diff --git a/quickwit/quickwit-metastore/src/metastore/grpc_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/grpc_metastore/mod.rs index c650f3d0fe3..8b7df295dd3 100644 --- a/quickwit/quickwit-metastore/src/metastore/grpc_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/grpc_metastore/mod.rs @@ -296,7 +296,7 @@ impl Metastore for MetastoreGrpcClient { Ok(splits) } - async fn splits( + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { @@ -310,7 +310,7 @@ impl Metastore for MetastoreGrpcClient { let response = self .underlying .clone() - .splits(request) + .stream_splits(request) .await .map(|tonic_response| tonic_response.into_inner()) .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; diff --git a/quickwit/quickwit-metastore/src/metastore/instrumented_metastore.rs b/quickwit/quickwit-metastore/src/metastore/instrumented_metastore.rs index af6242fb570..b76afecb308 100644 --- a/quickwit/quickwit-metastore/src/metastore/instrumented_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/instrumented_metastore.rs @@ -167,12 +167,12 @@ impl Metastore for InstrumentedMetastore { } /// Stream splits - async fn splits( + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { instrument!( - self.underlying.splits(query.clone()).await, + self.underlying.stream_splits(query.clone()).await, [splits, query.index_uid.index_id()] ); } diff --git a/quickwit/quickwit-metastore/src/metastore/metastore_event_publisher.rs b/quickwit/quickwit-metastore/src/metastore/metastore_event_publisher.rs index fc3ee003663..6b5ac5fb0a0 100644 --- a/quickwit/quickwit-metastore/src/metastore/metastore_event_publisher.rs +++ b/quickwit/quickwit-metastore/src/metastore/metastore_event_publisher.rs @@ -164,11 +164,11 @@ impl Metastore for MetastoreEventPublisher { } /// Stream splits - async fn splits( + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { - self.underlying.splits(query.clone()).await + self.underlying.stream_splits(query.clone()).await } async fn list_all_splits(&self, index_uid: IndexUid) -> MetastoreResult> { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index b9bac20dc38..0648f385c1e 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,6 +31,7 @@ pub mod retrying_metastore; use std::ops::{Bound, RangeInclusive}; use async_trait::async_trait; +use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use quickwit_common::uri::Uri; use quickwit_common::ServiceStream; @@ -190,7 +191,14 @@ pub trait Metastore: Send + Sync + 'static { /// Returns a list of splits that intersects the given `time_range`, `split_state`, and `tag`. /// Regardless of the time range filter, if a split has no timestamp it is always returned. /// An error will occur if an index that does not exist in the storage is specified. - async fn list_splits(&self, query: ListSplitsQuery) -> MetastoreResult>; + async fn list_splits(&self, query: ListSplitsQuery) -> MetastoreResult> { + let mut stream = self.stream_splits(query).await?; + let mut splits = Vec::new(); + while let Some(splits_batch) = stream.try_next().await? { + splits.extend(splits_batch); + } + Ok(splits) + } /// Lists all the splits without filtering. /// @@ -200,8 +208,8 @@ pub trait Metastore: Send + Sync + 'static { self.list_splits(query).await } - /// Stream splits - async fn splits( + /// Stream splits batches matching the given query. + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>>; diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 8efc14d8b36..d78422be45a 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -756,23 +756,13 @@ impl Metastore for PostgresqlMetastore { } #[instrument(skip(self), fields(index_id=query.index_uid.index_id()))] - async fn splits( + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { - #[self_referencing] - struct SplitStream { - connection_pool: Pool, - sql: String, - #[borrows(connection_pool, sql)] - #[covariant] - inner: BoxStream<'this, Result>, - } - let sql_base = "SELECT * FROM splits".to_string(); let sql: String = build_query_filter(sql_base, &query); let connection_pool = self.connection_pool.clone(); - let split_stream = SplitStream::new( connection_pool, sql, @@ -782,17 +772,6 @@ impl Metastore for PostgresqlMetastore { .fetch(connection_pool) }, ); - - impl Stream for SplitStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - SplitStream::with_inner_mut(&mut self, |this| { - Pin::new(&mut this.as_mut()).poll_next(cx) - }) - } - } - let mapped_split_stream = split_stream .map(|result| match result { Ok(pg_split) => { @@ -808,34 +787,6 @@ impl Metastore for PostgresqlMetastore { .collect::, MetastoreError>>() }); let service_stream = ServiceStream::new(Box::pin(mapped_split_stream)); - - // TODO: can we do better than this spawn? - // let (tx, service_stream) = ServiceStream::new_unbounded(); - // let connection_pool = self.connection_pool.clone(); - // tokio::spawn(async move { - // let sql_base = "SELECT * FROM splits".to_string(); - // let sql: String = build_query_filter(sql_base, &query); - // let mut stream = sqlx::query_as::<_, PgSplit>(&sql) - // .bind(query.index_uid.to_string()) - // .fetch(&connection_pool) - // .map(|result| match result { - // Ok(pg_split) => { - // let split_res: Result = pg_split.try_into(); - // split_res - // } - // Err(error) => Err(MetastoreError::from(error)), - // }) - // .chunks(100) - // .map(|chunk| { - // chunk - // .into_iter() - // .collect::, MetastoreError>>() - // }); - - // while let Some(item) = stream.next().await { - // tx.send(item).unwrap(); - // } - // }); Ok(service_stream) } @@ -1240,6 +1191,23 @@ impl Metastore for PostgresqlMetastore { } } +#[self_referencing] +struct SplitStream { + connection_pool: Pool, + sql: String, + #[borrows(connection_pool, sql)] + #[covariant] + inner: BoxStream<'this, Result>, +} + +impl Stream for SplitStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + SplitStream::with_inner_mut(&mut self, |this| Pin::new(&mut this.as_mut()).poll_next(cx)) + } +} + // We use dollar-quoted strings in Postgresql. // // In order to ensure that we do not risk SQL injection, diff --git a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs index 2edf301d723..abc8682cf36 100644 --- a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs @@ -143,12 +143,12 @@ impl Metastore for RetryingMetastore { .await } - async fn splits( + async fn stream_splits( &self, query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { retry(&self.retry_params, || async { - self.inner.splits(query.clone()).await + self.inner.stream_splits(query.clone()).await }) .await } diff --git a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/test.rs b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/test.rs index be4f4dca945..b4473c5c167 100644 --- a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/test.rs +++ b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/test.rs @@ -136,7 +136,7 @@ impl Metastore for RetryTestMetastore { } } - async fn splits( + async fn stream_splits( &self, _query: ListSplitsQuery, ) -> MetastoreResult, MetastoreError>> { diff --git a/quickwit/quickwit-metastore/src/tests.rs b/quickwit/quickwit-metastore/src/tests.rs index 6bdde6d4458..6b02b67238e 100644 --- a/quickwit/quickwit-metastore/src/tests.rs +++ b/quickwit/quickwit-metastore/src/tests.rs @@ -1675,7 +1675,7 @@ pub mod test_suite { .unwrap(); let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged); - let mut splits_stream = metastore.splits(query).await.unwrap(); + let mut splits_stream = metastore.stream_splits(query).await.unwrap(); let first_batch = splits_stream.try_next().await.unwrap().unwrap(); assert_eq!(first_batch.len(), 100); diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore_api.proto b/quickwit/quickwit-proto/protos/quickwit/metastore_api.proto index cba9b8ca8d8..d0dbd1f47cc 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore_api.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore_api.proto @@ -42,7 +42,7 @@ service MetastoreApiService { rpc list_splits(ListSplitsRequest) returns (ListSplitsResponse); /// Stream splits from index. - rpc Splits(ListSplitsRequest) returns (stream ListSplitsResponse); + rpc StreamSplits(ListSplitsRequest) returns (stream ListSplitsResponse); // Stages several splits. rpc stage_splits(StageSplitsRequest) returns (SplitResponse); diff --git a/quickwit/quickwit-proto/src/quickwit_metastore_api.rs b/quickwit/quickwit-proto/src/quickwit_metastore_api.rs index f4695e06fae..35067071a0e 100644 --- a/quickwit/quickwit-proto/src/quickwit_metastore_api.rs +++ b/quickwit/quickwit-proto/src/quickwit_metastore_api.rs @@ -528,7 +528,7 @@ pub mod metastore_api_service_client { self.inner.unary(req, path, codec).await } /// / Stream splits from index. - pub async fn splits( + pub async fn stream_splits( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< @@ -546,14 +546,14 @@ pub mod metastore_api_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit_metastore_api.MetastoreApiService/Splits", + "/quickwit_metastore_api.MetastoreApiService/StreamSplits", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( "quickwit_metastore_api.MetastoreApiService", - "Splits", + "StreamSplits", ), ); self.inner.server_streaming(req, path, codec).await @@ -991,17 +991,20 @@ pub mod metastore_api_service_server { tonic::Response, tonic::Status, >; - /// Server streaming response type for the Splits method. - type SplitsStream: futures_core::Stream< + /// Server streaming response type for the StreamSplits method. + type StreamSplitsStream: futures_core::Stream< Item = std::result::Result, > + Send + 'static; /// / Stream splits from index. - async fn splits( + async fn stream_splits( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Stages several splits. async fn stage_splits( &self, @@ -1433,15 +1436,15 @@ pub mod metastore_api_service_server { }; Box::pin(fut) } - "/quickwit_metastore_api.MetastoreApiService/Splits" => { + "/quickwit_metastore_api.MetastoreApiService/StreamSplits" => { #[allow(non_camel_case_types)] - struct SplitsSvc(pub Arc); + struct StreamSplitsSvc(pub Arc); impl< T: MetastoreApiService, > tonic::server::ServerStreamingService - for SplitsSvc { + for StreamSplitsSvc { type Response = super::ListSplitsResponse; - type ResponseStream = T::SplitsStream; + type ResponseStream = T::StreamSplitsStream; type Future = BoxFuture< tonic::Response, tonic::Status, @@ -1451,7 +1454,9 @@ pub mod metastore_api_service_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).splits(request).await }; + let fut = async move { + (*inner).stream_splits(request).await + }; Box::pin(fut) } } @@ -1462,7 +1467,7 @@ pub mod metastore_api_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = SplitsSvc(inner); + let method = StreamSplitsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config(