Skip to content

Commit

Permalink
More explicit reponse for read_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 570ba64 commit f96e61e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
36 changes: 25 additions & 11 deletions quickwit/quickwit-indexing/src/source/doc_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ pub struct DocFileReader {
partition_id: Option<PartitionId>,
}

/// Result of a read attempt on a `DocFileReader`
pub struct ReadBatchResponse {
pub batch_opt: Option<RawDocBatch>,
pub is_eof: bool,
}

impl DocFileReader {
pub async fn from_path(
checkpoint: &SourceCheckpoint,
Expand Down Expand Up @@ -149,12 +155,13 @@ impl DocFileReader {
}
}

// Return true if the end of the file has been reached.
pub async fn read_batch(
&mut self,
ctx: &SourceContext,
) -> anyhow::Result<(Option<RawDocBatch>, bool)> {
// We collect batches of documents before sending them to the indexer.
/// Builds a new record batch from the reader.
///
/// Sets `is_eof` to true when the underlying reader reaches EOF while
/// building the batch. This helps detecting early that we are done reading
/// the file. The returned `batch_opt` is `None` if the reader is already
/// EOF.
pub async fn read_batch(&mut self, ctx: &SourceContext) -> anyhow::Result<ReadBatchResponse> {
let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT;
let mut reached_eof = false;
let mut batch_builder = BatchBuilder::new(SourceType::File);
Expand Down Expand Up @@ -187,9 +194,15 @@ impl DocFileReader {
.unwrap();
}
self.counters.previous_offset = self.counters.current_offset;
Ok((Some(batch_builder.build()), reached_eof))
Ok(ReadBatchResponse {
batch_opt: Some(batch_builder.build()),
is_eof: reached_eof,
})
} else {
Ok((None, reached_eof))
Ok(ReadBatchResponse {
batch_opt: None,
is_eof: reached_eof,
})
}
}

Expand Down Expand Up @@ -268,8 +281,6 @@ pub mod file_test_helpers {
}
write_to_tmp(documents_bytes, gzip).await
}

// let temp_file_path = temp_file.path().canonicalize().unwrap();
}

#[cfg(test)]
Expand Down Expand Up @@ -354,14 +365,17 @@ mod tests {
let mut parsed_lines = 0;
let mut parsed_batches = 0;
loop {
let (batch_opt, is_eof) = doc_batch_reader.read_batch(&ctx).await.unwrap();
let ReadBatchResponse { batch_opt, is_eof } =
doc_batch_reader.read_batch(&ctx).await.unwrap();
if let Some(batch) = batch_opt {
parsed_lines += batch.docs.len();
parsed_batches += 1;
assert!(parsed_lines > 0);
assert!(parsed_lines <= expected_lines);
} else {
assert!(is_eof);
}
if is_eof {
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_config::FileSourceParams;
use quickwit_proto::types::SourceId;
use tracing::info;

use super::doc_file_reader::DocFileReader;
use super::doc_file_reader::{DocFileReader, ReadBatchResponse};
use crate::actors::DocProcessor;
use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};

Expand All @@ -48,11 +48,11 @@ impl Source for FileSource {
doc_processor_mailbox: &Mailbox<DocProcessor>,
ctx: &SourceContext,
) -> Result<Duration, ActorExitStatus> {
let (batch_opt, reached_eof) = self.reader.read_batch(ctx).await?;
let ReadBatchResponse { batch_opt, is_eof } = self.reader.read_batch(ctx).await?;
if let Some(batch) = batch_opt {
ctx.send_message(doc_processor_mailbox, batch).await?;
}
if reached_eof {
if is_eof {
info!("reached end of file");
ctx.send_exit_with_success(doc_processor_mailbox).await?;
return Err(ActorExitStatus::Success);
Expand Down

0 comments on commit f96e61e

Please sign in to comment.