Skip to content

Commit

Permalink
Implement fetch stream retry
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Nov 9, 2023
1 parent 8baaa23 commit 17f4dde
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 52 deletions.
91 changes: 59 additions & 32 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::retry::RetryParams;
use quickwit_ingest::{
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
};
Expand Down Expand Up @@ -60,10 +61,15 @@ impl TypedSourceFactory for IngestSourceFactory {

async fn typed_create_source(
runtime_args: Arc<SourceRuntimeArgs>,
_: Self::Params,
checkpoint: SourceCheckpoint,
_params: Self::Params,
_checkpoint: SourceCheckpoint,
) -> anyhow::Result<Self::Source> {
IngestSource::try_new(runtime_args, checkpoint).await
let retry_params = RetryParams {
max_attempts: usize::MAX,
base_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(10 * 60), // 10 minutes
};
IngestSource::try_new(runtime_args, retry_params).await
}
}

Expand Down Expand Up @@ -98,14 +104,9 @@ impl ClientId {
}
}

#[cfg(not(test))]
fn new_publish_token(&self) -> String {
format!("{}/{}", self, Ulid::new())
}

#[cfg(test)]
fn new_publish_token(&self) -> String {
format!("{}/{}", self, Ulid::nil())
let ulid = if cfg!(test) { Ulid::nil() } else { Ulid::new() };
format!("{}/{}", self, ulid)
}
}

Expand Down Expand Up @@ -146,7 +147,7 @@ impl fmt::Debug for IngestSource {
impl IngestSource {
pub async fn try_new(
runtime_args: Arc<SourceRuntimeArgs>,
_checkpoint: SourceCheckpoint,
retry_params: RetryParams,
) -> anyhow::Result<Self> {
let self_node_id: NodeId = runtime_args.node_id().into();
let client_id = ClientId::new(
Expand All @@ -158,8 +159,12 @@ impl IngestSource {
let metastore = runtime_args.metastore.clone();
let ingester_pool = runtime_args.ingester_pool.clone();
let assigned_shards = HashMap::new();
let fetch_stream =
MultiFetchStream::new(self_node_id, client_id.to_string(), ingester_pool.clone());
let fetch_stream = MultiFetchStream::new(
self_node_id,
client_id.to_string(),
ingester_pool.clone(),
retry_params,
);
let publish_lock = PublishLock::default();
let publish_token = client_id.new_publish_token();

Expand Down Expand Up @@ -187,6 +192,8 @@ impl IngestSource {
.get_mut(&fetch_response.shard_id)
.expect("shard should be assigned");

assigned_shard.status = IndexingStatus::Active;

let partition_id = assigned_shard.partition_id.clone();
let from_position_exclusive = fetch_response.from_position_exclusive();
let to_position_inclusive = fetch_response.to_position_inclusive();
Expand Down Expand Up @@ -228,15 +235,6 @@ impl IngestSource {
}
}

/// Returns `true` when the first acquired shard of `subresponse` carries a
/// publish token equal to this source's own publish token.
///
/// Only the first acquired shard is inspected. An empty `acquired_shards`
/// list, or a first shard without a publish token, yields `false`.
fn contains_publish_token(&self, subresponse: &AcquireShardsSubresponse) -> bool {
    subresponse
        .acquired_shards
        .first()
        .and_then(|first_shard| first_shard.publish_token.as_ref())
        .map_or(false, |token| *token == self.publish_token)
}

async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
HashMap::new();
Expand All @@ -247,8 +245,7 @@ impl IngestSource {
}
let Some(shard) = self.assigned_shards.get(shard_id) else {
warn!(
"failed to truncate shard: shard `{}` is no longer assigned",
shard_id
"failed to truncate shard: shard `{shard_id}` is no longer assigned",
);
continue;
};
Expand All @@ -272,8 +269,7 @@ impl IngestSource {
for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
warn!(
"failed to truncate shard: ingester `{}` is unavailable",
ingester_id
"failed to truncate shard: ingester `{ingester_id}` is unavailable",
);
continue;
};
Expand All @@ -290,6 +286,15 @@ impl IngestSource {
tokio::spawn(truncate_future);
}
}

/// Returns `true` when the first acquired shard of `subresponse` carries a
/// publish token equal to this source's own publish token.
///
/// Only the first acquired shard is inspected. An empty `acquired_shards`
/// list, or a first shard without a publish token, yields `false`.
fn contains_publish_token(&self, subresponse: &AcquireShardsSubresponse) -> bool {
    subresponse
        .acquired_shards
        .first()
        .and_then(|first_shard| first_shard.publish_token.as_ref())
        .map_or(false, |token| *token == self.publish_token)
}
}

#[async_trait]
Expand Down Expand Up @@ -582,8 +587,8 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
let retry_params = RetryParams::for_test();
let mut source = IngestSource::try_new(runtime_args, retry_params)
.await
.unwrap();

Expand Down Expand Up @@ -660,8 +665,8 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
let retry_params = RetryParams::for_test();
let mut source = IngestSource::try_new(runtime_args, retry_params)
.await
.unwrap();

Expand Down Expand Up @@ -778,6 +783,28 @@ mod tests {
.unwrap();
let shard = source.assigned_shards.get(&1).unwrap();
assert_eq!(shard.status, IndexingStatus::Error);

fetch_response_tx
.send(Ok(FetchResponseV2 {
index_uid: "test-index:0".into(),
source_id: "test-source".into(),
shard_id: 1,
mrecord_batch: Some(MRecordBatch {
mrecord_buffer: Bytes::from_static(b"\0\0test-doc-114"),
mrecord_lengths: vec![14],
}),
from_position_exclusive: Some(14u64.into()),
to_position_inclusive: Some(15u64.into()),
}))
.await
.unwrap();

source
.emit_batches(&doc_processor_mailbox, &ctx)
.await
.unwrap();
let shard = source.assigned_shards.get(&1).unwrap();
assert_eq!(shard.status, IndexingStatus::Active);
}

#[tokio::test]
Expand Down Expand Up @@ -864,8 +891,8 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
let retry_params = RetryParams::for_test();
let mut source = IngestSource::try_new(runtime_args, retry_params)
.await
.unwrap();

Expand Down
Loading

0 comments on commit 17f4dde

Please sign in to comment.