Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Router simplification #3737

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::{SortField, SortOrder};
use quickwit_proto::search::{SortField, SortOrder};
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use prost_build::{Method, Service, ServiceGenerator};
use quote::{quote, ToTokens};
use syn::Ident;

use crate::ProstConfig;

pub struct Codegen;

impl Codegen {
Expand All @@ -39,7 +41,7 @@ impl Codegen {
result_type_path,
error_type_path,
includes,
prost_build::Config::default(),
ProstConfig::default(),
)
}

Expand All @@ -49,7 +51,7 @@ impl Codegen {
result_type_path: &str,
error_type_path: &str,
includes: &[&str],
mut prost_config: prost_build::Config,
mut prost_config: ProstConfig,
) -> anyhow::Result<()> {
let service_generator = Box::new(QuickwitServiceGenerator::new(
result_type_path,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
mod codegen;

pub use codegen::Codegen;
pub use prost_build::Config as ProstConfig;
23 changes: 12 additions & 11 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::TypeId;
use std::fmt;
use std::pin::Pin;

use futures::{Stream, StreamExt, TryStreamExt};
use futures::{stream, Stream, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::warn;
Expand Down Expand Up @@ -123,19 +123,20 @@ impl<T> From<tonic::Streaming<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(streaming: tonic::Streaming<T>) -> Self {
let ok_streaming = streaming.filter_map(|message| {
Box::pin(async move {
message
.map_err(|status| {
warn!(status=?status, "gRPC transport error.");
status
})
.ok()
let message_stream = stream::unfold(streaming, |mut streaming| {
Box::pin(async {
match streaming.message().await {
Ok(Some(message)) => Some((message, streaming)),
Ok(None) => None,
Err(error) => {
warn!(error=?error, "gRPC transport error.");
None
}
}
})
});

Self {
inner: Box::pin(ok_streaming),
inner: Box::pin(message_stream),
}
}
}
4 changes: 1 addition & 3 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use box_service::BoxService;
pub use buffer::{Buffer, BufferError, BufferLayer};
pub use change::Change;
pub use estimate_rate::{EstimateRate, EstimateRateLayer};
use futures::{Future, Stream};
use futures::Future;
pub use metrics::{PrometheusMetrics, PrometheusMetricsLayer};
pub use pool::Pool;
pub use rate::{ConstantRate, Rate};
Expand All @@ -51,8 +51,6 @@ pub type BoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 's

pub type BoxFutureInfaillible<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;

pub trait Cost {
fn cost(&self) -> u64;
}
3 changes: 2 additions & 1 deletion quickwit/quickwit-common/src/tower/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use tower::discover::Change as TowerChange;
use tower::load::{CompleteOnResponse, PendingRequestsDiscover};
use tower::{BoxError, Service, ServiceExt};

use super::{BoxFuture, BoxStream, Change};
use super::{BoxFuture, Change};
use crate::BoxStream;

// Transforms a boxed stream of `Change<K, Channel>` into a stream of `Result<TowerChange<K,
// Channel>, Infallible>>` while keeping track of the number of connections.
Expand Down
33 changes: 31 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use quickwit_proto::control_plane::{
CloseShardsRequest, CloseShardsResponse, ControlPlaneResult, GetOpenShardsRequest,
GetOpenShardsResponse, NotifyIndexChangeRequest, NotifyIndexChangeResponse,
};
use tracing::debug;

use crate::scheduler::IndexingScheduler;
Expand Down Expand Up @@ -51,7 +54,7 @@ impl ControlPlane {

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for ControlPlane {
type Reply = quickwit_proto::control_plane::Result<NotifyIndexChangeResponse>;
type Reply = ControlPlaneResult<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
Expand All @@ -66,3 +69,29 @@ impl Handler<NotifyIndexChangeRequest> for ControlPlane {
Ok(Ok(NotifyIndexChangeResponse {}))
}
}

#[async_trait]
impl Handler<GetOpenShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<GetOpenShardsResponse>;

async fn handle(
&mut self,
_request: GetOpenShardsRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
unimplemented!()
}
}

#[async_trait]
impl Handler<CloseShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<CloseShardsResponse>;

async fn handle(
&mut self,
_request: CloseShardsRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
unimplemented!()
}
}
6 changes: 4 additions & 2 deletions quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::SourceConfig;
use quickwit_metastore::Metastore;
use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use quickwit_proto::control_plane::{
ControlPlaneResult, NotifyIndexChangeRequest, NotifyIndexChangeResponse,
};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use serde::Serialize;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -298,7 +300,7 @@ impl IndexingScheduler {

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for IndexingScheduler {
type Reply = quickwit_proto::control_plane::Result<NotifyIndexChangeResponse>;
type Reply = ControlPlaneResult<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mockall = { workspace = true, optional = true }
mrecordlog = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand All @@ -41,10 +42,11 @@ rand_distr = { workspace = true }
tempfile = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }

[build-dependencies]
quickwit-codegen = { workspace = true }
prost-build = { workspace = true }

[features]
testsuite = ["mockall"]
10 changes: 6 additions & 4 deletions quickwit/quickwit-ingest/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_codegen::Codegen;
use quickwit_codegen::{Codegen, ProstConfig};

fn main() {
let mut config = prost_build::Config::default();
config.bytes(["DocBatch.doc_buffer"]);
// Legacy ingest codegen
let mut prost_config = ProstConfig::default();
prost_config.bytes(["DocBatch.doc_buffer"]);

Codegen::run_with_config(
&["src/ingest_service.proto"],
"src/codegen/",
"crate::Result",
"crate::IngestServiceError",
&[],
config,
prost_config,
)
.unwrap();
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/doc_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where T: Buf + Default
}

/// Copies the command to the end of bytes::BufMut while returning the number of bytes copied
pub fn write(self, buf: &mut impl BufMut) -> usize {
pub fn write(self, mut buf: impl BufMut) -> usize {
let self_buf = self.into_buf();
let len = self_buf.remaining();
buf.put(self_buf);
Expand Down
Loading
Loading