Close fetch stream with an error if it does not reach EOF (#4092)
guilload authored Nov 9, 2023
1 parent 8fa8f56 commit 8baaa23
Showing 1 changed file with 56 additions and 14 deletions.
quickwit/quickwit-ingest/src/ingest_v2/fetch.rs (70 changes: 56 additions & 14 deletions)
@@ -136,13 +136,7 @@ impl FetchTask
             .mrecordlog
             .range(&self.queue_id, self.fetch_range)
         else {
-            warn!(
-                client_id=%self.client_id,
-                index_uid=%self.index_uid,
-                source_id=%self.source_id,
-                shard_id=%self.shard_id,
-                "failed to read from record log because it was dropped."
-            );
+            // The queue was dropped.
             break;
         };
         for (_position, mrecord) in mrecords {
@@ -200,13 +194,21 @@ impl FetchTask
                 break;
             }
         }
-        debug!(
-            client_id=%self.client_id,
-            index_uid=%self.index_uid,
-            source_id=%self.source_id,
-            shard_id=%self.shard_id,
-            "fetch task completed"
-        );
+        if !has_reached_eof || !self.fetch_range.is_empty() {
+            error!(
+                client_id=%self.client_id,
+                index_uid=%self.index_uid,
+                source_id=%self.source_id,
+                shard_id=%self.shard_id,
+                "fetch stream ended unexpectedly"
+            );
+            let _ = self
+                .fetch_response_tx
+                .send(Err(IngestV2Error::Internal(
+                    "fetch stream ended unexpectedly".to_string(),
+                )))
+                .await;
+        }
         (
             num_records_total,
             self.fetch_range.from_position_exclusive(),
@@ -776,6 +778,46 @@ mod tests {
         assert_eq!(last_position, Position::from(4u64));
     }

+    #[tokio::test]
+    async fn test_fetch_task_error() {
+        let tempdir = tempfile::tempdir().unwrap();
+        let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap();
+        let client_id = "test-client".to_string();
+        let index_uid = "test-index:0".to_string();
+        let source_id = "test-source".to_string();
+        let open_fetch_stream_request = OpenFetchStreamRequest {
+            client_id: client_id.clone(),
+            index_uid: index_uid.clone(),
+            source_id: source_id.clone(),
+            shard_id: 1,
+            from_position_exclusive: None,
+            to_position_inclusive: None,
+        };
+        let (_new_records_tx, new_records_rx) = watch::channel(());
+        let state = Arc::new(RwLock::new(IngesterState {
+            mrecordlog,
+            shards: HashMap::new(),
+            replication_streams: HashMap::new(),
+            replication_tasks: HashMap::new(),
+        }));
+        let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
+            open_fetch_stream_request,
+            state.clone(),
+            new_records_rx,
+            1024,
+        );
+        let ingest_error = timeout(Duration::from_millis(50), fetch_stream.next())
+            .await
+            .unwrap()
+            .unwrap()
+            .unwrap_err();
+        assert!(matches!(ingest_error, IngestV2Error::Internal(_)));
+
+        let (num_records, last_position) = fetch_task_handle.await.unwrap();
+        assert_eq!(num_records, 0);
+        assert_eq!(last_position, Position::Beginning);
+    }
+
     #[tokio::test]
     async fn test_fetch_task_up_to_position() {
         let tempdir = tempfile::tempdir().unwrap();

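The hunks above boil down to one pattern: when the fetch loop exits before reaching EOF, push an explicit error down the response channel instead of silently dropping the sender. The sketch below illustrates that pattern outside the Quickwit codebase. It is a minimal, hypothetical example: the fetch_task function, the FetchError enum, and the Option<u64> "queue" stand in for Quickwit's FetchTask, IngestV2Error, and mrecordlog and are not its real API; the only dependency assumed is tokio's mpsc channel.

// Standalone sketch (not Quickwit code) of "close the stream with an error
// if it does not reach EOF". A producer that stops early sends an explicit
// Err before its sender is dropped, so the consumer can tell a clean EOF
// apart from an unexpectedly truncated stream.
use tokio::sync::mpsc;

#[derive(Debug)]
enum FetchError {
    Internal(String),
}

async fn fetch_task(
    records: Vec<Option<u64>>, // `None` simulates "the queue was dropped"
    to_position_inclusive: u64,
    tx: mpsc::Sender<Result<u64, FetchError>>,
) {
    let mut has_reached_eof = false;
    for record in records {
        let Some(position) = record else {
            // The queue was dropped.
            break;
        };
        if tx.send(Ok(position)).await.is_err() {
            // The consumer hung up; nothing more to report.
            return;
        }
        if position >= to_position_inclusive {
            has_reached_eof = true;
            break;
        }
    }
    if !has_reached_eof {
        // Mirror of the commit: report the abnormal termination explicitly
        // instead of just letting the sender drop.
        let _ = tx
            .send(Err(FetchError::Internal(
                "fetch stream ended unexpectedly".to_string(),
            )))
            .await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    // The queue "disappears" after position 1, well before EOF at position 4.
    tokio::spawn(fetch_task(vec![Some(0), Some(1), None], 4, tx));

    while let Some(message) = rx.recv().await {
        match message {
            Ok(position) => println!("got record at position {position}"),
            Err(error) => {
                println!("fetch stream failed: {error:?}");
                break;
            }
        }
    }
    // Without the explicit final send, this loop would end exactly as it does
    // on a clean EOF, which is the silent failure mode the commit removes.
}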