Track used ingester
rdettai committed Sep 9, 2024
Commit c1aa133 (parent: b04f0de)
Showing 8 changed files with 342 additions and 136 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
@@ -34,6 +34,7 @@ pub mod pubsub;
pub mod rand;
pub mod rate_limited_tracing;
pub mod rate_limiter;
pub mod ref_tracker;
pub mod rendezvous_hasher;
pub mod retry;
pub mod runtimes;
125 changes: 125 additions & 0 deletions quickwit/quickwit-common/src/ref_tracker.rs
@@ -0,0 +1,125 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::LinkedList;
use std::sync::Weak;

pub trait Container<T> {
fn contains(&self, other: &T) -> bool;
}

/// Data structure for checking if a given value is still present in a set of
/// weakly referenced containers (e.g. arrays).
///
/// Each time a lookup is performed, all inactive references are removed.
/// Lookups are O(number of referenced containers).
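///
/// A minimal usage sketch (the `u32` ingester ids below are illustrative):
///
/// ```
/// use std::sync::Arc;
/// use quickwit_common::ref_tracker::RefTracker;
///
/// let mut tracker: RefTracker<Vec<u32>, u32> = RefTracker::default();
/// let ingester_ids = Arc::new(vec![1, 2, 3]);
/// tracker.add(Arc::downgrade(&ingester_ids));
/// assert!(tracker.contains(&2));
/// // Dropping the strong reference invalidates the weak one,
/// // which is pruned on the next lookup.
/// drop(ingester_ids);
/// assert!(!tracker.contains(&2));
/// ```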
pub struct RefTracker<C, T> {
refs: LinkedList<Weak<C>>,
_marker: std::marker::PhantomData<T>,
}

impl<C, T> Default for RefTracker<C, T> {
fn default() -> Self {
RefTracker {
refs: LinkedList::new(),
_marker: std::marker::PhantomData,
}
}
}

impl<C: Container<T>, T> RefTracker<C, T> {
pub fn add(&mut self, weak: Weak<C>) {
self.refs.push_back(weak);
}

pub fn contains(&mut self, value: &T) -> bool {
// TODO replace impl with `LinkedList::retain` when stabilized
let mut new_refs = LinkedList::new();
let mut value_found = false;
while let Some(weak) = self.refs.pop_front() {
if let Some(ingester_ids) = weak.upgrade() {
if !value_found && ingester_ids.contains(value) {
value_found = true;
}
new_refs.push_back(weak);
}
}
self.refs = new_refs;
value_found
}
}

impl<T: PartialEq> Container<T> for Vec<T> {
fn contains(&self, other: &T) -> bool {
self[..].contains(other)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;

#[test]
fn test_add_and_contains() {
let mut ref_tracker = RefTracker::default();
let container = Arc::new(vec![1, 2, 3]);
let weak_container = Arc::downgrade(&container);

ref_tracker.add(weak_container);
assert!(ref_tracker.contains(&2));
assert!(!ref_tracker.contains(&4));
}

#[test]
fn test_contains_with_dropped_reference() {
let mut ref_tracker = RefTracker::default();
let container = Arc::new(vec![1, 2, 3]);
let weak_container = Arc::downgrade(&container);

ref_tracker.add(weak_container);
drop(container);

assert!(!ref_tracker.contains(&2));
assert!(ref_tracker.refs.is_empty());
}

#[test]
fn test_multiple_references() {
let mut ref_tracker = RefTracker::default();
let container1 = Arc::new(vec![1, 2, 3]);
let container2 = Arc::new(vec![4, 5, 6]);
let weak_container1 = Arc::downgrade(&container1);
let weak_container2 = Arc::downgrade(&container2);

ref_tracker.add(weak_container1);
ref_tracker.add(weak_container2);

assert!(ref_tracker.contains(&2));
assert!(ref_tracker.contains(&5));
assert!(!ref_tracker.contains(&7));
}

#[test]
fn test_empty_tracker() {
let mut ref_tracker: RefTracker<Vec<_>, i32> = RefTracker::default();
assert!(!ref_tracker.contains(&1));
}
}
121 changes: 58 additions & 63 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -29,7 +29,7 @@ use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::retry::RetryParams;
use quickwit_ingest::{
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchMessage, MultiFetchStream,
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::ingest::ingester::{
@@ -473,22 +473,33 @@ impl Source for IngestSource {
let deadline = now + *EMIT_BATCHES_TIMEOUT;
loop {
match time::timeout_at(deadline, self.fetch_stream.next()).await {
Ok(Ok(fetch_message)) => match fetch_message.message {
Some(fetch_message::Message::Payload(fetch_payload)) => {
self.process_fetch_payload(&mut batch_builder, fetch_payload)?;
Ok(Ok(MultiFetchMessage {
fetch_message,
force_commit,
})) => {
if force_commit {
batch_builder.force_commit();
}
match fetch_message.message {
Some(fetch_message::Message::Payload(fetch_payload)) => {
self.process_fetch_payload(&mut batch_builder, fetch_payload)?;

if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
break;
if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
break;
}
}
Some(fetch_message::Message::Eof(fetch_eof)) => {
self.process_fetch_eof(&mut batch_builder, fetch_eof)?;
if force_commit {
batch_builder.force_commit()
}
}
None => {
warn!("received empty fetch message");
continue;
}
}
Some(fetch_message::Message::Eof(fetch_eof)) => {
self.process_fetch_eof(&mut batch_builder, fetch_eof)?;
}
None => {
warn!("received empty fetch message");
continue;
}
},
}
Ok(Err(fetch_stream_error)) => {
self.process_fetch_stream_error(&mut batch_builder, fetch_stream_error)?;
}
@@ -506,9 +517,6 @@
num_millis=%now.elapsed().as_millis(),
"Sending doc batch to indexer."
);
if !self.fetch_stream.has_active_shard_subscriptions() {
batch_builder.force_commit();
}
let message = batch_builder.build();
ctx.send_message(doc_processor_mailbox, message).await?;
}
@@ -673,11 +681,8 @@ mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use bytesize::ByteSize;
use itertools::Itertools;
use quickwit_actors::{ActorContext, Universe};
use quickwit_common::metrics::MEMORY_METRICS;
use quickwit_common::stream_utils::InFlightValue;
use quickwit_common::ServiceStream;
use quickwit_config::{IndexingSettings, SourceConfig, SourceParams};
use quickwit_proto::indexing::IndexingPipelineId;
@@ -1366,8 +1371,9 @@

#[tokio::test]
async fn test_ingest_source_emit_batches() {
let node_id = NodeId::from("test-node");
let pipeline_id = IndexingPipelineId {
node_id: NodeId::from("test-node"),
node_id: node_id.clone(),
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
pipeline_uid: PipelineUid::default(),
@@ -1421,7 +1427,6 @@
status: IndexingStatus::Active,
},
);
let fetch_message_tx = source.fetch_stream.fetch_message_tx();

let fetch_payload = FetchPayload {
index_uid: Some(IndexUid::for_test("test-index", 0)),
@@ -1435,14 +1440,10 @@
from_position_exclusive: Some(Position::offset(11u64)),
to_position_inclusive: Some(Position::offset(14u64)),
};
let batch_size = fetch_payload.estimate_size();
let fetch_message = FetchMessage::new_payload(fetch_payload);
let in_flight_value = InFlightValue::new(
fetch_message,
batch_size,
&MEMORY_METRICS.in_flight.fetch_stream,
);
fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
source
.fetch_stream
.send_message(&node_id, Ok(FetchMessage::new_payload(fetch_payload)))
.await;

let fetch_payload = FetchPayload {
index_uid: Some(IndexUid::for_test("test-index", 0)),
@@ -1452,28 +1453,22 @@
from_position_exclusive: Some(Position::offset(22u64)),
to_position_inclusive: Some(Position::offset(23u64)),
};
let batch_size = fetch_payload.estimate_size();
let fetch_message = FetchMessage::new_payload(fetch_payload);
let in_flight_value = InFlightValue::new(
fetch_message,
batch_size,
&MEMORY_METRICS.in_flight.fetch_stream,
);
fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
source
.fetch_stream
.send_message(&node_id, Ok(FetchMessage::new_payload(fetch_payload)))
.await;

let fetch_eof = FetchEof {
index_uid: Some(IndexUid::for_test("test-index", 0)),
source_id: "test-source".into(),
shard_id: Some(ShardId::from(2)),
eof_position: Some(Position::eof(23u64)),
is_decommissioning: false,
};
let fetch_message = FetchMessage::new_eof(fetch_eof);
let in_flight_value = InFlightValue::new(
fetch_message,
ByteSize(0),
&MEMORY_METRICS.in_flight.fetch_stream,
);
fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
source
.fetch_stream
.send_message(&node_id, Ok(FetchMessage::new_eof(fetch_eof)))
.await;

source
.emit_batches(&doc_processor_mailbox, &ctx)
@@ -1511,15 +1506,18 @@
let shard = source.assigned_shards.get(&ShardId::from(2)).unwrap();
assert_eq!(shard.status, IndexingStatus::ReachedEof);

fetch_message_tx
.send(Err(FetchStreamError {
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".into(),
shard_id: ShardId::from(1),
ingest_error: IngestV2Error::Internal("test-error".to_string()),
}))
.await
.unwrap();
source
.fetch_stream
.send_message(
&node_id,
Err(FetchStreamError {
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".into(),
shard_id: ShardId::from(1),
ingest_error: IngestV2Error::Internal("test-error".to_string()),
}),
)
.await;

source
.emit_batches(&doc_processor_mailbox, &ctx)
@@ -1536,14 +1534,10 @@
from_position_exclusive: Some(Position::offset(14u64)),
to_position_inclusive: Some(Position::offset(15u64)),
};
let batch_size = fetch_payload.estimate_size();
let fetch_message = FetchMessage::new_payload(fetch_payload);
let in_flight_value = InFlightValue::new(
fetch_message,
batch_size,
&MEMORY_METRICS.in_flight.fetch_stream,
);
fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
source
.fetch_stream
.send_message(&node_id, Ok(FetchMessage::new_payload(fetch_payload)))
.await;

source
.emit_batches(&doc_processor_mailbox, &ctx)
Expand All @@ -1555,8 +1549,9 @@ mod tests {

#[tokio::test]
async fn test_ingest_source_emit_batches_shard_not_found() {
let node_id = NodeId::from("test-node");
let pipeline_id = IndexingPipelineId {
node_id: NodeId::from("test-node"),
node_id: node_id.clone(),
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
pipeline_uid: PipelineUid::default(),
(5 more changed files not shown)
