From f96e61ee684691ca17152cac922083fb5ea21188 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 10 Jun 2024 16:42:00 +0000 Subject: [PATCH] More explicit response for read_batch --- .../src/source/doc_file_reader.rs | 36 +++++++++++++------ .../src/source/file_source.rs | 6 ++-- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index d7fbfa94f6e..80d9e662d46 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -91,6 +91,12 @@ pub struct DocFileReader { partition_id: Option, } +/// Result of a read attempt on a `DocFileReader` +pub struct ReadBatchResponse { + pub batch_opt: Option, + pub is_eof: bool, +} + impl DocFileReader { pub async fn from_path( checkpoint: &SourceCheckpoint, @@ -149,12 +155,13 @@ impl DocFileReader { } } - // Return true if the end of the file has been reached. - pub async fn read_batch( - &mut self, - ctx: &SourceContext, - ) -> anyhow::Result<(Option, bool)> { - // We collect batches of documents before sending them to the indexer. + /// Builds a new record batch from the reader. + /// + /// Sets `is_eof` to true when the underlying reader reaches EOF while + /// building the batch. This helps detect early that we are done reading + /// the file. The returned `batch_opt` is `None` if the reader is already at + /// EOF. 
+ pub async fn read_batch(&mut self, ctx: &SourceContext) -> anyhow::Result { let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT; let mut reached_eof = false; let mut batch_builder = BatchBuilder::new(SourceType::File); @@ -187,9 +194,15 @@ impl DocFileReader { .unwrap(); } self.counters.previous_offset = self.counters.current_offset; - Ok((Some(batch_builder.build()), reached_eof)) + Ok(ReadBatchResponse { + batch_opt: Some(batch_builder.build()), + is_eof: reached_eof, + }) } else { - Ok((None, reached_eof)) + Ok(ReadBatchResponse { + batch_opt: None, + is_eof: reached_eof, + }) } } @@ -268,8 +281,6 @@ pub mod file_test_helpers { } write_to_tmp(documents_bytes, gzip).await } - - // let temp_file_path = temp_file.path().canonicalize().unwrap(); } #[cfg(test)] @@ -354,7 +365,8 @@ mod tests { let mut parsed_lines = 0; let mut parsed_batches = 0; loop { - let (batch_opt, is_eof) = doc_batch_reader.read_batch(&ctx).await.unwrap(); + let ReadBatchResponse { batch_opt, is_eof } = + doc_batch_reader.read_batch(&ctx).await.unwrap(); if let Some(batch) = batch_opt { parsed_lines += batch.docs.len(); parsed_batches += 1; @@ -362,6 +374,8 @@ mod tests { assert!(parsed_lines <= expected_lines); } else { assert!(is_eof); + } + if is_eof { break; } } diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 2f427b49d04..b008b427759 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -26,7 +26,7 @@ use quickwit_config::FileSourceParams; use quickwit_proto::types::SourceId; use tracing::info; -use super::doc_file_reader::DocFileReader; +use super::doc_file_reader::{DocFileReader, ReadBatchResponse}; use crate::actors::DocProcessor; use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory}; @@ -48,11 +48,11 @@ impl Source for FileSource { doc_processor_mailbox: &Mailbox, ctx: &SourceContext, ) 
-> Result { - let (batch_opt, reached_eof) = self.reader.read_batch(ctx).await?; + let ReadBatchResponse { batch_opt, is_eof } = self.reader.read_batch(ctx).await?; if let Some(batch) = batch_opt { ctx.send_message(doc_processor_mailbox, batch).await?; } - if reached_eof { + if is_eof { info!("reached end of file"); ctx.send_exit_with_success(doc_processor_mailbox).await?; return Err(ActorExitStatus::Success);