diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 3a2cd756643..adb40a4d626 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -4995,7 +4995,6 @@ dependencies = [
"quickwit-codegen",
"quickwit-common",
"quickwit-config",
- "quickwit-grpc-clients",
"quickwit-indexing",
"quickwit-metastore",
"quickwit-proto",
@@ -5165,7 +5164,6 @@ dependencies = [
"quickwit-config",
"quickwit-directories",
"quickwit-doc-mapper",
- "quickwit-grpc-clients",
"quickwit-ingest",
"quickwit-metastore",
"quickwit-proto",
@@ -5273,7 +5271,6 @@ dependencies = [
"quickwit-cluster",
"quickwit-common",
"quickwit-config",
- "quickwit-grpc-clients",
"quickwit-indexing",
"quickwit-ingest",
"quickwit-metastore",
@@ -5310,7 +5307,6 @@ dependencies = [
"quickwit-config",
"quickwit-directories",
"quickwit-doc-mapper",
- "quickwit-grpc-clients",
"quickwit-indexing",
"quickwit-metastore",
"quickwit-proto",
@@ -5566,7 +5562,6 @@ dependencies = [
"quickwit-core",
"quickwit-directories",
"quickwit-doc-mapper",
- "quickwit-grpc-clients",
"quickwit-indexing",
"quickwit-ingest",
"quickwit-jaeger",
diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml
index 63908a8f5fc..e1650eedc46 100644
--- a/quickwit/quickwit-control-plane/Cargo.toml
+++ b/quickwit/quickwit-control-plane/Cargo.toml
@@ -34,7 +34,6 @@ quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
-quickwit-grpc-clients = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
@@ -50,6 +49,7 @@ quickwit-cluster = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
+quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }
[build-dependencies]
diff --git a/quickwit/quickwit-control-plane/build.rs b/quickwit/quickwit-control-plane/build.rs
deleted file mode 100644
index 6f16b01a656..00000000000
--- a/quickwit/quickwit-control-plane/build.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright (C) 2023 Quickwit, Inc.
-//
-// Quickwit is offered under the AGPL v3.0 and as commercial software.
-// For commercial licensing, contact us at hello@quickwit.io.
-//
-// AGPL:
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-use quickwit_codegen::Codegen;
-
-fn main() {
- Codegen::run(
- &["src/control_plane.proto"],
- "src/codegen/",
- "crate::Result",
- "crate::ControlPlaneError",
- &[],
- )
- .unwrap();
-}
diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 5e47c0d837d..fb57a418d3e 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -20,10 +20,10 @@
use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
+use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use tracing::debug;
use crate::scheduler::IndexingScheduler;
-use crate::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
#[derive(Debug)]
pub struct ControlPlane {
@@ -51,7 +51,7 @@ impl ControlPlane {
#[async_trait]
impl Handler for ControlPlane {
- type Reply = crate::Result;
+ type Reply = quickwit_proto::control_plane::Result;
async fn handle(
&mut self,
diff --git a/quickwit/quickwit-control-plane/src/grpc_adapter.rs b/quickwit/quickwit-control-plane/src/grpc_adapter.rs
deleted file mode 100644
index 2d5fa768743..00000000000
--- a/quickwit/quickwit-control-plane/src/grpc_adapter.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright (C) 2023 Quickwit, Inc.
-//
-// Quickwit is offered under the AGPL v3.0 and as commercial software.
-// For commercial licensing, contact us at hello@quickwit.io.
-//
-// AGPL:
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-use async_trait::async_trait;
-use quickwit_actors::Mailbox;
-use quickwit_proto::control_plane_api::control_plane_service_server::{self as grpc};
-use quickwit_proto::control_plane_api::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
-use quickwit_proto::tonic;
-
-use crate::scheduler::IndexingScheduler;
-
-#[allow(missing_docs)]
-#[derive(Clone)]
-pub struct GrpcControlPlaneAdapter(Mailbox);
-
-impl From> for GrpcControlPlaneAdapter {
- fn from(indexing_scheduler: Mailbox) -> Self {
- Self(indexing_scheduler)
- }
-}
-
-#[async_trait]
-impl grpc::ControlPlaneService for GrpcControlPlaneAdapter {
- async fn notify_index_change(
- &self,
- request: tonic::Request,
- ) -> Result, tonic::Status> {
- let index_event_request = request.into_inner();
- let create_index_reply = self
- .0
- .ask(index_event_request)
- .await
- .map(|_| NotifyIndexChangeResponse {})
- .map_err(|send_error| {
- quickwit_proto::tonic::Status::new(tonic::Code::Internal, send_error.to_string())
- })?;
- Ok(tonic::Response::new(create_index_reply))
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::collections::{HashMap, HashSet};
- use std::net::SocketAddr;
- use std::sync::Arc;
-
- use chitchat::transport::ChannelTransport;
- use quickwit_actors::{Mailbox, Universe};
- use quickwit_cluster::{create_cluster_for_test, ClusterMember};
- use quickwit_config::service::QuickwitService;
- use quickwit_grpc_clients::service_client_pool::ServiceClientPool;
- use quickwit_grpc_clients::ControlPlaneGrpcClient;
- use quickwit_metastore::MockMetastore;
- use quickwit_proto::control_plane_api::control_plane_service_server::ControlPlaneServiceServer;
- use quickwit_proto::tonic::transport::Server;
- use tokio::sync::watch;
- use tokio_stream::wrappers::WatchStream;
-
- use super::GrpcControlPlaneAdapter;
- use crate::scheduler::IndexingScheduler;
-
- async fn start_grpc_server(
- address: SocketAddr,
- indexing_scheduler: Mailbox,
- ) -> anyhow::Result<()> {
- let grpc_adapter = GrpcControlPlaneAdapter::from(indexing_scheduler);
- tokio::spawn(async move {
- Server::builder()
- .add_service(ControlPlaneServiceServer::new(grpc_adapter))
- .serve(address)
- .await?;
- Result::<_, anyhow::Error>::Ok(())
- });
- Ok(())
- }
-
- #[tokio::test]
- async fn test_control_plane_grpc_client() -> anyhow::Result<()> {
- quickwit_common::setup_logging_for_tests();
- let universe = Universe::with_accelerated_time();
- let mut metastore = MockMetastore::default();
- metastore
- .expect_list_indexes_metadatas()
- .returning(move || Ok(Vec::new()));
- let transport = ChannelTransport::default();
- let cluster = Arc::new(
- create_cluster_for_test(Vec::new(), &["control_plane", "indexer"], &transport, true)
- .await
- .unwrap(),
- );
- let scheduler = IndexingScheduler::new(
- cluster,
- Arc::new(metastore),
- ServiceClientPool::new(HashMap::new()),
- );
- let (_, scheduler_handler) = universe.spawn_builder().spawn(scheduler);
- let control_plane_grpc_addr_port = quickwit_common::net::find_available_tcp_port().unwrap();
- let control_plane_grpc_addr: SocketAddr =
- ([127, 0, 0, 1], control_plane_grpc_addr_port).into();
- start_grpc_server(control_plane_grpc_addr, scheduler_handler.mailbox().clone()).await?;
- let control_plane_service_member = ClusterMember::new(
- "1".to_string(),
- 0,
- HashSet::from([QuickwitService::ControlPlane]),
- control_plane_grpc_addr,
- control_plane_grpc_addr,
- Vec::new(),
- );
- let (_members_tx, members_rx) =
- watch::channel::>(vec![control_plane_service_member.clone()]);
- let watch_members = WatchStream::new(members_rx);
- let mut control_plane_client =
- ControlPlaneGrpcClient::create_and_update_from_members(watch_members)
- .await
- .unwrap();
- let result = control_plane_client.notify_index_change().await;
- assert!(result.is_ok());
- let scheduler_state = scheduler_handler.process_pending_and_observe().await;
- assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1);
- universe.assert_quit().await;
-
- Ok(())
- }
-}
diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs
index 85f7ffd9128..6fdb074891f 100644
--- a/quickwit/quickwit-control-plane/src/indexing_plan.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs
@@ -324,8 +324,8 @@ mod tests {
FileSourceParams, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams,
CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
};
- use quickwit_proto::indexing::IndexingTask;
- use quickwit_proto::{IndexUid, IndexingServiceClient};
+ use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask};
+ use quickwit_proto::IndexUid;
use rand::seq::SliceRandom;
use serde_json::json;
use tonic::transport::Endpoint;
diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs
index 0167f2fe93e..533b0ef37eb 100644
--- a/quickwit/quickwit-control-plane/src/lib.rs
+++ b/quickwit/quickwit-control-plane/src/lib.rs
@@ -18,26 +18,24 @@
// along with this program. If not, see .
pub mod control_plane;
-#[path = "codegen/control_plane_service.rs"]
-mod control_plane_service;
pub mod indexing_plan;
pub mod scheduler;
use std::sync::Arc;
use async_trait::async_trait;
-pub use control_plane_service::*;
-use quickwit_actors::{AskError, Mailbox, Universe};
+use quickwit_actors::{Mailbox, Universe};
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::tower::Pool;
use quickwit_config::SourceParams;
use quickwit_metastore::{Metastore, MetastoreEvent};
+use quickwit_proto::control_plane::{
+ ControlPlaneService, ControlPlaneServiceClient, NotifyIndexChangeRequest,
+};
use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask};
use scheduler::IndexingScheduler;
use tracing::error;
-pub type Result = std::result::Result;
-
/// Indexer-node specific information stored in the pool of available indexer nodes
#[derive(Debug, Clone)]
pub struct IndexerNodeInfo {
@@ -47,48 +45,6 @@ pub struct IndexerNodeInfo {
pub type IndexerPool = Pool;
-#[derive(Debug, Clone, thiserror::Error)]
-pub enum ControlPlaneError {
- #[error("An internal error occurred: {0}.")]
- Internal(String),
- #[error("Control plane is unavailable: {0}.")]
- Unavailable(String),
-}
-
-impl From for tonic::Status {
- fn from(error: ControlPlaneError) -> Self {
- match error {
- ControlPlaneError::Internal(message) => tonic::Status::internal(message),
- ControlPlaneError::Unavailable(message) => tonic::Status::unavailable(message),
- }
- }
-}
-
-impl From for ControlPlaneError {
- fn from(status: tonic::Status) -> Self {
- match status.code() {
- tonic::Code::Unavailable => {
- ControlPlaneError::Unavailable(status.message().to_string())
- }
- _ => ControlPlaneError::Internal(status.message().to_string()),
- }
- }
-}
-
-impl From> for ControlPlaneError {
- fn from(error: AskError) -> Self {
- match error {
- AskError::ErrorReply(error) => error,
- AskError::MessageNotDelivered => {
- ControlPlaneError::Unavailable("Request not delivered".to_string())
- }
- AskError::ProcessMessageError => ControlPlaneError::Internal(
- "An error occurred while processing the request".to_string(),
- ),
- }
- }
-}
-
/// Starts the Control Plane.
pub async fn start_indexing_scheduler(
cluster_id: String,
@@ -102,6 +58,9 @@ pub async fn start_indexing_scheduler(
Ok(scheduler_mailbox)
}
+#[derive(Debug, Clone)]
+pub struct ControlPlaneEventSubscriber(pub ControlPlaneServiceClient);
+
/// Notify the control plane when one of the following event occurs:
/// - an index is deleted.
/// - a source, other than the ingest CLI source, is created.
@@ -113,7 +72,7 @@ pub async fn start_indexing_scheduler(
// - We don't sent any data to the Control Plane. It could be nice to send the relevant data to the
// control plane and let it decide to schedule or not indexing tasks.
#[async_trait]
-impl EventSubscriber for ControlPlaneServiceClient {
+impl EventSubscriber for ControlPlaneEventSubscriber {
async fn handle_event(&mut self, event: MetastoreEvent) {
let event = match event {
MetastoreEvent::DeleteIndex { .. } => "delete-index",
@@ -129,7 +88,11 @@ impl EventSubscriber for ControlPlaneServiceClient {
MetastoreEvent::ToggleSource { .. } => "toggle-source",
MetastoreEvent::DeleteSource { .. } => "delete-source",
};
- if let Err(error) = self.notify_index_change(NotifyIndexChangeRequest {}).await {
+ if let Err(error) = self
+ .0
+ .notify_index_change(NotifyIndexChangeRequest {})
+ .await
+ {
error!(error=?error, event=event, "Failed to notify control plane of index change.");
}
}
@@ -138,6 +101,7 @@ impl EventSubscriber for ControlPlaneServiceClient {
#[cfg(test)]
mod tests {
use quickwit_config::SourceConfig;
+ use quickwit_proto::control_plane::NotifyIndexChangeResponse;
use quickwit_proto::IndexUid;
use super::*;
@@ -148,7 +112,8 @@ mod tests {
mock.expect_notify_index_change()
.return_once(|_| Ok(NotifyIndexChangeResponse {}));
- let mut control_plane = ControlPlaneServiceClient::new(mock);
+ let mut control_plane_event_subscriber =
+ ControlPlaneEventSubscriber(ControlPlaneServiceClient::new(mock));
let index_uid = IndexUid::new("test-index");
@@ -156,18 +121,18 @@ mod tests {
index_uid: index_uid.clone(),
source_config: SourceConfig::for_test("test-source", SourceParams::IngestApi),
};
- control_plane.handle_event(event).await;
+ control_plane_event_subscriber.handle_event(event).await;
let event = MetastoreEvent::AddSource {
index_uid: index_uid.clone(),
source_config: SourceConfig::for_test("test-source", SourceParams::file("test-file")),
};
- control_plane.handle_event(event).await;
+ control_plane_event_subscriber.handle_event(event).await;
let event = MetastoreEvent::AddSource {
index_uid: index_uid.clone(),
source_config: SourceConfig::for_test("test-source", SourceParams::IngestCli),
};
- control_plane.handle_event(event).await;
+ control_plane_event_subscriber.handle_event(event).await;
}
}
diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/scheduler.rs
index 9938e7ee6be..d9adf1ee89e 100644
--- a/quickwit/quickwit-control-plane/src/scheduler.rs
+++ b/quickwit/quickwit-control-plane/src/scheduler.rs
@@ -29,15 +29,15 @@ use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::SourceConfig;
use quickwit_metastore::Metastore;
-use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingTask};
-use quickwit_proto::IndexingService;
+use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
+use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use serde::Serialize;
use tracing::{debug, error, info, warn};
use crate::indexing_plan::{
build_indexing_plan, build_physical_indexing_plan, IndexSourceId, PhysicalIndexingPlan,
};
-use crate::{IndexerNodeInfo, IndexerPool, NotifyIndexChangeRequest, NotifyIndexChangeResponse};
+use crate::{IndexerNodeInfo, IndexerPool};
/// Interval between two controls (or checks) of the desired plan VS running plan.
const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
@@ -298,7 +298,7 @@ impl IndexingScheduler {
#[async_trait]
impl Handler for IndexingScheduler {
- type Reply = crate::Result;
+ type Reply = quickwit_proto::control_plane::Result;
async fn handle(
&mut self,
@@ -531,8 +531,7 @@ mod tests {
use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{IndexMetadata, MockMetastore};
- use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingTask};
- use quickwit_proto::IndexingServiceClient;
+ use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient, IndexingTask};
use serde_json::json;
use super::{IndexingScheduler, CONTROL_PLAN_LOOP_INTERVAL};
diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml
index 4782b821ea7..77e7556d8ec 100644
--- a/quickwit/quickwit-indexing/Cargo.toml
+++ b/quickwit/quickwit-indexing/Cargo.toml
@@ -54,7 +54,6 @@ quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
-quickwit-grpc-clients = { workspace = true }
quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs
index 1a97a6ab826..defacafb39d 100644
--- a/quickwit/quickwit-indexing/failpoints/mod.rs
+++ b/quickwit/quickwit-indexing/failpoints/mod.rs
@@ -50,7 +50,7 @@ use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_indexing::models::MergeScratch;
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
use quickwit_metastore::{ListSplitsQuery, Split, SplitMetadata, SplitState};
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use serde_json::Value as JsonValue;
use tantivy::{Directory, Inventory};
diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs
index 4726f913654..1b02250c119 100644
--- a/quickwit/quickwit-indexing/src/actors/indexer.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -39,7 +39,7 @@ use quickwit_config::IndexingSettings;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
use quickwit_metastore::Metastore;
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use serde::Serialize;
use tantivy::schema::Schema;
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index ac5d1b12410..8010a3b0cb9 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -31,7 +31,7 @@ use quickwit_common::KillSwitch;
use quickwit_config::{IndexingSettings, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError};
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index 206a09edbc8..34d12e1c248 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -38,8 +38,10 @@ use quickwit_config::{
};
use quickwit_ingest::{DropQueueRequest, IngestApiService, ListQueuesRequest, QUEUES_DIR_NAME};
use quickwit_metastore::{IndexMetadata, Metastore};
-use quickwit_proto::indexing::{ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingTask};
-use quickwit_proto::indexing_api::{IndexingPipelineId, IndexingServiceError};
+use quickwit_proto::indexing::{
+ ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId,
+ IndexingTask,
+};
use quickwit_proto::IndexUid;
use quickwit_storage::StorageResolver;
use serde::{Deserialize, Serialize};
@@ -161,11 +163,11 @@ impl IndexingService {
async fn detach_pipeline(
&mut self,
pipeline_id: &IndexingPipelineId,
- ) -> Result, IndexingServiceError> {
+ ) -> Result, IndexingError> {
let pipeline_handle = self
.indexing_pipeline_handles
.remove(pipeline_id)
- .ok_or_else(|| IndexingServiceError::MissingPipeline {
+ .ok_or_else(|| IndexingError::MissingPipeline {
index_id: pipeline_id.index_uid.index_id().to_string(),
source_id: pipeline_id.source_id.clone(),
})?;
@@ -176,11 +178,11 @@ impl IndexingService {
async fn detach_merge_pipeline(
&mut self,
pipeline_id: &MergePipelineId,
- ) -> Result, IndexingServiceError> {
+ ) -> Result, IndexingError> {
let pipeline_handle = self
.merge_pipeline_handles
.remove(pipeline_id)
- .ok_or_else(|| IndexingServiceError::MissingPipeline {
+ .ok_or_else(|| IndexingError::MissingPipeline {
index_id: pipeline_id.index_uid.index_id().to_string(),
source_id: pipeline_id.source_id.clone(),
})?;
@@ -191,11 +193,11 @@ impl IndexingService {
async fn observe_pipeline(
&mut self,
pipeline_id: &IndexingPipelineId,
- ) -> Result, IndexingServiceError> {
+ ) -> Result, IndexingError> {
let pipeline_handle = self
.indexing_pipeline_handles
.get(pipeline_id)
- .ok_or_else(|| IndexingServiceError::MissingPipeline {
+ .ok_or_else(|| IndexingError::MissingPipeline {
index_id: pipeline_id.index_uid.index_id().to_string(),
source_id: pipeline_id.source_id.clone(),
})?;
@@ -209,7 +211,7 @@ impl IndexingService {
index_id: String,
source_config: SourceConfig,
pipeline_ord: usize,
- ) -> Result {
+ ) -> Result {
let index_metadata = self.index_metadata(ctx, &index_id).await?;
let pipeline_id = IndexingPipelineId {
index_uid: index_metadata.index_uid.clone(),
@@ -229,9 +231,9 @@ impl IndexingService {
pipeline_id: IndexingPipelineId,
index_config: IndexConfig,
source_config: SourceConfig,
- ) -> Result<(), IndexingServiceError> {
+ ) -> Result<(), IndexingError> {
if self.indexing_pipeline_handles.contains_key(&pipeline_id) {
- return Err(IndexingServiceError::PipelineAlreadyExists {
+ return Err(IndexingError::PipelineAlreadyExists {
index_id: pipeline_id.index_uid.index_id().to_string(),
source_id: pipeline_id.source_id,
pipeline_ord: pipeline_id.pipeline_ord,
@@ -243,18 +245,18 @@ impl IndexingService {
.join(&pipeline_id.source_id)
.join(&pipeline_id.pipeline_ord.to_string())
.tempdir_in(&self.indexing_root_directory)
- .map_err(IndexingServiceError::Io)?;
+ .map_err(IndexingError::Io)?;
let storage = self
.storage_resolver
.resolve(&index_config.index_uri)
.await
- .map_err(|err| IndexingServiceError::StorageResolverError(err.to_string()))?;
+ .map_err(|err| IndexingError::StorageResolverError(err.to_string()))?;
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());
let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
- .map_err(IndexingServiceError::InvalidParams)?;
+ .map_err(IndexingError::InvalidParams)?;
let merge_pipeline_params = MergePipelineParams {
pipeline_id: pipeline_id.clone(),
@@ -307,13 +309,13 @@ impl IndexingService {
&self,
ctx: &ActorContext,
index_id: &str,
- ) -> Result {
+ ) -> Result {
let _protect_guard = ctx.protect_zone();
let index_metadata = self
.metastore
.index_metadata(index_id)
.await
- .map_err(|err| IndexingServiceError::MetastoreError(err.to_string()))?;
+ .map_err(|err| IndexingError::MetastoreError(err.to_string()))?;
Ok(index_metadata)
}
@@ -387,7 +389,7 @@ impl IndexingService {
&mut self,
merge_pipeline_params: MergePipelineParams,
ctx: &ActorContext,
- ) -> Result, IndexingServiceError> {
+ ) -> Result, IndexingError> {
let merge_pipeline_id = MergePipelineId::from(&merge_pipeline_params.pipeline_id);
if let Some(merge_pipeline_mailbox_handle) =
self.merge_pipeline_handles.get(&merge_pipeline_id)
@@ -417,7 +419,7 @@ impl IndexingService {
&mut self,
ctx: &ActorContext,
physical_indexing_plan_request: ApplyIndexingPlanRequest,
- ) -> Result {
+ ) -> Result {
let mut updated_pipeline_ids: HashSet = HashSet::new();
let mut pipeline_ordinals: HashMap<&IndexingTask, usize> = HashMap::new();
for indexing_task in physical_indexing_plan_request.indexing_tasks.iter() {
@@ -456,7 +458,7 @@ impl IndexingService {
self.update_cluster_running_indexing_tasks().await;
if !failed_spawning_pipeline_ids.is_empty() {
- return Err(IndexingServiceError::SpawnPipelinesError {
+ return Err(IndexingError::SpawnPipelinesError {
pipeline_ids: failed_spawning_pipeline_ids,
});
}
@@ -469,7 +471,7 @@ impl IndexingService {
&mut self,
ctx: &ActorContext,
added_pipeline_ids: Vec<&IndexingPipelineId>,
- ) -> Result, IndexingServiceError> {
+ ) -> Result, IndexingError> {
// We fetch the new indexes metadata.
let indexes_metadata_futures = added_pipeline_ids
.iter()
@@ -630,7 +632,7 @@ impl IndexingService {
#[async_trait]
impl Handler for IndexingService {
- type Reply = Result, IndexingServiceError>;
+ type Reply = Result, IndexingError>;
async fn handle(
&mut self,
@@ -644,7 +646,7 @@ impl Handler for IndexingService {
#[async_trait]
impl Handler for IndexingService {
- type Reply = Result, IndexingServiceError>;
+ type Reply = Result, IndexingError>;
async fn handle(
&mut self,
@@ -657,7 +659,7 @@ impl Handler for IndexingService {
#[async_trait]
impl Handler for IndexingService {
- type Reply = Result, IndexingServiceError>;
+ type Reply = Result, IndexingError>;
async fn handle(
&mut self,
@@ -703,12 +705,12 @@ impl Actor for IndexingService {
#[async_trait]
impl Handler for IndexingService {
- type Reply = Result;
+ type Reply = Result;
async fn handle(
&mut self,
message: SpawnPipeline,
ctx: &ActorContext,
- ) -> Result, ActorExitStatus> {
+ ) -> Result, ActorExitStatus> {
Ok(self
.spawn_pipeline(
ctx,
@@ -734,7 +736,7 @@ impl Handler for IndexingService {
#[async_trait]
impl Handler for IndexingService {
- type Reply = Result;
+ type Reply = Result;
async fn handle(
&mut self,
diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
index 86d845983a1..754f00e0a25 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -34,7 +34,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_directories::UnionDirectory;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, SplitMetadata};
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::DeleteTask;
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use quickwit_query::query_ast::QueryAst;
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 89800980535..2b25cc2a881 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -31,7 +31,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState};
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};
@@ -449,7 +449,7 @@ mod tests {
use quickwit_common::temp_dir::TempDirectory;
use quickwit_doc_mapper::default_doc_mapper_for_test;
use quickwit_metastore::MockMetastore;
- use quickwit_proto::indexing_api::IndexingPipelineId;
+ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use quickwit_storage::RamStorage;
diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
index 1e84693ff9b..2458fea615c 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -26,7 +26,7 @@ use itertools::Itertools;
use quickwit_actors::channel_with_priority::TrySendError;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_metastore::SplitMetadata;
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
@@ -308,7 +308,7 @@ mod tests {
};
use quickwit_config::IndexingSettings;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
- use quickwit_proto::indexing_api::IndexingPipelineId;
+ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use tantivy::TrackedObject;
use time::OffsetDateTime;
diff --git a/quickwit/quickwit-indexing/src/actors/mod.rs b/quickwit/quickwit-indexing/src/actors/mod.rs
index b91ad6d7a79..9c94f5c8537 100644
--- a/quickwit/quickwit-indexing/src/actors/mod.rs
+++ b/quickwit/quickwit-indexing/src/actors/mod.rs
@@ -33,7 +33,7 @@ pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineHandles, IndexingP
pub use indexing_service::{
IndexingService, IndexingServiceCounters, MergePipelineId, INDEXING_DIR_NAME,
};
-pub use quickwit_proto::indexing_api::IndexingServiceError;
+pub use quickwit_proto::indexing::IndexingError;
pub use sequencer::Sequencer;
mod merge_executor;
mod merge_planner;
diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs
index 8378d0252d8..95fb5ee3947 100644
--- a/quickwit/quickwit-indexing/src/actors/packager.rs
+++ b/quickwit/quickwit-indexing/src/actors/packager.rs
@@ -336,7 +336,7 @@ mod tests {
use quickwit_actors::{ObservationType, Universe};
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
- use quickwit_proto::indexing_api::IndexingPipelineId;
+ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use tantivy::schema::{NumericOptions, Schema, FAST, STRING, TEXT};
use tantivy::{doc, DateTime, Index};
diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs
index 48b0eaf8736..bde305b28d6 100644
--- a/quickwit/quickwit-indexing/src/actors/uploader.rs
+++ b/quickwit/quickwit-indexing/src/actors/uploader.rs
@@ -465,7 +465,7 @@ mod tests {
use quickwit_common::temp_dir::TempDirectory;
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
use quickwit_metastore::MockMetastore;
- use quickwit_proto::indexing_api::IndexingPipelineId;
+ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_storage::RamStorage;
use tantivy::DateTime;
use tokio::sync::oneshot;
diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs
index 72087729e1e..d4cca6fb97d 100644
--- a/quickwit/quickwit-indexing/src/lib.rs
+++ b/quickwit/quickwit-indexing/src/lib.rs
@@ -30,7 +30,7 @@ use quickwit_storage::StorageResolver;
use tracing::info;
pub use crate::actors::{
- IndexingPipeline, IndexingPipelineParams, IndexingService, IndexingServiceError, PublisherType,
+ IndexingError, IndexingPipeline, IndexingPipelineParams, IndexingService, PublisherType,
Sequencer, SplitsUpdateMailbox,
};
pub use crate::controlled_directory::ControlledDirectory;
diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs
index c35a3487c0f..1772ae6787b 100644
--- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs
+++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs
@@ -169,7 +169,7 @@ pub mod tests {
use proptest::prelude::*;
use quickwit_actors::Universe;
- use quickwit_proto::indexing_api::IndexingPipelineId;
+ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use rand::seq::SliceRandom;
use tantivy::TrackedObject;
diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs
index 0c24dbc4ce2..7fda336b32d 100644
--- a/quickwit/quickwit-indexing/src/models/indexed_split.rs
+++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs
@@ -23,7 +23,7 @@ use std::path::Path;
use quickwit_common::io::IoControls;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use tantivy::directory::MmapDirectory;
use tantivy::{IndexBuilder, TrackedObject};
diff --git a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs
index 92c610334fe..4c2de58aabe 100644
--- a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs
+++ b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs
@@ -18,7 +18,7 @@
// along with this program. If not, see .
use quickwit_config::SourceConfig;
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use crate::actors::MergePipelineId;
diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs
index 0d5bf460cf8..fa614eea9e2 100644
--- a/quickwit/quickwit-indexing/src/models/split_attrs.rs
+++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs
@@ -23,7 +23,7 @@ use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use quickwit_metastore::SplitMetadata;
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use tantivy::DateTime;
use time::OffsetDateTime;
diff --git a/quickwit/quickwit-jaeger/Cargo.toml b/quickwit/quickwit-jaeger/Cargo.toml
index 70120b0632c..b806f6dcfa2 100644
--- a/quickwit/quickwit-jaeger/Cargo.toml
+++ b/quickwit/quickwit-jaeger/Cargo.toml
@@ -40,7 +40,6 @@ time = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true }
-quickwit-grpc-clients = { workspace = true }
quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
diff --git a/quickwit/quickwit-janitor/Cargo.toml b/quickwit/quickwit-janitor/Cargo.toml
index f768cfe9ee1..5fa99e536a5 100644
--- a/quickwit/quickwit-janitor/Cargo.toml
+++ b/quickwit/quickwit-janitor/Cargo.toml
@@ -33,7 +33,6 @@ quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
-quickwit-grpc-clients = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
index 59b68e1f31c..13c442e2017 100644
--- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
+++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
@@ -35,7 +35,7 @@ use quickwit_indexing::actors::{
use quickwit_indexing::merge_policy::merge_policy_from_settings;
use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox};
use quickwit_metastore::Metastore;
-use quickwit_proto::indexing_api::IndexingPipelineId;
+use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::IndexUid;
use quickwit_search::SearchJobPlacer;
use quickwit_storage::Storage;
diff --git a/quickwit/quickwit-proto/Cargo.toml b/quickwit/quickwit-proto/Cargo.toml
index e83525bea4b..67cf11882a9 100644
--- a/quickwit/quickwit-proto/Cargo.toml
+++ b/quickwit/quickwit-proto/Cargo.toml
@@ -42,3 +42,6 @@ prost-build = { workspace = true }
tonic-build = { workspace = true }
quickwit-codegen = { workspace = true }
+
+[features]
+testsuite = [ "mockall" ]
diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs
index ce0b4297227..d55c045250a 100644
--- a/quickwit/quickwit-proto/build.rs
+++ b/quickwit/quickwit-proto/build.rs
@@ -23,10 +23,10 @@ use glob::glob;
use quickwit_codegen::Codegen;
fn main() -> Result<(), Box> {
- // Quickwit proto
+ // "Classic" prost + tonic codegen for metastore and search services.
let protos: Vec = find_protos("protos/quickwit")
.into_iter()
- .filter(|path| !path.ends_with("protos/quickwit/indexing.proto"))
+ .filter(|path| !path.ends_with("control_pane.proto") || !path.ends_with("indexing.proto"))
.collect();
let mut prost_config = prost_build::Config::default();
@@ -49,17 +49,30 @@ fn main() -> Result<(), Box> {
.type_attribute("OutputFormat", "#[serde(rename_all = \"snake_case\")]")
.type_attribute("PartialHit.sort_value", "#[derive(Copy)]")
.type_attribute("SortOrder", "#[serde(rename_all = \"lowercase\")]")
- .out_dir("src/quickwit")
+ .out_dir("src/codegen/quickwit")
.compile_with_config(prost_config, &protos, &["protos/quickwit"])?;
+ // Prost + tonic + Quickwit codegen for control plane and indexing services.
+ //
+ // Control plane
+ Codegen::run(
+ &["protos/quickwit/control_plane.proto"],
+ "src/codegen/quickwit",
+ "crate::control_plane::Result",
+ "crate::control_plane::ControlPlaneError",
+ &[],
+ )
+ .unwrap();
+
// Indexing Service
let mut index_api_config = prost_build::Config::default();
index_api_config.type_attribute("IndexingTask", "#[derive(Eq, Hash)]");
+
Codegen::run_with_config(
&["protos/quickwit/indexing.proto"],
- "src/quickwit/",
- "crate::indexing_api::Result",
- "crate::indexing_api::IndexingServiceError",
+ "src/codegen/quickwit",
+ "crate::indexing::Result",
+ "crate::indexing::IndexingError",
&[],
index_api_config,
)
@@ -72,7 +85,7 @@ fn main() -> Result<(), Box> {
prost_config.type_attribute("Operation", "#[derive(Eq, Ord, PartialOrd)]");
tonic_build::configure()
- .out_dir("src/jaeger")
+ .out_dir("src/codegen/jaeger")
.compile_with_config(
prost_config,
&protos,
@@ -87,7 +100,7 @@ fn main() -> Result<(), Box> {
tonic_build::configure()
.type_attribute(".", "#[derive(Serialize, Deserialize)]")
.type_attribute("StatusCode", r#"#[serde(rename_all = "snake_case")]"#)
- .out_dir("src/opentelemetry")
+ .out_dir("src/codegen/opentelemetry")
.compile_with_config(prost_config, &protos, &["protos/third-party"])?;
Ok(())
}
diff --git a/quickwit/quickwit-control-plane/src/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto
similarity index 98%
rename from quickwit/quickwit-control-plane/src/control_plane.proto
rename to quickwit/quickwit-proto/protos/quickwit/control_plane.proto
index 4921c07268b..400a532496b 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto
@@ -19,7 +19,7 @@
syntax = "proto3";
-package control_plane_service;
+package quickwit.control_plane;
service ControlPlaneService {
/// Notify the Control Plane that a change on an index occurred. The change
diff --git a/quickwit/quickwit-proto/src/jaeger/jaeger.api_v2.rs b/quickwit/quickwit-proto/src/codegen/jaeger/jaeger.api_v2.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/jaeger/jaeger.api_v2.rs
rename to quickwit/quickwit-proto/src/codegen/jaeger/jaeger.api_v2.rs
diff --git a/quickwit/quickwit-proto/src/jaeger/jaeger.storage.v1.rs b/quickwit/quickwit-proto/src/codegen/jaeger/jaeger.storage.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/jaeger/jaeger.storage.v1.rs
rename to quickwit/quickwit-proto/src/codegen/jaeger/jaeger.storage.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.collector.logs.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.collector.logs.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.collector.metrics.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.metrics.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.collector.metrics.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.metrics.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.collector.trace.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.trace.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.collector.trace.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.trace.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.common.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.common.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.common.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.common.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.logs.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.logs.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.logs.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.logs.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.metrics.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.metrics.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.metrics.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.metrics.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.resource.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.resource.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.resource.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.resource.v1.rs
diff --git a/quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.trace.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.trace.v1.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/opentelemetry/opentelemetry.proto.trace.v1.rs
rename to quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.trace.v1.rs
diff --git a/quickwit/quickwit-control-plane/src/codegen/control_plane_service.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs
similarity index 94%
rename from quickwit/quickwit-control-plane/src/codegen/control_plane_service.rs
rename to quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs
index 3824b963d94..3dfa43efb20 100644
--- a/quickwit/quickwit-control-plane/src/codegen/control_plane_service.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs
@@ -14,7 +14,7 @@ pub trait ControlPlaneService: std::fmt::Debug + dyn_clone::DynClone + Send + Sy
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
- ) -> crate::Result;
+ ) -> crate::control_plane::Result;
}
dyn_clone::clone_trait_object!(ControlPlaneService);
#[cfg(any(test, feature = "testsuite"))]
@@ -78,7 +78,7 @@ impl ControlPlaneService for ControlPlaneServiceClient {
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
- ) -> crate::Result {
+ ) -> crate::control_plane::Result {
self.inner.notify_index_change(request).await
}
}
@@ -94,7 +94,7 @@ pub mod mock {
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
- ) -> crate::Result {
+ ) -> crate::control_plane::Result {
self.inner.lock().await.notify_index_change(request).await
}
}
@@ -112,7 +112,7 @@ pub type BoxFuture = std::pin::Pin<
>;
impl tower::Service for Box {
type Response = NotifyIndexChangeResponse;
- type Error = crate::ControlPlaneError;
+ type Error = crate::control_plane::ControlPlaneError;
type Future = BoxFuture;
fn poll_ready(
&mut self,
@@ -132,7 +132,7 @@ struct ControlPlaneServiceTowerBlock {
notify_index_change_svc: quickwit_common::tower::BoxService<
NotifyIndexChangeRequest,
NotifyIndexChangeResponse,
- crate::ControlPlaneError,
+ crate::control_plane::ControlPlaneError,
>,
}
impl Clone for ControlPlaneServiceTowerBlock {
@@ -147,7 +147,7 @@ impl ControlPlaneService for ControlPlaneServiceTowerBlock {
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
- ) -> crate::Result {
+ ) -> crate::control_plane::Result {
self.notify_index_change_svc.ready().await?.call(request).await
}
}
@@ -159,7 +159,7 @@ pub struct ControlPlaneServiceTowerBlockBuilder {
Box,
NotifyIndexChangeRequest,
NotifyIndexChangeResponse,
- crate::ControlPlaneError,
+ crate::control_plane::ControlPlaneError,
>,
>,
}
@@ -170,7 +170,7 @@ impl ControlPlaneServiceTowerBlockBuilder {
L::Service: tower::Service<
NotifyIndexChangeRequest,
Response = NotifyIndexChangeResponse,
- Error = crate::ControlPlaneError,
+ Error = crate::control_plane::ControlPlaneError,
> + Clone + Send + Sync + 'static,
>::Future: Send + 'static,
{
@@ -186,7 +186,7 @@ impl ControlPlaneServiceTowerBlockBuilder {
L::Service: tower::Service<
NotifyIndexChangeRequest,
Response = NotifyIndexChangeResponse,
- Error = crate::ControlPlaneError,
+ Error = crate::control_plane::ControlPlaneError,
> + Clone + Send + Sync + 'static,
>::Future: Send + 'static,
{
@@ -262,7 +262,7 @@ where
}
#[derive(Debug)]
pub struct ControlPlaneServiceMailbox {
- inner: MailboxAdapter,
+ inner: MailboxAdapter,
}
impl ControlPlaneServiceMailbox {
pub fn new(instance: quickwit_actors::Mailbox) -> Self {
@@ -290,10 +290,10 @@ where
M: std::fmt::Debug + Send + 'static,
T: Send + 'static,
E: std::fmt::Debug + Send + 'static,
- crate::ControlPlaneError: From>,
+ crate::control_plane::ControlPlaneError: From>,
{
type Response = T;
- type Error = crate::ControlPlaneError;
+ type Error = crate::control_plane::ControlPlaneError;
type Future = BoxFuture;
fn poll_ready(
&mut self,
@@ -321,14 +321,17 @@ where
>: tower::Service<
NotifyIndexChangeRequest,
Response = NotifyIndexChangeResponse,
- Error = crate::ControlPlaneError,
- Future = BoxFuture,
+ Error = crate::control_plane::ControlPlaneError,
+ Future = BoxFuture<
+ NotifyIndexChangeResponse,
+ crate::control_plane::ControlPlaneError,
+ >,
>,
{
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
- ) -> crate::Result {
+ ) -> crate::control_plane::Result {
self.call(request).await
}
}
@@ -357,7 +360,7 @@ where
async fn notify_index_change(
&mut self,
request: NotifyIndexChangeRequest,
- ) -> crate::Result {
+ ) -> crate::control_plane::Result {
self.inner
.notify_index_change(request)
.await
@@ -503,13 +506,13 @@ pub mod control_plane_service_grpc_client {
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
- "/control_plane_service.ControlPlaneService/notifyIndexChange",
+ "/quickwit.control_plane.ControlPlaneService/notifyIndexChange",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
- "control_plane_service.ControlPlaneService",
+ "quickwit.control_plane.ControlPlaneService",
"notifyIndexChange",
),
);
@@ -618,7 +621,7 @@ pub mod control_plane_service_grpc_server {
fn call(&mut self, req: http::Request) -> Self::Future {
let inner = self.inner.clone();
match req.uri().path() {
- "/control_plane_service.ControlPlaneService/notifyIndexChange" => {
+ "/quickwit.control_plane.ControlPlaneService/notifyIndexChange" => {
#[allow(non_camel_case_types)]
struct notifyIndexChangeSvc(pub Arc);
impl<
@@ -703,6 +706,6 @@ pub mod control_plane_service_grpc_server {
}
impl tonic::server::NamedService
for ControlPlaneServiceGrpcServer {
- const NAME: &'static str = "control_plane_service.ControlPlaneService";
+ const NAME: &'static str = "quickwit.control_plane.ControlPlaneService";
}
}
diff --git a/quickwit/quickwit-proto/src/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs
similarity index 95%
rename from quickwit/quickwit-proto/src/quickwit/quickwit.indexing.rs
rename to quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs
index c8359f8b6d5..ddc2e8a94f3 100644
--- a/quickwit/quickwit-proto/src/quickwit/quickwit.indexing.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs
@@ -29,7 +29,7 @@ pub trait IndexingService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync +
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
- ) -> crate::indexing_api::Result;
+ ) -> crate::indexing::Result;
}
dyn_clone::clone_trait_object!(IndexingService);
#[cfg(any(test, feature = "testsuite"))]
@@ -91,7 +91,7 @@ impl IndexingService for IndexingServiceClient {
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
- ) -> crate::indexing_api::Result {
+ ) -> crate::indexing::Result {
self.inner.apply_indexing_plan(request).await
}
}
@@ -107,7 +107,7 @@ pub mod mock {
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
- ) -> crate::indexing_api::Result {
+ ) -> crate::indexing::Result {
self.inner.lock().await.apply_indexing_plan(request).await
}
}
@@ -125,7 +125,7 @@ pub type BoxFuture = std::pin::Pin<
>;
impl tower::Service for Box {
type Response = ApplyIndexingPlanResponse;
- type Error = crate::indexing_api::IndexingServiceError;
+ type Error = crate::indexing::IndexingError;
type Future = BoxFuture;
fn poll_ready(
&mut self,
@@ -145,7 +145,7 @@ struct IndexingServiceTowerBlock {
apply_indexing_plan_svc: quickwit_common::tower::BoxService<
ApplyIndexingPlanRequest,
ApplyIndexingPlanResponse,
- crate::indexing_api::IndexingServiceError,
+ crate::indexing::IndexingError,
>,
}
impl Clone for IndexingServiceTowerBlock {
@@ -160,7 +160,7 @@ impl IndexingService for IndexingServiceTowerBlock {
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
- ) -> crate::indexing_api::Result {
+ ) -> crate::indexing::Result {
self.apply_indexing_plan_svc.ready().await?.call(request).await
}
}
@@ -172,7 +172,7 @@ pub struct IndexingServiceTowerBlockBuilder {
Box,
ApplyIndexingPlanRequest,
ApplyIndexingPlanResponse,
- crate::indexing_api::IndexingServiceError,
+ crate::indexing::IndexingError,
>,
>,
}
@@ -183,7 +183,7 @@ impl IndexingServiceTowerBlockBuilder {
L::Service: tower::Service<
ApplyIndexingPlanRequest,
Response = ApplyIndexingPlanResponse,
- Error = crate::indexing_api::IndexingServiceError,
+ Error = crate::indexing::IndexingError,
> + Clone + Send + Sync + 'static,
>::Future: Send + 'static,
{
@@ -199,7 +199,7 @@ impl IndexingServiceTowerBlockBuilder {
L::Service: tower::Service<
ApplyIndexingPlanRequest,
Response = ApplyIndexingPlanResponse,
- Error = crate::indexing_api::IndexingServiceError,
+ Error = crate::indexing::IndexingError,
> + Clone + Send + Sync + 'static,
>::Future: Send + 'static,
{
@@ -275,7 +275,7 @@ where
}
#[derive(Debug)]
pub struct IndexingServiceMailbox {
- inner: MailboxAdapter,
+ inner: MailboxAdapter,
}
impl IndexingServiceMailbox {
pub fn new(instance: quickwit_actors::Mailbox) -> Self {
@@ -303,10 +303,10 @@ where
M: std::fmt::Debug + Send + 'static,
T: Send + 'static,
E: std::fmt::Debug + Send + 'static,
- crate::indexing_api::IndexingServiceError: From>,
+ crate::indexing::IndexingError: From>,
{
type Response = T;
- type Error = crate::indexing_api::IndexingServiceError;
+ type Error = crate::indexing::IndexingError;
type Future = BoxFuture;
fn poll_ready(
&mut self,
@@ -334,17 +334,14 @@ where
>: tower::Service<
ApplyIndexingPlanRequest,
Response = ApplyIndexingPlanResponse,
- Error = crate::indexing_api::IndexingServiceError,
- Future = BoxFuture<
- ApplyIndexingPlanResponse,
- crate::indexing_api::IndexingServiceError,
- >,
+ Error = crate::indexing::IndexingError,
+ Future = BoxFuture,
>,
{
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
- ) -> crate::indexing_api::Result {
+ ) -> crate::indexing::Result {
self.call(request).await
}
}
@@ -373,7 +370,7 @@ where
async fn apply_indexing_plan(
&mut self,
request: ApplyIndexingPlanRequest,
- ) -> crate::indexing_api::Result {
+ ) -> crate::indexing::Result {
self.inner
.apply_indexing_plan(request)
.await
diff --git a/quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/quickwit/quickwit.metastore.rs
rename to quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs
diff --git a/quickwit/quickwit-proto/src/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
similarity index 100%
rename from quickwit/quickwit-proto/src/quickwit/quickwit.search.rs
rename to quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs
new file mode 100644
index 00000000000..602ed62c6b6
--- /dev/null
+++ b/quickwit/quickwit-proto/src/control_plane/mod.rs
@@ -0,0 +1,70 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use quickwit_actors::AskError;
+use thiserror;
+
+#[path = "../codegen/quickwit/quickwit.control_plane.rs"]
+mod codegen;
+
+pub use codegen::*;
+
+pub type Result = std::result::Result;
+
+#[derive(Debug, thiserror::Error)]
+pub enum ControlPlaneError {
+ #[error("An internal error occurred: {0}.")]
+ Internal(String),
+ #[error("Control plane is unavailable: {0}.")]
+ Unavailable(String),
+}
+
+impl From for tonic::Status {
+ fn from(error: ControlPlaneError) -> Self {
+ match error {
+ ControlPlaneError::Internal(message) => tonic::Status::internal(message),
+ ControlPlaneError::Unavailable(message) => tonic::Status::unavailable(message),
+ }
+ }
+}
+
+impl From for ControlPlaneError {
+ fn from(status: tonic::Status) -> Self {
+ match status.code() {
+ tonic::Code::Unavailable => {
+ ControlPlaneError::Unavailable(status.message().to_string())
+ }
+ _ => ControlPlaneError::Internal(status.message().to_string()),
+ }
+ }
+}
+
+impl From> for ControlPlaneError {
+ fn from(error: AskError) -> Self {
+ match error {
+ AskError::ErrorReply(error) => error,
+ AskError::MessageNotDelivered => {
+ ControlPlaneError::Unavailable("Request not delivered".to_string())
+ }
+ AskError::ProcessMessageError => ControlPlaneError::Internal(
+ "An error occurred while processing the request".to_string(),
+ ),
+ }
+ }
+}
diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs
new file mode 100644
index 00000000000..451d784f98a
--- /dev/null
+++ b/quickwit/quickwit-proto/src/indexing/mod.rs
@@ -0,0 +1,228 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::io;
+
+use anyhow::anyhow;
+use quickwit_actors::AskError;
+use thiserror;
+
+use crate::{IndexUid, ServiceError, ServiceErrorCode};
+
+#[path = "../codegen/quickwit/quickwit.indexing.rs"]
+mod codegen;
+
+pub use codegen::*;
+
+pub type Result = std::result::Result;
+
+#[derive(Debug, thiserror::Error)]
+pub enum IndexingError {
+ #[error("Indexing pipeline `{index_id}` for source `{source_id}` does not exist.")]
+ MissingPipeline { index_id: String, source_id: String },
+ #[error(
+ "Pipeline #{pipeline_ord} for index `{index_id}` and source `{source_id}` already exists."
+ )]
+ PipelineAlreadyExists {
+ index_id: String,
+ source_id: String,
+ pipeline_ord: usize,
+ },
+ #[error("I/O Error `{0}`.")]
+ Io(io::Error),
+ #[error("Invalid params `{0}`.")]
+ InvalidParams(anyhow::Error),
+ #[error("Spanw pipelines errors `{pipeline_ids:?}`.")]
+ SpawnPipelinesError {
+ pipeline_ids: Vec,
+ },
+ #[error("A metastore error occurred: {0}.")]
+ MetastoreError(String),
+ #[error("A storage resolver error occurred: {0}.")]
+ StorageResolverError(String),
+ #[error("An internal error occurred: {0}.")]
+ Internal(String),
+ #[error("The ingest service is unavailable.")]
+ Unavailable,
+}
+
+impl From for tonic::Status {
+ fn from(error: IndexingError) -> Self {
+ match error {
+ IndexingError::MissingPipeline {
+ index_id,
+ source_id,
+ } => tonic::Status::not_found(format!("Missing pipeline {index_id}/{source_id}")),
+ IndexingError::PipelineAlreadyExists {
+ index_id,
+ source_id,
+ pipeline_ord,
+ } => tonic::Status::already_exists(format!(
+ "Pipeline {index_id}/{source_id} {pipeline_ord} already exists "
+ )),
+ IndexingError::Io(error) => tonic::Status::internal(error.to_string()),
+ IndexingError::InvalidParams(error) => {
+ tonic::Status::invalid_argument(error.to_string())
+ }
+ IndexingError::SpawnPipelinesError { pipeline_ids } => {
+ tonic::Status::internal(format!("Error spawning pipelines {:?}", pipeline_ids))
+ }
+ IndexingError::Internal(string) => tonic::Status::internal(string),
+ IndexingError::MetastoreError(string) => tonic::Status::internal(string),
+ IndexingError::StorageResolverError(string) => tonic::Status::internal(string),
+ IndexingError::Unavailable => {
+ tonic::Status::unavailable("Indexing service is unavailable.")
+ }
+ }
+ }
+}
+
+impl From for IndexingError {
+ fn from(status: tonic::Status) -> Self {
+ match status.code() {
+ tonic::Code::InvalidArgument => {
+ IndexingError::InvalidParams(anyhow!(status.message().to_string()))
+ }
+ tonic::Code::NotFound => IndexingError::MissingPipeline {
+ index_id: "".to_string(),
+ source_id: "".to_string(),
+ },
+ tonic::Code::AlreadyExists => IndexingError::PipelineAlreadyExists {
+ index_id: "".to_string(),
+ source_id: "".to_string(),
+ pipeline_ord: 0,
+ },
+ tonic::Code::Unavailable => IndexingError::Unavailable,
+ _ => IndexingError::InvalidParams(anyhow!(status.message().to_string())),
+ }
+ }
+}
+
+impl ServiceError for IndexingError {
+ fn status_code(&self) -> ServiceErrorCode {
+ match self {
+ Self::MissingPipeline { .. } => ServiceErrorCode::NotFound,
+ Self::PipelineAlreadyExists { .. } => ServiceErrorCode::BadRequest,
+ Self::InvalidParams(_) => ServiceErrorCode::BadRequest,
+ Self::SpawnPipelinesError { .. } => ServiceErrorCode::Internal,
+ Self::Io(_) => ServiceErrorCode::Internal,
+ Self::Internal(_) => ServiceErrorCode::Internal,
+ Self::MetastoreError(_) => ServiceErrorCode::Internal,
+ Self::StorageResolverError(_) => ServiceErrorCode::Internal,
+ Self::Unavailable => ServiceErrorCode::Unavailable,
+ }
+ }
+}
+
+impl From> for IndexingError {
+ fn from(error: AskError) -> Self {
+ match error {
+ AskError::ErrorReply(error) => error,
+ AskError::MessageNotDelivered => IndexingError::Unavailable,
+ AskError::ProcessMessageError => IndexingError::Internal(
+ "An error occurred while processing the request".to_string(),
+ ),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub struct IndexingPipelineId {
+ pub index_uid: IndexUid,
+ pub source_id: String,
+ pub node_id: String,
+ pub pipeline_ord: usize,
+}
+
+impl ToString for IndexingTask {
+ fn to_string(&self) -> String {
+ format!("{}:{}", self.index_uid, self.source_id)
+ }
+}
+
+impl TryFrom<&str> for IndexingTask {
+ type Error = anyhow::Error;
+
+ fn try_from(index_task_str: &str) -> anyhow::Result {
+ let mut iter = index_task_str.rsplit(':');
+ let source_id = iter.next().ok_or_else(|| {
+ anyhow!(
+ "Invalid index task format, cannot find source_id in `{}`",
+ index_task_str
+ )
+ })?;
+ let part1 = iter.next().ok_or_else(|| {
+ anyhow!(
+ "Invalid index task format, cannot find index_id in `{}`",
+ index_task_str
+ )
+ })?;
+ if let Some(part2) = iter.next() {
+ Ok(IndexingTask {
+ index_uid: format!("{part2}:{part1}"),
+ source_id: source_id.to_string(),
+ })
+ } else {
+ Ok(IndexingTask {
+ index_uid: part1.to_string(),
+ source_id: source_id.to_string(),
+ })
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_indexing_task_serialization() {
+ let original = IndexingTask {
+ index_uid: "test-index:123456".to_string(),
+ source_id: "test-source".to_string(),
+ };
+
+ let serialized = original.to_string();
+ let deserialized: IndexingTask = serialized.as_str().try_into().unwrap();
+ assert_eq!(original, deserialized);
+ }
+
+ #[test]
+ fn test_indexing_task_serialization_bwc() {
+ assert_eq!(
+ IndexingTask::try_from("foo:bar").unwrap(),
+ IndexingTask {
+ index_uid: "foo".to_string(),
+ source_id: "bar".to_string(),
+ }
+ );
+ }
+
+ #[test]
+ fn test_indexing_task_serialization_errors() {
+ assert_eq!(
+ "Invalid index task format, cannot find index_id in ``",
+ IndexingTask::try_from("").unwrap_err().to_string()
+ );
+ assert_eq!(
+ "Invalid index task format, cannot find index_id in `foo`",
+ IndexingTask::try_from("foo").unwrap_err().to_string()
+ );
+ }
+}
diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs
index 9ddbf7854c7..6ecded6122a 100644
--- a/quickwit/quickwit-proto/src/lib.rs
+++ b/quickwit/quickwit-proto/src/lib.rs
@@ -21,144 +21,38 @@
#![deny(clippy::disallowed_methods)]
#![allow(rustdoc::invalid_html_tags)]
-use anyhow::anyhow;
-use ulid::Ulid;
-pub use sort_by_value::SortValue;
use std::cmp::Ordering;
use std::convert::Infallible;
use std::fmt;
+
use ::opentelemetry::global;
-use ::opentelemetry::propagation::Extractor;
-use ::opentelemetry::propagation::Injector;
+use ::opentelemetry::propagation::{Extractor, Injector};
use tonic::codegen::http;
use tonic::service::Interceptor;
use tonic::Status;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
+use ulid::Ulid;
-pub use tonic;
-
-#[path = "quickwit/quickwit.indexing.rs"]
+pub mod control_plane;
pub mod indexing;
-#[path = "quickwit/quickwit.metastore.rs"]
+#[path = "codegen/quickwit/quickwit.metastore.rs"]
pub mod metastore;
-#[path = "quickwit/quickwit.search.rs"]
+#[path = "codegen/quickwit/quickwit.search.rs"]
pub mod search;
-pub use indexing::*;
pub use metastore::*;
pub use search::*;
-pub mod indexing_api {
- use crate::IndexUid;
- use crate::{ServiceError, ServiceErrorCode};
- use std::io;
- use anyhow::anyhow;
- use thiserror::Error;
- use quickwit_actors::AskError;
-
- #[derive(Clone, Debug, Hash, Eq, PartialEq)]
- pub struct IndexingPipelineId {
- pub index_uid: IndexUid,
- pub source_id: String,
- pub node_id: String,
- pub pipeline_ord: usize,
- }
- pub type Result = std::result::Result;
-
- #[derive(Error, Debug)]
- pub enum IndexingServiceError {
- #[error("Indexing pipeline `{index_id}` for source `{source_id}` does not exist.")]
- MissingPipeline { index_id: String, source_id: String },
- #[error(
- "Pipeline #{pipeline_ord} for index `{index_id}` and source `{source_id}` already exists."
- )]
- PipelineAlreadyExists {
- index_id: String,
- source_id: String,
- pipeline_ord: usize,
- },
- #[error("I/O Error `{0}`.")]
- Io(io::Error),
- #[error("Invalid params `{0}`.")]
- InvalidParams(anyhow::Error),
- #[error("Spanw pipelines errors `{pipeline_ids:?}`.")]
- SpawnPipelinesError {
- pipeline_ids: Vec,
- },
- #[error("A metastore error occurred: {0}.")]
- MetastoreError(String),
- #[error("A storage resolver error occurred: {0}.")]
- StorageResolverError(String),
- #[error("An internal error occurred: {0}.")]
- Internal(String),
- #[error("The ingest service is unavailable.")]
- Unavailable,
- }
-
- impl From for tonic::Status {
- fn from(error: IndexingServiceError) -> Self {
- match error {
- IndexingServiceError::MissingPipeline { index_id, source_id } => tonic::Status::not_found(format!("Missing pipeline {index_id}/{source_id}")),
- IndexingServiceError::PipelineAlreadyExists { index_id, source_id, pipeline_ord } => tonic::Status::already_exists(format!("Pipeline {index_id}/{source_id} {pipeline_ord} already exists ")),
- IndexingServiceError::Io(error) => tonic::Status::internal(error.to_string()),
- IndexingServiceError::InvalidParams(error) => tonic::Status::invalid_argument(error.to_string()),
- IndexingServiceError::SpawnPipelinesError { pipeline_ids } => tonic::Status::internal(format!("Error spawning pipelines {:?}", pipeline_ids)),
- IndexingServiceError::Internal(string) => tonic::Status::internal(string),
- IndexingServiceError::MetastoreError(string) => tonic::Status::internal(string),
- IndexingServiceError::StorageResolverError(string) => tonic::Status::internal(string),
- IndexingServiceError::Unavailable => tonic::Status::unavailable("Indexing service is unavailable."),
- }
- }
- }
-
- impl From for IndexingServiceError {
- fn from(status: tonic::Status) -> Self {
- match status.code() {
- tonic::Code::InvalidArgument => IndexingServiceError::InvalidParams(anyhow!(status.message().to_string())),
- tonic::Code::NotFound => IndexingServiceError::MissingPipeline { index_id: "".to_string(), source_id: "".to_string() },
- tonic::Code::AlreadyExists => IndexingServiceError::PipelineAlreadyExists { index_id: "".to_string(), source_id: "".to_string(), pipeline_ord: 0 },
- tonic::Code::Unavailable => IndexingServiceError::Unavailable,
- _ => IndexingServiceError::InvalidParams(anyhow!(status.message().to_string())),
- }
- }
- }
-
- impl ServiceError for IndexingServiceError {
- fn status_code(&self) -> ServiceErrorCode {
- match self {
- Self::MissingPipeline { .. } => ServiceErrorCode::NotFound,
- Self::PipelineAlreadyExists { .. } => ServiceErrorCode::BadRequest,
- Self::InvalidParams(_) => ServiceErrorCode::BadRequest,
- Self::SpawnPipelinesError { .. } => ServiceErrorCode::Internal,
- Self::Io(_) => ServiceErrorCode::Internal,
- Self::Internal(_) => ServiceErrorCode::Internal,
- Self::MetastoreError(_) => ServiceErrorCode::Internal,
- Self::StorageResolverError(_) => ServiceErrorCode::Internal,
- Self::Unavailable => ServiceErrorCode::Unavailable,
- }
- }
- }
-
- impl From> for IndexingServiceError {
- fn from(error: AskError) -> Self {
- match error {
- AskError::ErrorReply(error) => error,
- AskError::MessageNotDelivered => IndexingServiceError::Unavailable,
- AskError::ProcessMessageError => IndexingServiceError::Internal(
- "An error occurred while processing the request".to_string(),
- ),
- }
- }
- }
-}
+pub use sort_by_value::SortValue;
+pub use tonic;
pub mod jaeger {
pub mod api_v2 {
- include!("jaeger/jaeger.api_v2.rs");
+ include!("codegen/jaeger/jaeger.api_v2.rs");
}
pub mod storage {
pub mod v1 {
- include!("jaeger/jaeger.storage.v1.rs");
+ include!("codegen/jaeger/jaeger.storage.v1.rs");
}
}
}
@@ -170,46 +64,46 @@ pub mod opentelemetry {
pub mod collector {
pub mod logs {
pub mod v1 {
- include!("opentelemetry/opentelemetry.proto.collector.logs.v1.rs");
+ include!("codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs");
}
}
// pub mod metrics {
// pub mod v1 {
- // include!("opentelemetry/opentelemetry.proto.collector.metrics.v1.rs");
- // }
+ // include!("codegen/opentelemetry/opentelemetry.proto.collector.metrics.v1.rs"
+ // ); }
// }
pub mod trace {
pub mod v1 {
- include!("opentelemetry/opentelemetry.proto.collector.trace.v1.rs");
+ include!("codegen/opentelemetry/opentelemetry.proto.collector.trace.v1.rs");
}
}
}
pub mod common {
pub mod v1 {
- include!("opentelemetry/opentelemetry.proto.common.v1.rs");
+ include!("codegen/opentelemetry/opentelemetry.proto.common.v1.rs");
}
}
pub mod logs {
pub mod v1 {
- include!("opentelemetry/opentelemetry.proto.logs.v1.rs");
+ include!("codegen/opentelemetry/opentelemetry.proto.logs.v1.rs");
}
}
// pub mod metrics {
// pub mod experimental {
- // include!("opentelemetry/opentelemetry.proto.metrics.experimental.rs");
+ // include!("codegen/opentelemetry/opentelemetry.proto.metrics.experimental.rs");
// }
// pub mod v1 {
- // tonic::include_proto!("opentelemetry/opentelemetry.proto.metrics.v1");
+ // tonic::include_proto!("codegen/opentelemetry/opentelemetry.proto.metrics.v1");
// }
// }
pub mod resource {
pub mod v1 {
- include!("opentelemetry/opentelemetry.proto.resource.v1.rs");
+ include!("codegen/opentelemetry/opentelemetry.proto.resource.v1.rs");
}
}
pub mod trace {
pub mod v1 {
- include!("opentelemetry/opentelemetry.proto.trace.v1.rs");
+ include!("codegen/opentelemetry/opentelemetry.proto.trace.v1.rs");
}
}
}
@@ -231,7 +125,8 @@ pub enum ServiceErrorCode {
RateLimited,
Unavailable,
UnsupportedMediaType,
- NotSupportedYet, //< Used for API that is available in elasticsearch but is not yet available in Quickwit.
+ NotSupportedYet, /* Used for API that is available in elasticsearch but is not yet
+ * available in Quickwit. */
}
impl ServiceErrorCode {
@@ -375,7 +270,8 @@ impl<'a> Extractor for MutMetadataMap<'a> {
}
}
-/// [`tonic::service::interceptor::Interceptor`] which injects the span context into [`tonic::metadata::MetadataMap`].
+/// [`tonic::service::interceptor::Interceptor`] which injects the span context into
+/// [`tonic::metadata::MetadataMap`].
#[derive(Clone, Debug)]
pub struct SpanContextInterceptor;
@@ -478,48 +374,11 @@ impl From for IndexUid {
}
}
-impl ToString for IndexingTask {
- fn to_string(&self) -> String {
- format!("{}:{}", self.index_uid, self.source_id)
- }
-}
-
-impl TryFrom<&str> for IndexingTask {
- type Error = anyhow::Error;
-
- fn try_from(index_task_str: &str) -> anyhow::Result {
- let mut iter = index_task_str.rsplit(':');
- let source_id = iter.next().ok_or_else(|| {
- anyhow!(
- "Invalid index task format, cannot find source_id in `{}`",
- index_task_str
- )
- })?;
- let part1 = iter.next().ok_or_else(|| {
- anyhow!(
- "Invalid index task format, cannot find index_id in `{}`",
- index_task_str
- )
- })?;
- if let Some(part2) = iter.next() {
- Ok(IndexingTask {
- index_uid: format!("{part2}:{part1}"),
- source_id: source_id.to_string(),
- })
- } else {
- Ok(IndexingTask {
- index_uid: part1.to_string(),
- source_id: source_id.to_string(),
- })
- }
- }
-}
-
// !!! Disclaimer !!!
//
// Prost imposes the PartialEq derived implementation.
-// This is terrible because this means Eq, PartialEq are not really in line with Ord's implementation.
-// if in presence of NaN.
+// This is terrible because this means Eq, PartialEq are not really in line with Ord's
+// implementation. if in presence of NaN.
impl Eq for SortByValue {}
impl Copy for SortByValue {}
impl From for SortByValue {
@@ -595,41 +454,6 @@ impl ServiceError for quickwit_actors::AskError
mod tests {
use super::*;
- #[test]
- fn test_indexing_task_serialization() {
- let original = IndexingTask {
- index_uid: "test-index:123456".to_string(),
- source_id: "test-source".to_string(),
- };
-
- let serialized = original.to_string();
- let deserialized: IndexingTask = serialized.as_str().try_into().unwrap();
- assert_eq!(original, deserialized);
- }
-
- #[test]
- fn test_indexing_task_serialization_bwc() {
- assert_eq!(
- IndexingTask::try_from("foo:bar").unwrap(),
- IndexingTask {
- index_uid: "foo".to_string(),
- source_id: "bar".to_string(),
- }
- );
- }
-
- #[test]
- fn test_indexing_task_serialization_errors() {
- assert_eq!(
- "Invalid index task format, cannot find index_id in ``",
- IndexingTask::try_from("").unwrap_err().to_string()
- );
- assert_eq!(
- "Invalid index task format, cannot find index_id in `foo`",
- IndexingTask::try_from("foo").unwrap_err().to_string()
- );
- }
-
#[test]
fn test_index_uid_parsing() {
assert_eq!("foo", IndexUid::from("foo".to_string()).index_id());
@@ -660,5 +484,4 @@ mod tests {
let index_uid_from_parts = IndexUid::from_parts(index_id, incarnation_id);
index_uid_from_parts.to_string()
}
-
}
diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml
index 585eb3f6779..cd46e8e946c 100644
--- a/quickwit/quickwit-serve/Cargo.toml
+++ b/quickwit/quickwit-serve/Cargo.toml
@@ -49,7 +49,6 @@ quickwit-control-plane = { workspace = true }
quickwit-core = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
-quickwit-grpc-clients = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-ingest = { workspace = true }
quickwit-jaeger = { workspace = true }
diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs
index 51c9a3e2cdf..8da4d2bdea4 100644
--- a/quickwit/quickwit-serve/src/grpc.rs
+++ b/quickwit/quickwit-serve/src/grpc.rs
@@ -23,22 +23,22 @@ use std::sync::Arc;
use quickwit_common::tower::BoxFutureInfaillible;
use quickwit_config::service::QuickwitService;
-use quickwit_control_plane::control_plane_service_grpc_server::ControlPlaneServiceGrpcServer;
-use quickwit_control_plane::ControlPlaneServiceGrpcServerAdapter;
use quickwit_ingest::ingest_service_grpc_server::IngestServiceGrpcServer;
use quickwit_ingest::IngestServiceGrpcServerAdapter;
use quickwit_jaeger::JaegerService;
use quickwit_metastore::GrpcMetastoreAdapter;
use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService};
-use quickwit_proto::indexing_service_grpc_server::IndexingServiceGrpcServer;
+use quickwit_proto::control_plane::control_plane_service_grpc_server::ControlPlaneServiceGrpcServer;
+use quickwit_proto::control_plane::ControlPlaneServiceGrpcServerAdapter;
+use quickwit_proto::indexing::indexing_service_grpc_server::IndexingServiceGrpcServer;
+use quickwit_proto::indexing::{IndexingServiceClient, IndexingServiceGrpcServerAdapter};
use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPluginServer;
use quickwit_proto::metastore::metastore_service_server::MetastoreServiceServer;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsServiceServer;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer;
use quickwit_proto::search_service_server::SearchServiceServer;
use quickwit_proto::tonic::codegen::CompressionEncoding;
-use quickwit_proto::{tonic, IndexingServiceClient, IndexingServiceGrpcServerAdapter};
-use tonic::transport::Server;
+use quickwit_proto::tonic::transport::Server;
use tracing::*;
use crate::search_api::GrpcSearchAdapter;
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 072d7994f5b..4dc43f737e2 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -65,7 +65,7 @@ use quickwit_config::{NodeConfig, SearcherConfig};
use quickwit_control_plane::control_plane::ControlPlane;
use quickwit_control_plane::scheduler::IndexingScheduler;
use quickwit_control_plane::{
- start_indexing_scheduler, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool,
+ start_indexing_scheduler, ControlPlaneEventSubscriber, IndexerNodeInfo, IndexerPool,
};
use quickwit_core::{IndexService, IndexServiceError};
use quickwit_indexing::actors::IndexingService;
@@ -79,7 +79,8 @@ use quickwit_metastore::{
MetastoreResolver, RetryingMetastore,
};
use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService};
-use quickwit_proto::IndexingServiceClient;
+use quickwit_proto::control_plane::ControlPlaneServiceClient;
+use quickwit_proto::indexing::IndexingServiceClient;
use quickwit_search::{
create_search_client_from_channel, start_searcher_service, SearchJobPlacer, SearchService,
SearchServiceClient, SearcherPool,
@@ -298,8 +299,8 @@ pub async fn serve_quickwit(
None
};
let control_plane_subscription_handle =
- control_plane_service.as_ref().map(|scheduler_service| {
- event_broker.subscribe::(scheduler_service.clone())
+ control_plane_service.clone().map(|scheduler_service| {
+ event_broker.subscribe::(ControlPlaneEventSubscriber(scheduler_service))
});
let searcher_config = config.searcher_config.clone();
@@ -721,7 +722,7 @@ mod tests {
use quickwit_cluster::{create_cluster_for_test, ClusterNode};
use quickwit_common::uri::Uri;
use quickwit_metastore::{metastore_for_test, IndexMetadata, MockMetastore};
- use quickwit_proto::IndexingTask;
+ use quickwit_proto::indexing::IndexingTask;
use quickwit_search::Job;
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
diff --git a/quickwit/rustfmt.toml b/quickwit/rustfmt.toml
index d313705507b..d6bb5a7c020 100644
--- a/quickwit/rustfmt.toml
+++ b/quickwit/rustfmt.toml
@@ -1,6 +1,5 @@
ignore = [
- "quickwit-proto/src/**/*.rs",
- "**/codegen/*.rs",
+ "**/codegen/**/*.rs",
]
comment_width = 120