diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index dff26829584..cd355eb6ade 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -34,6 +34,7 @@ pub mod pubsub;
pub mod rand;
pub mod rate_limited_tracing;
pub mod rate_limiter;
+pub mod ref_tracker;
pub mod rendezvous_hasher;
pub mod retry;
pub mod runtimes;
diff --git a/quickwit/quickwit-common/src/ref_tracker.rs b/quickwit/quickwit-common/src/ref_tracker.rs
new file mode 100644
index 00000000000..c317f760d8e
--- /dev/null
+++ b/quickwit/quickwit-common/src/ref_tracker.rs
@@ -0,0 +1,125 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::LinkedList;
+use std::sync::Weak;
+
+pub trait Container<T> {
+    fn contains(&self, other: &T) -> bool;
+}
+
+/// Data structure for checking if a given value is still present in a set of
+/// weakly referenced containers (e.g arrays).
+///
+/// Each time a lookup is performed, all inactive references are removed.
+/// Lookups are O(number of referenced containers).
+pub struct RefTracker<C, T> {
+    refs: LinkedList<Weak<C>>,
+    _marker: std::marker::PhantomData<T>,
+}
+
+impl<C, T> Default for RefTracker<C, T> {
+ fn default() -> Self {
+ RefTracker {
+ refs: LinkedList::new(),
+ _marker: std::marker::PhantomData,
+ }
+ }
+}
+
+impl<C: Container<T>, T> RefTracker<C, T> {
+    pub fn add(&mut self, weak: Weak<C>) {
+ self.refs.push_back(weak);
+ }
+
+ pub fn contains(&mut self, value: &T) -> bool {
+ // TODO replace impl with `LinkedList::retain` when stabilized
+ let mut new_refs = LinkedList::new();
+ let mut value_found = false;
+ while let Some(weak) = self.refs.pop_front() {
+ if let Some(ingester_ids) = weak.upgrade() {
+ if !value_found && ingester_ids.contains(value) {
+ value_found = true;
+ }
+ new_refs.push_back(weak);
+ }
+ }
+ self.refs = new_refs;
+ value_found
+ }
+}
+
+impl<T: PartialEq> Container<T> for Vec<T> {
+ fn contains(&self, other: &T) -> bool {
+ self[..].contains(other)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use super::*;
+
+ #[test]
+ fn test_add_and_contains() {
+ let mut ref_tracker = RefTracker::default();
+ let container = Arc::new(vec![1, 2, 3]);
+ let weak_container = Arc::downgrade(&container);
+
+ ref_tracker.add(weak_container);
+ assert!(ref_tracker.contains(&2));
+ assert!(!ref_tracker.contains(&4));
+ }
+
+ #[test]
+ fn test_contains_with_dropped_reference() {
+ let mut ref_tracker = RefTracker::default();
+ let container = Arc::new(vec![1, 2, 3]);
+ let weak_container = Arc::downgrade(&container);
+
+ ref_tracker.add(weak_container);
+ drop(container);
+
+ assert!(!ref_tracker.contains(&2));
+ assert!(ref_tracker.refs.is_empty());
+ }
+
+ #[test]
+ fn test_multiple_references() {
+ let mut ref_tracker = RefTracker::default();
+ let container1 = Arc::new(vec![1, 2, 3]);
+ let container2 = Arc::new(vec![4, 5, 6]);
+ let weak_container1 = Arc::downgrade(&container1);
+ let weak_container2 = Arc::downgrade(&container2);
+
+ ref_tracker.add(weak_container1);
+ ref_tracker.add(weak_container2);
+
+ assert!(ref_tracker.contains(&2));
+ assert!(ref_tracker.contains(&5));
+ assert!(!ref_tracker.contains(&7));
+ }
+
+ #[test]
+ fn test_empty_tracker() {
+        let mut ref_tracker: RefTracker<Vec<i32>, i32> = RefTracker::default();
+ assert!(!ref_tracker.contains(&1));
+ }
+}
diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index c01605c017a..eb95b6e0691 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -29,7 +29,7 @@ use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::retry::RetryParams;
use quickwit_ingest::{
- decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
+ decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchMessage, MultiFetchStream,
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::ingest::ingester::{
@@ -473,22 +473,33 @@ impl Source for IngestSource {
let deadline = now + *EMIT_BATCHES_TIMEOUT;
loop {
match time::timeout_at(deadline, self.fetch_stream.next()).await {
- Ok(Ok(fetch_message)) => match fetch_message.message {
- Some(fetch_message::Message::Payload(fetch_payload)) => {
- self.process_fetch_payload(&mut batch_builder, fetch_payload)?;
+ Ok(Ok(MultiFetchMessage {
+ fetch_message,
+ force_commit,
+ })) => {
+ if force_commit {
+ batch_builder.force_commit();
+ }
+ match fetch_message.message {
+ Some(fetch_message::Message::Payload(fetch_payload)) => {
+ self.process_fetch_payload(&mut batch_builder, fetch_payload)?;
- if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
- break;
+ if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
+ break;
+ }
+ }
+ Some(fetch_message::Message::Eof(fetch_eof)) => {
+ self.process_fetch_eof(&mut batch_builder, fetch_eof)?;
+ if force_commit {
+ batch_builder.force_commit()
+ }
+ }
+ None => {
+ warn!("received empty fetch message");
+ continue;
}
}
- Some(fetch_message::Message::Eof(fetch_eof)) => {
- self.process_fetch_eof(&mut batch_builder, fetch_eof)?;
- }
- None => {
- warn!("received empty fetch message");
- continue;
- }
- },
+ }
Ok(Err(fetch_stream_error)) => {
self.process_fetch_stream_error(&mut batch_builder, fetch_stream_error)?;
}
@@ -506,9 +517,6 @@ impl Source for IngestSource {
num_millis=%now.elapsed().as_millis(),
"Sending doc batch to indexer."
);
- if !self.fetch_stream.has_active_shard_subscriptions() {
- batch_builder.force_commit();
- }
let message = batch_builder.build();
ctx.send_message(doc_processor_mailbox, message).await?;
}
@@ -673,11 +681,8 @@ mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
- use bytesize::ByteSize;
use itertools::Itertools;
use quickwit_actors::{ActorContext, Universe};
- use quickwit_common::metrics::MEMORY_METRICS;
- use quickwit_common::stream_utils::InFlightValue;
use quickwit_common::ServiceStream;
use quickwit_config::{IndexingSettings, SourceConfig, SourceParams};
use quickwit_proto::indexing::IndexingPipelineId;
@@ -1366,8 +1371,9 @@ mod tests {
#[tokio::test]
async fn test_ingest_source_emit_batches() {
+ let node_id = NodeId::from("test-node");
let pipeline_id = IndexingPipelineId {
- node_id: NodeId::from("test-node"),
+ node_id: node_id.clone(),
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
pipeline_uid: PipelineUid::default(),
@@ -1421,7 +1427,6 @@ mod tests {
status: IndexingStatus::Active,
},
);
- let fetch_message_tx = source.fetch_stream.fetch_message_tx();
let fetch_payload = FetchPayload {
index_uid: Some(IndexUid::for_test("test-index", 0)),
@@ -1435,14 +1440,10 @@ mod tests {
from_position_exclusive: Some(Position::offset(11u64)),
to_position_inclusive: Some(Position::offset(14u64)),
};
- let batch_size = fetch_payload.estimate_size();
- let fetch_message = FetchMessage::new_payload(fetch_payload);
- let in_flight_value = InFlightValue::new(
- fetch_message,
- batch_size,
- &MEMORY_METRICS.in_flight.fetch_stream,
- );
- fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
+ source
+ .fetch_stream
+ .send_message(&node_id, Ok(FetchMessage::new_payload(fetch_payload)))
+ .await;
let fetch_payload = FetchPayload {
index_uid: Some(IndexUid::for_test("test-index", 0)),
@@ -1452,28 +1453,22 @@ mod tests {
from_position_exclusive: Some(Position::offset(22u64)),
to_position_inclusive: Some(Position::offset(23u64)),
};
- let batch_size = fetch_payload.estimate_size();
- let fetch_message = FetchMessage::new_payload(fetch_payload);
- let in_flight_value = InFlightValue::new(
- fetch_message,
- batch_size,
- &MEMORY_METRICS.in_flight.fetch_stream,
- );
- fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
+ source
+ .fetch_stream
+ .send_message(&node_id, Ok(FetchMessage::new_payload(fetch_payload)))
+ .await;
let fetch_eof = FetchEof {
index_uid: Some(IndexUid::for_test("test-index", 0)),
source_id: "test-source".into(),
shard_id: Some(ShardId::from(2)),
eof_position: Some(Position::eof(23u64)),
+ is_decommissioning: false,
};
- let fetch_message = FetchMessage::new_eof(fetch_eof);
- let in_flight_value = InFlightValue::new(
- fetch_message,
- ByteSize(0),
- &MEMORY_METRICS.in_flight.fetch_stream,
- );
- fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
+ source
+ .fetch_stream
+ .send_message(&node_id, Ok(FetchMessage::new_eof(fetch_eof)))
+ .await;
source
.emit_batches(&doc_processor_mailbox, &ctx)
@@ -1511,15 +1506,18 @@ mod tests {
let shard = source.assigned_shards.get(&ShardId::from(2)).unwrap();
assert_eq!(shard.status, IndexingStatus::ReachedEof);
- fetch_message_tx
- .send(Err(FetchStreamError {
- index_uid: IndexUid::for_test("test-index", 0),
- source_id: "test-source".into(),
- shard_id: ShardId::from(1),
- ingest_error: IngestV2Error::Internal("test-error".to_string()),
- }))
- .await
- .unwrap();
+ source
+ .fetch_stream
+ .send_message(
+ &node_id,
+ Err(FetchStreamError {
+ index_uid: IndexUid::for_test("test-index", 0),
+ source_id: "test-source".into(),
+ shard_id: ShardId::from(1),
+ ingest_error: IngestV2Error::Internal("test-error".to_string()),
+ }),
+ )
+ .await;
source
.emit_batches(&doc_processor_mailbox, &ctx)
@@ -1536,14 +1534,10 @@ mod tests {
from_position_exclusive: Some(Position::offset(14u64)),
to_position_inclusive: Some(Position::offset(15u64)),
};
- let batch_size = fetch_payload.estimate_size();
- let fetch_message = FetchMessage::new_payload(fetch_payload);
- let in_flight_value = InFlightValue::new(
- fetch_message,
- batch_size,
- &MEMORY_METRICS.in_flight.fetch_stream,
- );
- fetch_message_tx.send(Ok(in_flight_value)).await.unwrap();
+ source
+ .fetch_stream
+ .send_message(&node_id, Ok(FetchMessage::new_payload(fetch_payload)))
+ .await;
source
.emit_batches(&doc_processor_mailbox, &ctx)
@@ -1555,8 +1549,9 @@ mod tests {
#[tokio::test]
async fn test_ingest_source_emit_batches_shard_not_found() {
+ let node_id = NodeId::from("test-node");
let pipeline_id = IndexingPipelineId {
- node_id: NodeId::from("test-node"),
+ node_id: node_id.clone(),
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
pipeline_uid: PipelineUid::default(),
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
index 65739d89242..2fade99e221 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -28,11 +28,13 @@ use bytesize::ByteSize;
use futures::StreamExt;
use mrecordlog::Record;
use quickwit_common::metrics::MEMORY_METRICS;
+use quickwit_common::ref_tracker::RefTracker;
use quickwit_common::retry::RetryParams;
use quickwit_common::stream_utils::{InFlightValue, TrackedSender};
use quickwit_common::{spawn_named_task, ServiceStream};
use quickwit_proto::ingest::ingester::{
- fetch_message, FetchEof, FetchMessage, FetchPayload, IngesterService, OpenFetchStreamRequest,
+ fetch_message, FetchEof, FetchMessage, FetchPayload, IngesterService, IngesterStatus,
+ OpenFetchStreamRequest,
};
use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, MRecordBatch};
use quickwit_proto::types::{queue_id, IndexUid, NodeId, Position, QueueId, ShardId, SourceId};
@@ -61,6 +63,7 @@ pub(super) struct FetchStreamTask {
/// task does not need to grab the lock and poll the mrecordlog queue unnecessarily.
     shard_status_rx: watch::Receiver<ShardStatus>,
batch_num_bytes: usize,
+    ingester_status_rx: watch::Receiver<IngesterStatus>,
}
impl fmt::Debug for FetchStreamTask {
@@ -74,12 +77,30 @@ impl fmt::Debug for FetchStreamTask {
}
}
+/// Wrapper around [`FetchMessage`] that keeps track of both the amount of data
+/// in flight and its origin
+#[derive(Debug)]
+struct TrackedFetchMessage {
+ /// the ingester from which the message was fetched
+ source_node_id: NodeId,
+ /// the actual message and a tracker of the amount of in flight data
+    fetch_message: InFlightValue<FetchMessage>,
+ /// a guard to track the ingesters that still have in flight data
+    _tracked_ingesters_ref: Arc<Vec<NodeId>>,
+}
+
+pub struct MultiFetchMessage {
+ pub force_commit: bool,
+ pub fetch_message: FetchMessage,
+}
+
impl FetchStreamTask {
pub fn spawn(
open_fetch_stream_request: OpenFetchStreamRequest,
mrecordlog: Arc>>,
         shard_status_rx: watch::Receiver<ShardStatus>,
batch_num_bytes: usize,
+        ingester_status_rx: watch::Receiver<IngesterStatus>,
     ) -> (ServiceStream<IngestV2Result<FetchMessage>>, JoinHandle<()>) {
let from_position_inclusive = open_fetch_stream_request
.from_position_exclusive()
@@ -99,6 +120,7 @@ impl FetchStreamTask {
fetch_message_tx,
shard_status_rx,
batch_num_bytes,
+ ingester_status_rx,
};
let future = async move { fetch_task.run().await };
let fetch_task_handle: JoinHandle<()> = spawn_named_task(future, "fetch_task");
@@ -216,6 +238,10 @@ impl FetchStreamTask {
source_id: self.source_id.clone(),
shard_id: Some(self.shard_id.clone()),
eof_position: Some(eof_position),
+ is_decommissioning: matches!(
+ *self.ingester_status_rx.borrow(),
+ IngesterStatus::Decommissioning | IngesterStatus::Decommissioned
+ ),
};
let fetch_message = FetchMessage::new_eof(fetch_eof);
let _ = self
@@ -264,8 +290,9 @@ pub struct MultiFetchStream {
ingester_pool: IngesterPool,
retry_params: RetryParams,
     fetch_task_handles: HashMap<QueueId, JoinHandle<()>>,
-    fetch_message_rx: mpsc::Receiver<Result<InFlightValue<FetchMessage>, FetchStreamError>>,
-    fetch_message_tx: mpsc::Sender<Result<InFlightValue<FetchMessage>, FetchStreamError>>,
+    fetch_message_rx: mpsc::Receiver<Result<TrackedFetchMessage, FetchStreamError>>,
+    fetch_message_tx: mpsc::Sender<Result<TrackedFetchMessage, FetchStreamError>>,
+    active_ingesters: RefTracker<Vec<NodeId>, NodeId>,
}
impl MultiFetchStream {
@@ -282,24 +309,34 @@ impl MultiFetchStream {
ingester_pool,
retry_params,
fetch_task_handles: HashMap::new(),
+ active_ingesters: RefTracker::default(),
fetch_message_rx,
fetch_message_tx,
}
}
#[cfg(any(test, feature = "testsuite"))]
- pub fn fetch_message_tx(
+ pub async fn send_message(
&self,
-    ) -> mpsc::Sender<Result<InFlightValue<FetchMessage>, FetchStreamError>> {
- self.fetch_message_tx.clone()
- }
-
- pub fn has_active_shard_subscriptions(&self) -> bool {
- tracing::info!(
- tx_count = self.fetch_message_tx.strong_count(),
- "has_active_shard_subscriptions"
- );
- self.fetch_message_tx.strong_count() > 1
+ node_id: &NodeId,
+        message_res: Result<FetchMessage, FetchStreamError>,
+ ) {
+ let tracked_msg = message_res
+ .map(|msg| {
+ let batch_size = match &msg.message {
+ Some(fetch_message::Message::Payload(fetch_payload)) => {
+ fetch_payload.estimate_size()
+ }
+ _ => ByteSize(0),
+ };
+ InFlightValue::new(msg, batch_size, &MEMORY_METRICS.in_flight.fetch_stream)
+ })
+ .map(|msg| TrackedFetchMessage {
+ source_node_id: node_id.clone(),
+ fetch_message: msg,
+ _tracked_ingesters_ref: Arc::new(vec![node_id.clone()]),
+ });
+ self.fetch_message_tx.send(tracked_msg).await.unwrap();
}
/// Subscribes to a shard and fails over to the replica if an error occurs.
@@ -330,49 +367,59 @@ impl MultiFetchStream {
if let Some(failover_ingester_id) = failover_ingester_id_opt {
ingester_ids.push(failover_ingester_id);
}
+ let ingester_ids_ref = Arc::new(ingester_ids);
+ let ingester_ids_weak = Arc::downgrade(&ingester_ids_ref);
let fetch_stream_future = retrying_fetch_stream(
self.client_id.clone(),
index_uid,
source_id,
shard_id,
from_position_exclusive,
- ingester_ids,
+ ingester_ids_ref,
self.ingester_pool.clone(),
self.retry_params,
self.fetch_message_tx.clone(),
);
let fetch_task_handle = spawn_named_task(fetch_stream_future, "fetch_stream");
self.fetch_task_handles.insert(queue_id, fetch_task_handle);
+ self.active_ingesters.add(ingester_ids_weak);
Ok(())
}
- pub fn unsubscribe(
- &mut self,
- index_uid: &IndexUid,
- source_id: &str,
- shard_id: ShardId,
- ) -> IngestV2Result<()> {
- let queue_id = queue_id(index_uid, source_id, &shard_id);
-
- if let Some(fetch_stream_handle) = self.fetch_task_handles.remove(&queue_id) {
- fetch_stream_handle.abort();
- }
- Ok(())
- }
-
- /// Returns the next fetch response. This method blocks until a response is available.
+ /// Returns the next fetch response and a boolean set to true if the batch
+ /// should be force committed. This method blocks until a response is
+ /// available.
///
/// # Cancel safety
///
/// This method is cancel safe.
-    pub async fn next(&mut self) -> Result<FetchMessage, FetchStreamError> {
+    pub async fn next(&mut self) -> Result<MultiFetchMessage, FetchStreamError> {
// Because we always hold a sender and never call `close()` on the receiver, the channel is
// always open.
- self.fetch_message_rx
+ let TrackedFetchMessage {
+ fetch_message,
+ source_node_id,
+ _tracked_ingesters_ref,
+ } = self
+ .fetch_message_rx
.recv()
.await
- .expect("channel should be open")
-            .map(|value: InFlightValue<FetchMessage>| value.into_inner())
+ .expect("channel should be open")?;
+
+ let fetch_message = fetch_message.into_inner();
+ drop(_tracked_ingesters_ref);
+
+ let force_commit =
+ if let Some(fetch_message::Message::Eof(fetch_eof)) = &fetch_message.message {
+ fetch_eof.is_decommissioning && !self.active_ingesters.contains(&source_node_id)
+ } else {
+ false
+ };
+
+ Ok(MultiFetchMessage {
+ force_commit,
+ fetch_message,
+ })
}
/// Resets the stream by aborting all the active fetch tasks and dropping all queued responses.
@@ -381,7 +428,7 @@ impl MultiFetchStream {
/// simultaneously because they are both `&mut self` methods.
pub fn reset(&mut self) {
for (_queue_id, fetch_stream_handle) in self.fetch_task_handles.drain() {
- fetch_stream_handle.abort();
+ fetch_stream_handle.abort()
}
let (fetch_message_tx, fetch_message_rx) = mpsc::channel(3);
self.fetch_message_tx = fetch_message_tx;
@@ -425,10 +472,10 @@ async fn retrying_fetch_stream(
source_id: SourceId,
shard_id: ShardId,
mut from_position_exclusive: Position,
-    ingester_ids: Vec<NodeId>,
+    ingester_ids: Arc<Vec<NodeId>>,
ingester_pool: IngesterPool,
retry_params: RetryParams,
-    fetch_message_tx: mpsc::Sender<Result<InFlightValue<FetchMessage>, FetchStreamError>>,
+    fetch_message_tx: mpsc::Sender<Result<TrackedFetchMessage, FetchStreamError>>,
) {
for num_attempts in 1..=retry_params.max_attempts {
fault_tolerant_fetch_stream(
@@ -460,13 +507,13 @@ async fn fault_tolerant_fetch_stream(
source_id: SourceId,
shard_id: ShardId,
from_position_exclusive: &mut Position,
- ingester_ids: &[NodeId],
+    tracked_ingester_ids: &Arc<Vec<NodeId>>,
ingester_pool: IngesterPool,
-    fetch_message_tx: mpsc::Sender<Result<InFlightValue<FetchMessage>, FetchStreamError>>,
+    fetch_message_tx: mpsc::Sender<Result<TrackedFetchMessage, FetchStreamError>>,
) {
// TODO: We can probably simplify this code by breaking it into smaller functions.
- 'outer: for (ingester_idx, ingester_id) in ingester_ids.iter().enumerate() {
- let failover_ingester_id_opt = ingester_ids.get(ingester_idx + 1);
+ 'outer: for (ingester_idx, ingester_id) in tracked_ingester_ids.iter().enumerate() {
+ let failover_ingester_id_opt = tracked_ingester_ids.get(ingester_idx + 1);
let Some(ingester) = ingester_pool.get(ingester_id) else {
if let Some(failover_ingester_id) = failover_ingester_id_opt {
@@ -570,7 +617,15 @@ async fn fault_tolerant_fetch_stream(
batch_size,
&MEMORY_METRICS.in_flight.multi_fetch_stream,
);
- if fetch_message_tx.send(Ok(in_flight_value)).await.is_err() {
+ if fetch_message_tx
+ .send(Ok(TrackedFetchMessage {
+ source_node_id: ingester_id.clone(),
+ fetch_message: in_flight_value,
+ _tracked_ingesters_ref: tracked_ingester_ids.clone(),
+ }))
+ .await
+ .is_err()
+ {
// The consumer was dropped.
return;
}
@@ -585,7 +640,13 @@ async fn fault_tolerant_fetch_stream(
);
// We ignore the send error if the consumer was dropped because we're going
// to return anyway.
- let _ = fetch_message_tx.send(Ok(in_flight_value)).await;
+ let _ = fetch_message_tx
+ .send(Ok(TrackedFetchMessage {
+ source_node_id: ingester_id.clone(),
+ fetch_message: in_flight_value,
+ _tracked_ingesters_ref: tracked_ingester_ids.clone(),
+ }))
+ .await;
*from_position_exclusive = eof_position;
return;
@@ -643,15 +704,27 @@ pub(super) mod tests {
use super::*;
use crate::MRecord;
- pub fn into_fetch_payload(fetch_message: FetchMessage) -> FetchPayload {
- match fetch_message.message.unwrap() {
+    impl Into<FetchMessage> for TrackedFetchMessage {
+ fn into(self) -> FetchMessage {
+ self.fetch_message.into_inner()
+ }
+ }
+
+    impl Into<FetchMessage> for MultiFetchMessage {
+ fn into(self) -> FetchMessage {
+ self.fetch_message
+ }
+ }
+
+    pub fn into_fetch_payload(fetch_message: impl Into<FetchMessage>) -> FetchPayload {
+ match fetch_message.into().message.unwrap() {
fetch_message::Message::Payload(fetch_payload) => fetch_payload,
other => panic!("expected fetch payload, got `{other:?}`"),
}
}
- pub fn into_fetch_eof(fetch_message: FetchMessage) -> FetchEof {
- match fetch_message.message.unwrap() {
+    pub fn into_fetch_eof(fetch_message: impl Into<FetchMessage>) -> FetchEof {
+ match fetch_message.into().message.unwrap() {
fetch_message::Message::Eof(fetch_eof) => fetch_eof,
other => panic!("expected fetch EOF, got `{other:?}`"),
}
@@ -677,11 +750,13 @@ pub(super) mod tests {
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
+ ingester_status_rx,
);
let mut mrecordlog_guard = mrecordlog.write().await;
@@ -895,12 +970,13 @@ pub(super) mod tests {
};
let shard_status = (ShardState::Closed, Position::offset(0u64));
let (_shard_status_tx, shard_status_rx) = watch::channel(shard_status);
-
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
+ ingester_status_rx,
);
let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next())
.await
@@ -937,11 +1013,13 @@ pub(super) mod tests {
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
+ ingester_status_rx,
);
let mut mrecordlog_guard = mrecordlog.write().await;
@@ -991,11 +1069,13 @@ pub(super) mod tests {
from_position_exclusive: Some(Position::offset(0u64)),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
+ ingester_status_rx,
);
let mut mrecordlog_guard = mrecordlog.write().await;
@@ -1100,11 +1180,13 @@ pub(super) mod tests {
from_position_exclusive: Some(Position::Beginning),
};
let (_shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
+ ingester_status_rx,
);
let ingest_error = timeout(Duration::from_millis(100), fetch_stream.next())
.await
@@ -1136,11 +1218,13 @@ pub(super) mod tests {
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
30,
+ ingester_status_rx,
);
let mut mrecordlog_guard = mrecordlog.write().await;
@@ -1230,11 +1314,13 @@ pub(super) mod tests {
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
+ let (_ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::default());
let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
10, //< we request batch larger than 10 bytes.
+ ingester_status_rx,
);
let mut mrecordlog_guard = mrecordlog.write().await;
@@ -1319,7 +1405,7 @@ pub(super) mod tests {
let shard_id = ShardId::from(1);
let mut from_position_exclusive = Position::offset(0u64);
-        let ingester_ids: Vec<NodeId> = vec!["test-ingester-0".into(), "test-ingester-1".into()];
+ let ingester_ids = Arc::new(vec!["test-ingester-0".into(), "test-ingester-1".into()]);
let ingester_pool = IngesterPool::default();
let (fetch_message_tx, mut fetch_stream) = ServiceStream::new_bounded(5);
@@ -1358,6 +1444,7 @@ pub(super) mod tests {
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
eof_position: Some(Position::eof(1u64)),
+ is_decommissioning: false,
};
let fetch_message = FetchMessage::new_eof(fetch_eof);
service_stream_tx_1.send(Ok(fetch_message)).unwrap();
@@ -1378,8 +1465,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_payload = into_fetch_payload(fetch_message);
assert_eq!(
@@ -1395,8 +1481,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_eof = into_fetch_eof(fetch_message);
assert_eq!(fetch_eof.eof_position(), Position::eof(1u64));
@@ -1415,7 +1500,7 @@ pub(super) mod tests {
let shard_id = ShardId::from(1);
let mut from_position_exclusive = Position::offset(0u64);
-        let ingester_ids: Vec<NodeId> = vec!["test-ingester-0".into(), "test-ingester-1".into()];
+ let ingester_ids = Arc::new(vec!["test-ingester-0".into(), "test-ingester-1".into()]);
let ingester_pool = IngesterPool::default();
let (fetch_message_tx, mut fetch_stream) = ServiceStream::new_bounded(5);
@@ -1472,6 +1557,7 @@ pub(super) mod tests {
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
eof_position: Some(Position::eof(1u64)),
+ is_decommissioning: false,
};
let fetch_message = FetchMessage::new_eof(fetch_eof);
service_stream_tx_1.send(Ok(fetch_message)).unwrap();
@@ -1492,8 +1578,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_payload = into_fetch_payload(fetch_message);
assert_eq!(
@@ -1509,8 +1594,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_eof = into_fetch_eof(fetch_message);
assert_eq!(fetch_eof.eof_position(), Position::eof(1u64));
@@ -1529,7 +1613,7 @@ pub(super) mod tests {
let shard_id = ShardId::from(1);
let mut from_position_exclusive = Position::offset(0u64);
-        let ingester_ids: Vec<NodeId> = vec!["test-ingester-0".into(), "test-ingester-1".into()];
+ let ingester_ids = Arc::new(vec!["test-ingester-0".into(), "test-ingester-1".into()]);
let ingester_pool = IngesterPool::default();
let (fetch_message_tx, mut fetch_stream) = ServiceStream::new_bounded(5);
@@ -1588,6 +1672,7 @@ pub(super) mod tests {
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
eof_position: Some(Position::eof(1u64)),
+ is_decommissioning: false,
};
let fetch_message = FetchMessage::new_eof(fetch_eof);
service_stream_tx_1.send(Ok(fetch_message)).unwrap();
@@ -1608,8 +1693,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_payload = into_fetch_payload(fetch_message);
assert_eq!(
@@ -1625,8 +1709,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_eof = into_fetch_eof(fetch_message);
assert_eq!(fetch_eof.eof_position(), Position::eof(1u64));
@@ -1645,7 +1728,7 @@ pub(super) mod tests {
let shard_id = ShardId::from(1);
let mut from_position_exclusive = Position::offset(0u64);
-        let ingester_ids: Vec<NodeId> = vec!["test-ingester-0".into(), "test-ingester-1".into()];
+ let ingester_ids = Arc::new(vec!["test-ingester-0".into(), "test-ingester-1".into()]);
let ingester_pool = IngesterPool::default();
let (fetch_message_tx, mut fetch_stream) = ServiceStream::new_bounded(5);
@@ -1791,7 +1874,7 @@ pub(super) mod tests {
source_id,
shard_id,
from_position_exclusive,
- ingester_ids,
+ Arc::new(ingester_ids),
ingester_pool,
retry_params,
fetch_message_tx,
@@ -1812,8 +1895,7 @@ pub(super) mod tests {
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_payload = into_fetch_payload(fetch_message);
assert_eq!(
@@ -1831,15 +1913,15 @@ pub(super) mod tests {
.unwrap()
.unwrap_err();
assert!(
- matches!(fetch_stream_error.ingest_error, IngestV2Error::Internal(message) if message == "fetch stream error #1")
+ matches!(fetch_stream_error.ingest_error, IngestV2Error::Internal(message) if
+ message == "fetch stream error #1")
);
let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next())
.await
.unwrap()
.unwrap()
- .unwrap()
- .into_inner();
+ .unwrap();
let fetch_payload = into_fetch_payload(fetch_message);
assert_eq!(
@@ -1872,9 +1954,8 @@ pub(super) mod tests {
let client_id = "test-client".to_string();
let ingester_pool = IngesterPool::default();
let retry_params = RetryParams::for_test();
- let multi_fetch_stream =
+ let _multi_fetch_stream =
MultiFetchStream::new(self_node_id, client_id, ingester_pool, retry_params);
- assert!(!multi_fetch_stream.has_active_shard_subscriptions())
// TODO: Backport from original branch.
}
}
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 7412a25cf28..19085a73049 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -913,6 +913,7 @@ impl Ingester {
mrecordlog,
shard_status_rx,
get_batch_num_bytes(),
+ self.state.status_rx.clone(),
);
Ok(service_stream)
}
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index 381be88535e..e77b32d6286 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -54,7 +54,7 @@ use serde::Serialize;
use tracing::{error, info};
use workbench::pending_subrequests;
-pub use self::fetch::{FetchStreamError, MultiFetchStream};
+pub use self::fetch::{FetchStreamError, MultiFetchMessage, MultiFetchStream};
pub use self::ingester::{wait_for_ingester_decommission, wait_for_ingester_status, Ingester};
use self::mrecord::MRECORD_HEADER_LEN;
pub use self::mrecord::{decoded_mrecords, MRecord};
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
index 8874176b941..3f11b5ba1ae 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -249,6 +249,7 @@ message FetchEof {
string source_id = 2;
quickwit.ingest.ShardId shard_id = 3;
quickwit.ingest.Position eof_position = 4;
+ bool is_decommissioning = 5;
}
message InitShardsRequest {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
index ccb13a5e44d..9d2867dd35d 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
@@ -342,6 +342,8 @@ pub struct FetchEof {
     pub shard_id: ::core::option::Option<crate::types::ShardId>,
     #[prost(message, optional, tag = "4")]
     pub eof_position: ::core::option::Option<crate::types::Position>,
+ #[prost(bool, tag = "5")]
+ pub is_decommissioning: bool,
}
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]