Skip to content

Commit

Permalink
Use metastore stream splits.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jul 9, 2023
1 parent 53d446a commit 29f5602
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl Metastore for FileBackedMetastore {
}

/// Stream splits
async fn splits(
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,20 @@ impl grpc::MetastoreApiService for GrpcMetastoreAdapter {
Ok(tonic::Response::new(list_splits_reply))
}

type SplitsStream = ServiceStream<ListSplitsResponse, tonic::Status>;
/// Stream splits.
type StreamSplitsStream = ServiceStream<ListSplitsResponse, tonic::Status>;
#[instrument(skip(self, request))]
async fn splits(
async fn stream_splits(
&self,
request: tonic::Request<ListSplitsRequest>,
) -> std::result::Result<tonic::Response<Self::SplitsStream>, tonic::Status> {
) -> std::result::Result<tonic::Response<Self::StreamSplitsStream>, tonic::Status> {
set_parent_span_from_request_metadata(request.metadata());
let list_splits_request = request.into_inner();
let query: ListSplitsQuery = serde_json::from_str(&list_splits_request.filter_json)
.map_err(|error| MetastoreError::JsonDeserializeError {
struct_name: "ListSplitsQuery".to_string(),
message: error.to_string(),
})?;
let stream_response = self.0.splits(query).await?;
let stream_response = self.0.stream_splits(query).await?;
let splits_response_stream = stream_response
.map(|result| match result {
Ok(splits) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Metastore for MetastoreGrpcClient {
Ok(splits)
}

async fn splits(
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
Expand All @@ -310,7 +310,7 @@ impl Metastore for MetastoreGrpcClient {
let response = self
.underlying
.clone()
.splits(request)
.stream_splits(request)
.await
.map(|tonic_response| tonic_response.into_inner())
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ impl Metastore for InstrumentedMetastore {
}

/// Stream splits
async fn splits(
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
instrument!(
self.underlying.splits(query.clone()).await,
self.underlying.stream_splits(query.clone()).await,
[splits, query.index_uid.index_id()]
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ impl Metastore for MetastoreEventPublisher {
}

/// Stream splits
async fn splits(
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
self.underlying.splits(query.clone()).await
self.underlying.stream_splits(query.clone()).await
}

async fn list_all_splits(&self, index_uid: IndexUid) -> MetastoreResult<Vec<Split>> {
Expand Down
14 changes: 11 additions & 3 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod retrying_metastore;
use std::ops::{Bound, RangeInclusive};

use async_trait::async_trait;
use futures::TryStreamExt;
pub use index_metadata::IndexMetadata;
use quickwit_common::uri::Uri;
use quickwit_common::ServiceStream;
Expand Down Expand Up @@ -190,7 +191,14 @@ pub trait Metastore: Send + Sync + 'static {
/// Returns a list of splits that intersects the given `time_range`, `split_state`, and `tag`.
/// Regardless of the time range filter, if a split has no timestamp it is always returned.
/// An error will occur if an index that does not exist in the storage is specified.
async fn list_splits(&self, query: ListSplitsQuery) -> MetastoreResult<Vec<Split>>;
async fn list_splits(&self, query: ListSplitsQuery) -> MetastoreResult<Vec<Split>> {
let mut stream = self.stream_splits(query).await?;
let mut splits = Vec::new();
while let Some(splits_batch) = stream.try_next().await? {
splits.extend(splits_batch);
}
Ok(splits)
}

/// Lists all the splits without filtering.
///
Expand All @@ -200,8 +208,8 @@ pub trait Metastore: Send + Sync + 'static {
self.list_splits(query).await
}

/// Stream splits
async fn splits(
/// Stream splits batches matching the given query.
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>>;
Expand Down
68 changes: 18 additions & 50 deletions quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,23 +756,13 @@ impl Metastore for PostgresqlMetastore {
}

#[instrument(skip(self), fields(index_id=query.index_uid.index_id()))]
async fn splits(
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
#[self_referencing]
struct SplitStream<T> {
connection_pool: Pool<Postgres>,
sql: String,
#[borrows(connection_pool, sql)]
#[covariant]
inner: BoxStream<'this, Result<T, sqlx::Error>>,
}

let sql_base = "SELECT * FROM splits".to_string();
let sql: String = build_query_filter(sql_base, &query);
let connection_pool = self.connection_pool.clone();

let split_stream = SplitStream::new(
connection_pool,
sql,
Expand All @@ -782,17 +772,6 @@ impl Metastore for PostgresqlMetastore {
.fetch(connection_pool)
},
);

impl<T> Stream for SplitStream<T> {
type Item = Result<T, sqlx::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
SplitStream::with_inner_mut(&mut self, |this| {
Pin::new(&mut this.as_mut()).poll_next(cx)
})
}
}

let mapped_split_stream = split_stream
.map(|result| match result {
Ok(pg_split) => {
Expand All @@ -808,34 +787,6 @@ impl Metastore for PostgresqlMetastore {
.collect::<Result<Vec<Split>, MetastoreError>>()
});
let service_stream = ServiceStream::new(Box::pin(mapped_split_stream));

// TODO: can we do better than this spawn?
// let (tx, service_stream) = ServiceStream::new_unbounded();
// let connection_pool = self.connection_pool.clone();
// tokio::spawn(async move {
// let sql_base = "SELECT * FROM splits".to_string();
// let sql: String = build_query_filter(sql_base, &query);
// let mut stream = sqlx::query_as::<_, PgSplit>(&sql)
// .bind(query.index_uid.to_string())
// .fetch(&connection_pool)
// .map(|result| match result {
// Ok(pg_split) => {
// let split_res: Result<Split, MetastoreError> = pg_split.try_into();
// split_res
// }
// Err(error) => Err(MetastoreError::from(error)),
// })
// .chunks(100)
// .map(|chunk| {
// chunk
// .into_iter()
// .collect::<Result<Vec<Split>, MetastoreError>>()
// });

// while let Some(item) = stream.next().await {
// tx.send(item).unwrap();
// }
// });
Ok(service_stream)
}

Expand Down Expand Up @@ -1240,6 +1191,23 @@ impl Metastore for PostgresqlMetastore {
}
}

#[self_referencing]
struct SplitStream<T> {
connection_pool: Pool<Postgres>,
sql: String,
#[borrows(connection_pool, sql)]
#[covariant]
inner: BoxStream<'this, Result<T, sqlx::Error>>,
}

impl<T> Stream for SplitStream<T> {
type Item = Result<T, sqlx::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
SplitStream::with_inner_mut(&mut self, |this| Pin::new(&mut this.as_mut()).poll_next(cx))
}
}

// We use dollar-quoted strings in Postgresql.
//
// In order to ensure that we do not risk SQL injection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ impl Metastore for RetryingMetastore {
.await
}

async fn splits(
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
retry(&self.retry_params, || async {
self.inner.splits(query.clone()).await
self.inner.stream_splits(query.clone()).await
})
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Metastore for RetryTestMetastore {
}
}

async fn splits(
async fn stream_splits(
&self,
_query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ pub mod test_suite {
.unwrap();
let query =
ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged);
let mut splits_stream = metastore.splits(query).await.unwrap();
let mut splits_stream = metastore.stream_splits(query).await.unwrap();

let first_batch = splits_stream.try_next().await.unwrap().unwrap();
assert_eq!(first_batch.len(), 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ service MetastoreApiService {
rpc list_splits(ListSplitsRequest) returns (ListSplitsResponse);

/// Stream splits from index.
rpc Splits(ListSplitsRequest) returns (stream ListSplitsResponse);
rpc StreamSplits(ListSplitsRequest) returns (stream ListSplitsResponse);

// Stages several splits.
rpc stage_splits(StageSplitsRequest) returns (SplitResponse);
Expand Down
31 changes: 18 additions & 13 deletions quickwit/quickwit-proto/src/quickwit_metastore_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ pub mod metastore_api_service_client {
self.inner.unary(req, path, codec).await
}
/// / Stream splits from index.
pub async fn splits(
pub async fn stream_splits(
&mut self,
request: impl tonic::IntoRequest<super::ListSplitsRequest>,
) -> std::result::Result<
Expand All @@ -546,14 +546,14 @@ pub mod metastore_api_service_client {
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/quickwit_metastore_api.MetastoreApiService/Splits",
"/quickwit_metastore_api.MetastoreApiService/StreamSplits",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
"quickwit_metastore_api.MetastoreApiService",
"Splits",
"StreamSplits",
),
);
self.inner.server_streaming(req, path, codec).await
Expand Down Expand Up @@ -991,17 +991,20 @@ pub mod metastore_api_service_server {
tonic::Response<super::ListSplitsResponse>,
tonic::Status,
>;
/// Server streaming response type for the Splits method.
type SplitsStream: futures_core::Stream<
/// Server streaming response type for the StreamSplits method.
type StreamSplitsStream: futures_core::Stream<
Item = std::result::Result<super::ListSplitsResponse, tonic::Status>,
>
+ Send
+ 'static;
/// / Stream splits from index.
async fn splits(
async fn stream_splits(
&self,
request: tonic::Request<super::ListSplitsRequest>,
) -> std::result::Result<tonic::Response<Self::SplitsStream>, tonic::Status>;
) -> std::result::Result<
tonic::Response<Self::StreamSplitsStream>,
tonic::Status,
>;
/// Stages several splits.
async fn stage_splits(
&self,
Expand Down Expand Up @@ -1433,15 +1436,15 @@ pub mod metastore_api_service_server {
};
Box::pin(fut)
}
"/quickwit_metastore_api.MetastoreApiService/Splits" => {
"/quickwit_metastore_api.MetastoreApiService/StreamSplits" => {
#[allow(non_camel_case_types)]
struct SplitsSvc<T: MetastoreApiService>(pub Arc<T>);
struct StreamSplitsSvc<T: MetastoreApiService>(pub Arc<T>);
impl<
T: MetastoreApiService,
> tonic::server::ServerStreamingService<super::ListSplitsRequest>
for SplitsSvc<T> {
for StreamSplitsSvc<T> {
type Response = super::ListSplitsResponse;
type ResponseStream = T::SplitsStream;
type ResponseStream = T::StreamSplitsStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
Expand All @@ -1451,7 +1454,9 @@ pub mod metastore_api_service_server {
request: tonic::Request<super::ListSplitsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).splits(request).await };
let fut = async move {
(*inner).stream_splits(request).await
};
Box::pin(fut)
}
}
Expand All @@ -1462,7 +1467,7 @@ pub mod metastore_api_service_server {
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = SplitsSvc(inner);
let method = StreamSplitsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
Expand Down

0 comments on commit 29f5602

Please sign in to comment.