Implement ingester graceful shutdown #4117

Merged · 1 commit · Nov 14, 2023
23 changes: 21 additions & 2 deletions quickwit/quickwit-common/src/stream_utils.rs
@@ -22,8 +22,8 @@ use std::fmt;
use std::pin::Pin;

use futures::{stream, Stream, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream};
use tracing::warn;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;
@@ -57,6 +57,15 @@ where T: Send + 'static
}
}

impl<T> ServiceStream<T>
where T: Clone + Send + Sync + 'static
{
pub fn new_watch(init: T) -> (watch::Sender<T>, Self) {
let (sender, receiver) = watch::channel(init);
(sender, receiver.into())
}
}

impl<T, E> ServiceStream<Result<T, E>>
where
T: Send + 'static,
@@ -104,6 +113,16 @@ where T: Send + 'static
}
}

impl<T> From<watch::Receiver<T>> for ServiceStream<T>
where T: Clone + Send + Sync + 'static
{
fn from(receiver: watch::Receiver<T>) -> Self {
Self {
inner: Box::pin(WatchStream::new(receiver)),
}
}
}

/// Adapts a server-side tonic::Streaming into a ServiceStream of `Result<T, tonic::Status>`. Once
/// an error is encountered, the stream will be closed and subsequent calls to `poll_next` will
/// return `None`.
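For context, here is a minimal sketch (not part of the diff) of how the new watch-backed ServiceStream behaves, assuming `ServiceStream` implements `futures::Stream` as the `poll_next` wording above suggests: the wrapped `WatchStream` yields the current value first and then each subsequent update, so intermediate values may be skipped but observers always converge on the latest one.

use futures::StreamExt;
use quickwit_common::ServiceStream;

#[tokio::main]
async fn main() {
    // `new_watch` wraps a `tokio::sync::watch` channel behind a `ServiceStream`.
    let (status_tx, mut status_stream) = ServiceStream::new_watch("ready");
    // The initial value is observable immediately.
    assert_eq!(status_stream.next().await, Some("ready"));
    // Later updates (e.g. a decommissioning notice) are pushed to the stream.
    status_tx.send("decommissioning").unwrap();
    assert_eq!(status_stream.next().await, Some("decommissioning"));
}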
10 changes: 3 additions & 7 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -30,7 +30,7 @@ use quickwit_proto::control_plane::{
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{ClosedShards, IngestV2Error, ShardState};
use quickwit_proto::ingest::{IngestV2Error, ShardIds, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
@@ -172,11 +172,7 @@ impl IngestController {
None
}

fn handle_closed_shards(
&self,
closed_shards: Vec<ClosedShards>,
model: &mut ControlPlaneModel,
) {
fn handle_closed_shards(&self, closed_shards: Vec<ShardIds>, model: &mut ControlPlaneModel) {
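// `ShardIds` (formerly `ClosedShards`) carries the shard ids reported for a single
// (index_uid, source_id) pair; the rename presumably reflects that the same message
// is now reused beyond the "closed shards" case.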
for closed_shard in closed_shards {
let index_uid: IndexUid = closed_shard.index_uid.into();
let source_id = closed_shard.source_id;
@@ -764,7 +760,7 @@ mod tests {

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: vec![ClosedShards {
closed_shards: vec![ShardIds {
index_uid: index_uid.clone().into(),
source_id: source_id.clone(),
shard_ids: vec![1, 2],
63 changes: 43 additions & 20 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -26,12 +26,13 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::retry::RetryParams;
use quickwit_ingest::{
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::ingest::ingester::{
FetchResponseV2, IngesterService, TruncateRequest, TruncateSubrequest,
FetchResponseV2, IngesterService, TruncateShardsRequest, TruncateShardsSubrequest,
};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsSubrequest, AcquireShardsSubresponse, MetastoreService,
@@ -237,7 +238,7 @@ impl IngestSource {
}

async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
let mut per_ingester_subrequests: HashMap<&NodeId, Vec<TruncateShardsSubrequest>> =
HashMap::new();

for (shard_id, to_position_exclusive) in truncation_point {
@@ -251,38 +252,60 @@
);
continue;
};
let truncate_subrequest = TruncateSubrequest {
let truncate_shards_subrequest = TruncateShardsSubrequest {
index_uid: self.client_id.index_uid.clone().into(),
source_id: self.client_id.source_id.clone(),
shard_id: *shard_id,
to_position_inclusive: Some(to_position_exclusive.clone()),
};
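// Each shard's truncate subrequest goes to the shard's leader and, when one is
// assigned, to its follower as well.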
if let Some(follower_id) = &shard.follower_id_opt {
per_ingester_truncate_subrequests
per_ingester_subrequests
.entry(follower_id)
.or_default()
.push(truncate_subrequest.clone());
.push(truncate_shards_subrequest.clone());
}
per_ingester_truncate_subrequests
per_ingester_subrequests
.entry(&shard.leader_id)
.or_default()
.push(truncate_subrequest);
.push(truncate_shards_subrequest);
}
for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
for (ingester_id, truncate_subrequests) in per_ingester_subrequests {
let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
warn!(
"failed to truncate shard: ingester `{}` is unavailable",
ingester_id
);
continue;
};
let truncate_request = TruncateRequest {
let truncate_shards_request = TruncateShardsRequest {
ingester_id: ingester_id.clone().into(),
subrequests: truncate_subrequests,
};
let truncate_future = async move {
if let Err(error) = ingester.truncate(truncate_request).await {
warn!("failed to truncate shard(s): {error}");
let retry_params = RetryParams {
base_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(30),
max_attempts: 5,
};
let mut num_attempts = 0;

while num_attempts < retry_params.max_attempts {
let Err(error) = ingester
.truncate_shards(truncate_shards_request.clone())
.await
else {
return;
};
num_attempts += 1;
let delay = retry_params.compute_delay(num_attempts);
time::sleep(delay).await;

if num_attempts == retry_params.max_attempts {
error!(
ingester_id=%truncate_shards_request.ingester_id,
"failed to truncate shard(s): {error}"
);
}
}
};
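// Note: assuming `RetryParams::compute_delay` backs off exponentially from
// `base_delay` and caps at `max_delay`, the sleeps between the five attempts
// come out to roughly 1s, 2s, 4s, 8s, 16s; only the final failure is logged.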
// Truncation is best-effort, so fire and forget.
@@ -480,7 +503,7 @@
use quickwit_common::ServiceStream;
use quickwit_config::{SourceConfig, SourceParams};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse};
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateShardsResponse};
use quickwit_proto::ingest::{IngestV2Error, MRecordBatch, Shard, ShardState};
use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse};
use quickwit_storage::StorageResolver;
@@ -552,7 +575,7 @@
Ok(service_stream)
});
ingester_mock_0
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-0");
@@ -564,7 +587,7 @@
assert_eq!(subrequest.shard_id, 1);
assert_eq!(subrequest.to_position_inclusive, Some(11u64.into()));

let response = TruncateResponse {};
let response = TruncateShardsResponse {};
Ok(response)
});

@@ -792,7 +815,7 @@

let mut ingester_mock_0 = IngesterServiceClient::mock();
ingester_mock_0
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-0");
@@ -810,14 +833,14 @@
assert_eq!(subrequest_2.shard_id, 3);
assert_eq!(subrequest_2.to_position_inclusive, Some(33u64.into()));

Ok(TruncateResponse {})
Ok(TruncateShardsResponse {})
});
let ingester_0: IngesterServiceClient = ingester_mock_0.into();
ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());

let mut ingester_mock_1 = IngesterServiceClient::mock();
ingester_mock_1
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-1");
@@ -831,14 +854,14 @@
assert_eq!(subrequest_1.shard_id, 3);
assert_eq!(subrequest_1.to_position_inclusive, Some(33u64.into()));

Ok(TruncateResponse {})
Ok(TruncateShardsResponse {})
});
let ingester_1: IngesterServiceClient = ingester_mock_1.into();
ingester_pool.insert("test-ingester-1".into(), ingester_1.clone());

let mut ingester_mock_3 = IngesterServiceClient::mock();
ingester_mock_3
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-3");
@@ -848,7 +871,7 @@
assert_eq!(subrequest_0.shard_id, 4);
assert_eq!(subrequest_0.to_position_inclusive, Some(44u64.into()));

Ok(TruncateResponse {})
Ok(TruncateShardsResponse {})
});
let ingester_3: IngesterServiceClient = ingester_mock_3.into();
ingester_pool.insert("test-ingester-3".into(), ingester_3.clone());
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/codegen/ingest_service.rs

Some generated files are not rendered by default.

22 changes: 18 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -513,7 +513,9 @@

use bytes::Bytes;
use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::ingest::ingester::{
IngesterServiceClient, IngesterStatus, ObservationMessage,
};
use quickwit_proto::types::queue_id;
use tokio::time::timeout;

@@ -534,14 +536,17 @@
shard_id: 1,
from_position_exclusive: None,
};
let (new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
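// New in this PR: the ingester now tracks an explicit status (starting out `Ready`)
// and an `observation_tx` watch sender over which `ObservationMessage`s are
// published, presumably so observers can follow the ingester into decommissioning
// during graceful shutdown.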
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
state.clone(),
@@ -700,14 +705,17 @@
shard_id: 1,
from_position_exclusive: Some(Position::from(0u64)),
};
let (new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
state.clone(),
@@ -800,14 +808,17 @@
shard_id: 1,
from_position_exclusive: None,
};
let (_new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (_new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
state.clone(),
@@ -838,12 +849,15 @@
shard_id: 1,
from_position_exclusive: None,
};
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, _fetch_task_handle) =