diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index a5b57bc79e1..a4aa0bdab19 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -66,14 +66,11 @@ impl DocBatchBuilderV2 {
}
}
+ /// Returns the capacity of the underlying buffer, expressed in bytes.
pub fn capacity(&self) -> usize {
self.doc_buffer.capacity()
}
- pub fn is_empty(&self) -> bool {
- self.doc_lengths.is_empty()
- }
-
fn num_bytes(&self) -> usize {
self.doc_buffer.len()
}
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 8204cfd0858..bdc753958a1 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
@@ -27,16 +26,19 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use quickwit_proto::control_plane::{
ControlPlaneService, ControlPlaneServiceClient, GetOpenShardsRequest, GetOpenShardsSubrequest,
+ GetOpenShardsSubresponse,
};
use quickwit_proto::ingest::ingester::{IngesterService, PersistRequest, PersistSubrequest};
-use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService};
+use quickwit_proto::ingest::router::{
+ IngestRequestV2, IngestResponseV2, IngestRouterService, IngestSubrequest,
+};
use quickwit_proto::ingest::IngestV2Result;
use quickwit_proto::types::NodeId;
use quickwit_proto::IndexUid;
use tokio::sync::RwLock;
use super::shard_table::ShardTable;
-use super::{DocBatchBuilderV2, IngesterPool};
+use super::IngesterPool;
type LeaderId = String;
@@ -54,7 +56,7 @@ struct RouterState {
}
impl fmt::Debug for IngestRouter {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("IngestRouter")
.field("self_node_id", &self.self_node_id)
.field("replication_factor", &self.replication_factor)
@@ -81,18 +83,19 @@ impl IngestRouter {
}
}
- async fn refresh_shard_table(
- &mut self,
- ingest_request: &IngestRequestV2,
- ) -> IngestV2Result<()> {
+    /// Identifies the (index, source) pairs for which we (the router) do not know about
+    /// any open shard, and returns the GetOpenShards subrequests to send to the control plane.
+ async fn create_get_open_shards_subrequests_for_missing_shards(
+ &self,
+ ingest_sub_requests: &[IngestSubrequest],
+    ) -> Vec<GetOpenShardsSubrequest> {
let state_guard = self.state.read().await;
-
let shard_table = &state_guard.shard_table;
let mut get_open_shards_subrequests = Vec::new();
- for ingest_subrequest in &ingest_request.subrequests {
+ for ingest_subrequest in ingest_sub_requests {
if !shard_table
- .contains_entry(&*ingest_subrequest.index_id, &ingest_subrequest.source_id)
+ .contains_entry(&ingest_subrequest.index_id, &ingest_subrequest.source_id)
{
let subrequest = GetOpenShardsSubrequest {
index_id: ingest_subrequest.index_id.clone(),
@@ -101,20 +104,12 @@ impl IngestRouter {
get_open_shards_subrequests.push(subrequest);
}
}
- if get_open_shards_subrequests.is_empty() {
- return Ok(());
- }
- drop(state_guard);
-
- let request = GetOpenShardsRequest {
- subrequests: get_open_shards_subrequests,
- unavailable_ingesters: Vec::new(),
- };
- let response = self.control_plane.get_open_shards(request).await?;
+ get_open_shards_subrequests
+ }
+    async fn update_shard_table(&mut self, subresponses: Vec<GetOpenShardsSubresponse>) {
let mut state_guard = self.state.write().await;
-
- for subresponse in response.subresponses {
+ for subresponse in subresponses {
let index_uid: IndexUid = subresponse.index_uid.into();
let index_id = index_uid.index_id().to_string();
state_guard.shard_table.update_entry(
@@ -123,6 +118,28 @@ impl IngestRouter {
subresponse.open_shards,
);
}
+ }
+
+ async fn refresh_shard_table(
+ &mut self,
+ ingest_request: &IngestRequestV2,
+ ) -> IngestV2Result<()> {
+ let get_open_shards_subrequests = self
+ .create_get_open_shards_subrequests_for_missing_shards(&ingest_request.subrequests)
+ .await;
+
+ if get_open_shards_subrequests.is_empty() {
+ return Ok(());
+ }
+
+ let request = GetOpenShardsRequest {
+ subrequests: get_open_shards_subrequests,
+ unavailable_ingesters: Vec::new(),
+ };
+
+ let response = self.control_plane.get_open_shards(request).await?;
+ self.update_shard_table(response.subresponses).await;
+
Ok(())
}
}
@@ -133,9 +150,19 @@ impl IngestRouterService for IngestRouter {
&mut self,
ingest_request: IngestRequestV2,
) -> IngestV2Result<IngestResponseV2> {
+ // First, we ensure we have shards for all of the requested `(Index, Source)`.
+ // If we have never heard of a given `(Index, Source)` a request will
+ // be made to the control plane.
+ //
+ // The control plane will either tell us about an existing open shard or
+ // open one for us.
self.refresh_shard_table(&ingest_request).await?;
-        let mut doc_batch_builders: Vec<DocBatchBuilderV2> = Vec::new();
+        // Our ingest request may target several sources, and might issue requests
+        // to more than one ingester.
+ //
+ // We want however to emit one RPC per ingester. The code here is precisely
+ // dispatching requests accordingly.
let mut persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> = HashMap::new();
let state_guard = self.state.read().await;
@@ -144,53 +171,27 @@ impl IngestRouterService for IngestRouter {
// lines, validate, transform and then pack the docs into compressed batches routed
// to the right shards.
for ingest_subrequest in ingest_request.subrequests {
- let table_entry = state_guard
+ let shard = state_guard
.shard_table
- .find_entry(&*ingest_subrequest.index_id, &ingest_subrequest.source_id)
+ .select_shard_with_round_robbin(
+ &*ingest_subrequest.index_id,
+ &ingest_subrequest.source_id,
+ )
.expect("TODO");
- if table_entry.len() == 1 {
- let shard = &table_entry.shards()[0];
- let persist_subrequest = PersistSubrequest {
- index_uid: shard.index_uid.clone(),
- source_id: ingest_subrequest.source_id,
- shard_id: shard.shard_id,
- follower_id: shard.follower_id.clone(),
- doc_batch: ingest_subrequest.doc_batch,
- };
- persist_subrequests
- .entry(&shard.leader_id)
- .or_default()
- .push(persist_subrequest);
- continue;
- }
- doc_batch_builders.resize_with(table_entry.len(), DocBatchBuilderV2::default);
-
- for (i, doc) in ingest_subrequest.docs().enumerate() {
- let shard_idx = i % table_entry.len();
- doc_batch_builders[shard_idx].add_doc(doc.borrow());
- }
- for (shard, doc_batch_builder) in table_entry
- .shards()
- .iter()
- .zip(doc_batch_builders.drain(..))
- {
- if !doc_batch_builder.is_empty() {
- let doc_batch = doc_batch_builder.build();
- let persist_subrequest = PersistSubrequest {
- index_uid: shard.index_uid.clone(),
- source_id: ingest_subrequest.source_id.clone(),
- shard_id: shard.shard_id,
- follower_id: shard.follower_id.clone(),
- doc_batch: Some(doc_batch),
- };
- persist_subrequests
- .entry(&shard.leader_id)
- .or_default()
- .push(persist_subrequest);
- }
- }
+ let persist_subrequest = PersistSubrequest {
+ index_uid: shard.index_uid.clone(),
+ source_id: ingest_subrequest.source_id,
+ shard_id: shard.shard_id,
+ follower_id: shard.follower_id.clone(),
+ doc_batch: ingest_subrequest.doc_batch,
+ };
+ persist_subrequests
+ .entry(&shard.leader_id)
+ .or_default()
+ .push(persist_subrequest);
}
+
let mut persist_futures = FuturesUnordered::new();
for (leader_id, subrequests) in persist_subrequests {
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
index aeb90c7d77d..32d1b344cae 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
@@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
+use std::sync::atomic::{AtomicUsize, Ordering};
use quickwit_proto::ingest::Shard;
use quickwit_proto::types::SourceId;
@@ -27,26 +28,28 @@ use quickwit_proto::IndexId;
#[derive(Debug, Default)]
pub(crate) struct ShardTableEntry {
shards: Vec<Shard>,
+ round_robbin_state: AtomicUsize,
}
impl ShardTableEntry {
/// Creates a new entry and ensures that the shards are open and unique.
+ ///
+ /// A shard table entry may not be empty.
pub fn new(mut shards: Vec<Shard>) -> Self {
+ assert!(!shards.is_empty());
shards.retain(|shard| shard.is_open());
shards.sort_unstable_by_key(|shard| shard.shard_id);
shards.dedup_by_key(|shard| shard.shard_id);
-
- Self { shards }
- }
-
- /// Returns the number of shards that make up the entry.
- pub fn len(&self) -> usize {
- self.shards.len()
+ Self {
+ shards,
+ round_robbin_state: AtomicUsize::default(),
+ }
}
- /// Returns the shards that make up the entry.
- pub fn shards(&self) -> &[Shard] {
- &self.shards
+ /// Returns a shard to send documents to using round robbin.
+ pub fn next_round_robbin(&self) -> &Shard {
+ let idx = self.round_robbin_state.fetch_add(1, Ordering::Relaxed);
+ &self.shards[idx % self.shards.len()]
}
}
@@ -66,13 +69,14 @@ impl ShardTable {
self.table.contains_key(&key)
}
- pub fn find_entry(
+ pub fn select_shard_with_round_robbin(
&self,
index_id: impl Into<IndexId>,
source_id: impl Into<SourceId>,
- ) -> Option<&ShardTableEntry> {
+ ) -> Option<&Shard> {
let key = (index_id.into(), source_id.into());
- self.table.get(&key)
+ let shard_table_entry = self.table.get(&key)?;
+ Some(shard_table_entry.next_round_robbin())
}
pub fn update_entry(