Skip to content

Commit

Permalink
Clean.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jul 20, 2023
1 parent 3e241b8 commit e4c2757
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 46 deletions.
18 changes: 0 additions & 18 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ arc-swap = "1.6"
assert-json-diff = "2"
async-speed-limit = "0.4"
async-trait = "0.1"
async-compression = { version = "0.4.1", features = ["gzip", "futures-io"] }
atty = "0.2"
backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.21"
byte-unit = { version = "4", default-features = false, features = ["serde", "std"] }
Expand Down Expand Up @@ -138,6 +140,7 @@ regex = "1.9.1"
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls",
"stream",
] }
rust-embed = "6.8.1"
serde = { version = "1.0", features = ["derive", "rc"] }
Expand Down
16 changes: 6 additions & 10 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,16 @@ pub struct ServiceStream<T> {
inner: BoxStream<T>,
}

impl<T> fmt::Debug for ServiceStream<T>
where T: 'static
impl<T, E> ServiceStream<T, E> {
pub fn new(inner: BoxStream<Result<T, E>>) -> Self {
impl<T> ServiceStream<T>
where T: Send + 'static
{
pub fn new(inner: BoxStream<T>) -> Self {
Self { inner }
}
}

impl<T, E> Unpin for ServiceStream<T, E> {}

impl<T, E> ServiceStream<T, E>
where
T: Send + 'static,
E: Send + 'static,
impl<T> fmt::Debug for ServiceStream<T>
where T: Send + 'static
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ServiceStream<{:?}>", TypeId::of::<T>())
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ quickwit-common = { workspace = true }
quickwit-proto = { workspace = true }
serde = "1"
structopt = "0.3"
tokio = {workspace = true}
tokio = { workspace = true}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use self::store_operations::{
check_indexes_states_exist, delete_index, fetch_index, fetch_or_init_indexes_states,
index_exists, put_index, put_indexes_states,
};
use super::STREAM_SPLITS_CHUNK_SIZE;
use crate::checkpoint::IndexCheckpointDelta;
use crate::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, MetastoreResult, Split,
Expand Down Expand Up @@ -553,14 +554,13 @@ impl Metastore for FileBackedMetastore {
.await
}

/// Stream splits
async fn stream_splits(
&self,
query: ListSplitsQuery,
) -> MetastoreResult<ServiceStream<Vec<Split>, MetastoreError>> {
let splits = self.list_splits(query).await?;
let chunks = splits
.chunks(100)
.chunks(STREAM_SPLITS_CHUNK_SIZE)
.map(|chunk| Ok(chunk.to_vec()))
.collect_vec();
let stream_ok = futures::stream::iter(chunks.into_iter());
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ use time::OffsetDateTime;
use crate::checkpoint::IndexCheckpointDelta;
use crate::{MetastoreError, MetastoreResult, Split, SplitMetadata, SplitState};

/// Splits batch size returned by the stream splits API
const STREAM_SPLITS_CHUNK_SIZE: usize = 100;

/// Metastore meant to manage Quickwit's indexes, their splits and delete tasks.
///
/// I. Index and splits management.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
use std::collections::HashMap;
use std::fmt::{Display, Write};
use std::ops::Bound;
use std::str::FromStr;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
Expand All @@ -47,6 +47,7 @@ use tokio_stream::Stream;
use tracing::log::LevelFilter;
use tracing::{debug, error, info, instrument, warn};

use super::STREAM_SPLITS_CHUNK_SIZE;
use crate::checkpoint::IndexCheckpointDelta;
use crate::metastore::instrumented_metastore::InstrumentedMetastore;
use crate::metastore::postgresql_model::{
Expand Down Expand Up @@ -786,7 +787,7 @@ impl Metastore for PostgresqlMetastore {
}
Err(error) => Err(MetastoreError::from(error)),
})
.chunks(100)
.chunks(STREAM_SPLITS_CHUNK_SIZE)
.map(|chunk| {
chunk
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/protos/quickwit/metastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ service MetastoreService {
rpc list_splits(ListSplitsRequest) returns (ListSplitsResponse);

/// Stream splits from index.
rpc StreamSplits(ListSplitsRequest) returns (stream ListSplitsResponse);
rpc stream_splits(ListSplitsRequest) returns (stream ListSplitsResponse);

// Stages several splits.
rpc stage_splits(StageSplitsRequest) returns (SplitResponse);
Expand Down
24 changes: 12 additions & 12 deletions quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,14 +543,14 @@ pub mod metastore_service_client {
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/quickwit_metastore_api.MetastoreApiService/StreamSplits",
"/quickwit.metastore.MetastoreService/stream_splits",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
"quickwit_metastore_api.MetastoreApiService",
"StreamSplits",
"quickwit.metastore.MetastoreService",
"stream_splits",
),
);
self.inner.server_streaming(req, path, codec).await
Expand Down Expand Up @@ -985,8 +985,8 @@ pub mod metastore_service_server {
tonic::Response<super::ListSplitsResponse>,
tonic::Status,
>;
/// Server streaming response type for the StreamSplits method.
type StreamSplitsStream: futures_core::Stream<
/// Server streaming response type for the stream_splits method.
type stream_splitsStream: futures_core::Stream<
Item = std::result::Result<super::ListSplitsResponse, tonic::Status>,
>
+ Send
Expand All @@ -996,7 +996,7 @@ pub mod metastore_service_server {
&self,
request: tonic::Request<super::ListSplitsRequest>,
) -> std::result::Result<
tonic::Response<Self::StreamSplitsStream>,
tonic::Response<Self::stream_splitsStream>,
tonic::Status,
>;
/// Stages several splits.
Expand Down Expand Up @@ -1430,15 +1430,15 @@ pub mod metastore_service_server {
};
Box::pin(fut)
}
"/quickwit.metastore.MetastoreService/StreamSplits" => {
"/quickwit.metastore.MetastoreService/stream_splits" => {
#[allow(non_camel_case_types)]
struct StreamSplitsSvc<T: MetastoreApiService>(pub Arc<T>);
struct stream_splitsSvc<T: MetastoreService>(pub Arc<T>);
impl<
T: MetastoreApiService,
T: MetastoreService,
> tonic::server::ServerStreamingService<super::ListSplitsRequest>
for StreamSplitsSvc<T> {
for stream_splitsSvc<T> {
type Response = super::ListSplitsResponse;
type ResponseStream = T::StreamSplitsStream;
type ResponseStream = T::stream_splitsStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
Expand All @@ -1461,7 +1461,7 @@ pub mod metastore_service_server {
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = StreamSplitsSvc(inner);
let method = stream_splitsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
Expand Down

0 comments on commit e4c2757

Please sign in to comment.