Minor changes.
Added comments and broke the code into methods.
Most important non-trivial change:

I simplified the router by changing the granularity of
round-robin routing to the shard level (when several shards are available).

Instead of round-robining at the document scale,
we now round-robin over shards on each ingest request.

This simplifies the code in charge of dispatching
ingest subrequests.
fulmicoton committed Aug 11, 2023
1 parent bcbf4c7 commit 7045eb6
Showing 3 changed files with 86 additions and 84 deletions.
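For illustration, here is a minimal, self-contained sketch of the shard-level round-robin selection described in the commit message. The `Shard` struct and shard IDs below are simplified stand-ins for Quickwit's actual proto types; the real change lives in `router.rs` and `shard_table.rs` in the diffs below.

use std::sync::atomic::{AtomicUsize, Ordering};

/// Simplified stand-in for Quickwit's `Shard` proto type.
#[derive(Debug)]
struct Shard {
    shard_id: u64,
}

/// The open shards of one (index, source), picked from in round-robin.
struct ShardTableEntry {
    shards: Vec<Shard>,
    round_robin_state: AtomicUsize,
}

impl ShardTableEntry {
    fn new(shards: Vec<Shard>) -> Self {
        assert!(!shards.is_empty(), "an entry must hold at least one shard");
        Self {
            shards,
            round_robin_state: AtomicUsize::new(0),
        }
    }

    /// Each ingest subrequest calls this once and sends its whole doc batch
    /// to the returned shard, instead of splitting the batch per document.
    fn next_shard(&self) -> &Shard {
        let idx = self.round_robin_state.fetch_add(1, Ordering::Relaxed);
        &self.shards[idx % self.shards.len()]
    }
}

fn main() {
    let entry = ShardTableEntry::new(vec![Shard { shard_id: 0 }, Shard { shard_id: 1 }]);
    // Successive requests alternate between the two shards: 0, 1, 0, 1.
    for _ in 0..4 {
        println!("routing doc batch to shard {}", entry.next_shard().shard_id);
    }
}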
5 changes: 1 addition & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -66,14 +66,11 @@ impl DocBatchBuilderV2 {
}
}

/// Returns the capacity of the underlying buffer, expressed in bytes.
pub fn capacity(&self) -> usize {
self.doc_buffer.capacity()
}

pub fn is_empty(&self) -> bool {
self.doc_lengths.is_empty()
}

fn num_bytes(&self) -> usize {
self.doc_buffer.len()
}
135 changes: 68 additions & 67 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
@@ -27,16 +26,19 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use quickwit_proto::control_plane::{
ControlPlaneService, ControlPlaneServiceClient, GetOpenShardsRequest, GetOpenShardsSubrequest,
GetOpenShardsSubresponse,
};
use quickwit_proto::ingest::ingester::{IngesterService, PersistRequest, PersistSubrequest};
use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestResponseV2, IngestRouterService, IngestSubrequest,
};
use quickwit_proto::ingest::IngestV2Result;
use quickwit_proto::types::NodeId;
use quickwit_proto::IndexUid;
use tokio::sync::RwLock;

use super::shard_table::ShardTable;
use super::{DocBatchBuilderV2, IngesterPool};
use super::IngesterPool;

type LeaderId = String;

@@ -54,7 +56,7 @@ struct RouterState {
}

impl fmt::Debug for IngestRouter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("IngestRouter")
.field("self_node_id", &self.self_node_id)
.field("replication_factor", &self.replication_factor)
@@ -81,18 +83,19 @@ impl IngestRouter {
}
}

async fn refresh_shard_table(
&mut self,
ingest_request: &IngestRequestV2,
) -> IngestV2Result<()> {
/// Identifies the (index, source) pairs for which we (the router) do not know about
/// any open shard, and returns the corresponding `GetOpenShards` subrequests for the control plane.
async fn create_get_open_shards_subrequests_for_missing_shards(
&self,
ingest_sub_requests: &[IngestSubrequest],
) -> Vec<GetOpenShardsSubrequest> {
let state_guard = self.state.read().await;

let shard_table = &state_guard.shard_table;
let mut get_open_shards_subrequests = Vec::new();

for ingest_subrequest in &ingest_request.subrequests {
for ingest_subrequest in ingest_sub_requests {
if !shard_table
.contains_entry(&*ingest_subrequest.index_id, &ingest_subrequest.source_id)
.contains_entry(&ingest_subrequest.index_id, &ingest_subrequest.source_id)
{
let subrequest = GetOpenShardsSubrequest {
index_id: ingest_subrequest.index_id.clone(),
@@ -101,20 +104,12 @@ impl IngestRouter {
get_open_shards_subrequests.push(subrequest);
}
}
if get_open_shards_subrequests.is_empty() {
return Ok(());
}
drop(state_guard);

let request = GetOpenShardsRequest {
subrequests: get_open_shards_subrequests,
unavailable_ingesters: Vec::new(),
};
let response = self.control_plane.get_open_shards(request).await?;
get_open_shards_subrequests
}

async fn update_shard_table(&mut self, subresponses: Vec<GetOpenShardsSubresponse>) {
let mut state_guard = self.state.write().await;

for subresponse in response.subresponses {
for subresponse in subresponses {
let index_uid: IndexUid = subresponse.index_uid.into();
let index_id = index_uid.index_id().to_string();
state_guard.shard_table.update_entry(
@@ -123,6 +118,28 @@ impl IngestRouter {
subresponse.open_shards,
);
}
}

async fn refresh_shard_table(
&mut self,
ingest_request: &IngestRequestV2,
) -> IngestV2Result<()> {
let get_open_shards_subrequests = self
.create_get_open_shards_subrequests_for_missing_shards(&ingest_request.subrequests)
.await;

if get_open_shards_subrequests.is_empty() {
return Ok(());
}

let request = GetOpenShardsRequest {
subrequests: get_open_shards_subrequests,
unavailable_ingesters: Vec::new(),
};

let response = self.control_plane.get_open_shards(request).await?;
self.update_shard_table(response.subresponses).await;

Ok(())
}
}
@@ -133,9 +150,19 @@ impl IngestRouterService for IngestRouter {
&mut self,
ingest_request: IngestRequestV2,
) -> IngestV2Result<IngestResponseV2> {
// First, we ensure we know of open shards for all of the requested `(Index, Source)` pairs.
// If we have never heard of a given `(Index, Source)`, a request will
// be made to the control plane.
//
// The control plane will either tell us about an existing open shard or
// open one for us.
self.refresh_shard_table(&ingest_request).await?;

let mut doc_batch_builders: Vec<DocBatchBuilderV2> = Vec::new();
// Our ingest request may target several sources and might issue requests
// to more than one ingester.
//
// However, we want to emit a single RPC per ingester. The code below
// dispatches the persist subrequests accordingly.
let mut persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> = HashMap::new();

let state_guard = self.state.read().await;
@@ -144,53 +171,27 @@ impl IngestRouterService for IngestRouter {
// lines, validate, transform and then pack the docs into compressed batches routed
// to the right shards.
for ingest_subrequest in ingest_request.subrequests {
let table_entry = state_guard
let shard = state_guard
.shard_table
.find_entry(&*ingest_subrequest.index_id, &ingest_subrequest.source_id)
.select_shard_with_round_robbin(
&*ingest_subrequest.index_id,
&ingest_subrequest.source_id,
)
.expect("TODO");

if table_entry.len() == 1 {
let shard = &table_entry.shards()[0];
let persist_subrequest = PersistSubrequest {
index_uid: shard.index_uid.clone(),
source_id: ingest_subrequest.source_id,
shard_id: shard.shard_id,
follower_id: shard.follower_id.clone(),
doc_batch: ingest_subrequest.doc_batch,
};
persist_subrequests
.entry(&shard.leader_id)
.or_default()
.push(persist_subrequest);
continue;
}
doc_batch_builders.resize_with(table_entry.len(), DocBatchBuilderV2::default);

for (i, doc) in ingest_subrequest.docs().enumerate() {
let shard_idx = i % table_entry.len();
doc_batch_builders[shard_idx].add_doc(doc.borrow());
}
for (shard, doc_batch_builder) in table_entry
.shards()
.iter()
.zip(doc_batch_builders.drain(..))
{
if !doc_batch_builder.is_empty() {
let doc_batch = doc_batch_builder.build();
let persist_subrequest = PersistSubrequest {
index_uid: shard.index_uid.clone(),
source_id: ingest_subrequest.source_id.clone(),
shard_id: shard.shard_id,
follower_id: shard.follower_id.clone(),
doc_batch: Some(doc_batch),
};
persist_subrequests
.entry(&shard.leader_id)
.or_default()
.push(persist_subrequest);
}
}
let persist_subrequest = PersistSubrequest {
index_uid: shard.index_uid.clone(),
source_id: ingest_subrequest.source_id,
shard_id: shard.shard_id,
follower_id: shard.follower_id.clone(),
doc_batch: ingest_subrequest.doc_batch,
};
persist_subrequests
.entry(&shard.leader_id)
.or_default()
.push(persist_subrequest);
}

let mut persist_futures = FuturesUnordered::new();

for (leader_id, subrequests) in persist_subrequests {
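The dispatching logic above groups persist subrequests by the leader ingester hosting each selected shard, so that a single persist RPC is emitted per ingester. Here is a minimal sketch of that grouping, using plain structs as stand-ins for the generated proto types; the leader names and doc batches are made up for illustration.

use std::collections::HashMap;

type LeaderId = String;

/// Simplified stand-in for the generated `PersistSubrequest` proto type.
#[derive(Debug)]
struct PersistSubrequest {
    shard_id: u64,
    doc_batch: Vec<u8>,
}

/// Groups subrequests by the leader that hosts their shard, so that the router
/// can emit a single persist RPC per ingester.
fn group_by_leader(
    subrequests: Vec<(LeaderId, PersistSubrequest)>,
) -> HashMap<LeaderId, Vec<PersistSubrequest>> {
    let mut per_leader: HashMap<LeaderId, Vec<PersistSubrequest>> = HashMap::new();
    for (leader_id, subrequest) in subrequests {
        per_leader.entry(leader_id).or_default().push(subrequest);
    }
    per_leader
}

fn main() {
    let subrequests = vec![
        ("ingester-1".to_string(), PersistSubrequest { shard_id: 0, doc_batch: b"doc1".to_vec() }),
        ("ingester-2".to_string(), PersistSubrequest { shard_id: 1, doc_batch: b"doc2".to_vec() }),
        ("ingester-1".to_string(), PersistSubrequest { shard_id: 2, doc_batch: b"doc3".to_vec() }),
    ];
    // Two distinct leaders mean two persist RPCs, even though there are three subrequests.
    for (leader_id, batch) in group_by_leader(subrequests) {
        println!("sending one persist RPC to {leader_id} with {} subrequest(s)", batch.len());
    }
}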
30 changes: 17 additions & 13 deletions quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
@@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

use quickwit_proto::ingest::Shard;
use quickwit_proto::types::SourceId;
@@ -27,26 +28,28 @@ use quickwit_proto::IndexId;
#[derive(Debug, Default)]
pub(crate) struct ShardTableEntry {
shards: Vec<Shard>,
round_robbin_state: AtomicUsize,
}

impl ShardTableEntry {
/// Creates a new entry and ensures that the shards are open and unique.
///
/// A shard table entry must not be empty.
pub fn new(mut shards: Vec<Shard>) -> Self {
assert!(!shards.is_empty());
shards.retain(|shard| shard.is_open());
shards.sort_unstable_by_key(|shard| shard.shard_id);
shards.dedup_by_key(|shard| shard.shard_id);

Self { shards }
}

/// Returns the number of shards that make up the entry.
pub fn len(&self) -> usize {
self.shards.len()
Self {
shards,
round_robbin_state: AtomicUsize::default(),
}
}

/// Returns the shards that make up the entry.
pub fn shards(&self) -> &[Shard] {
&self.shards
/// Returns the next shard to send documents to, selected using round-robin.
pub fn next_round_robbin(&self) -> &Shard {
let idx = self.round_robbin_state.fetch_add(1, Ordering::Relaxed);
&self.shards[idx % self.shards.len()]
}
}

@@ -66,13 +69,14 @@ impl ShardTable {
self.table.contains_key(&key)
}

pub fn find_entry(
pub fn select_shard_with_round_robbin(
&self,
index_id: impl Into<IndexId>,
source_id: impl Into<SourceId>,
) -> Option<&ShardTableEntry> {
) -> Option<&Shard> {
let key = (index_id.into(), source_id.into());
self.table.get(&key)
let shard_table_entry = self.table.get(&key)?;
Some(shard_table_entry.next_round_robbin())
}

pub fn update_entry(
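Finally, a rough sketch of the lazy shard-table refresh performed by `refresh_shard_table` in the router: only the `(index, source)` pairs with no known open shard are forwarded to the control plane, while known entries are reused as-is. The types, index names, and source names below are made up for illustration and are not the generated proto types.

use std::collections::HashMap;

type IndexId = String;
type SourceId = String;

/// Stand-in for one open shard.
#[derive(Debug)]
struct Shard {
    shard_id: u64,
}

/// Maps (index, source) to its known open shards.
#[derive(Default)]
struct ShardTable {
    table: HashMap<(IndexId, SourceId), Vec<Shard>>,
}

impl ShardTable {
    fn contains_entry(&self, index_id: &str, source_id: &str) -> bool {
        self.table
            .contains_key(&(index_id.to_string(), source_id.to_string()))
    }

    fn update_entry(&mut self, index_id: IndexId, source_id: SourceId, shards: Vec<Shard>) {
        self.table.insert((index_id, source_id), shards);
    }
}

/// Returns the (index, source) pairs that have no known open shard yet.
/// Only those would be sent to the control plane as GetOpenShards subrequests.
fn missing_entries<'a>(
    table: &ShardTable,
    subrequests: &'a [(IndexId, SourceId)],
) -> Vec<&'a (IndexId, SourceId)> {
    subrequests
        .iter()
        .filter(|(index_id, source_id)| !table.contains_entry(index_id, source_id))
        .collect()
}

fn main() {
    let mut table = ShardTable::default();
    table.update_entry("logs".into(), "kafka-source".into(), vec![Shard { shard_id: 0 }]);

    let subrequests = vec![
        ("logs".to_string(), "kafka-source".to_string()),
        ("traces".to_string(), "otlp-source".to_string()),
    ];
    // Only ("traces", "otlp-source") is unknown and would trigger a control-plane request.
    for missing in missing_entries(&table, &subrequests) {
        println!("asking the control plane for open shards of {missing:?}");
    }
}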
