Skip to content

Commit

Permalink
Fix reader edge case
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 3, 2024
1 parent bfd360e commit 3a85586
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 56 deletions.
206 changes: 169 additions & 37 deletions quickwit/quickwit-indexing/src/source/doc_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_common::uri::Uri;
use quickwit_common::Progress;
use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::Position;
use quickwit_storage::StorageResolver;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};

use super::{BatchBuilder, SourceContext, BATCH_NUM_BYTES_LIMIT};
use super::{BatchBuilder, BATCH_NUM_BYTES_LIMIT};

/// A single line read from a document file, as produced by
/// `DocFileReader::next_record`.
pub struct FileRecord {
/// Value of the reader's running offset once this record has been read; the
/// reader advances it by the number of bytes read for the line.
pub next_offset: u64,
/// Bytes of the line exactly as read (the line terminator is not stripped).
pub doc: Bytes,
/// `true` when peeking into the reader's buffer right after this line found
/// no further bytes, i.e. this is the final record of the file.
pub is_last: bool,
}

/// A helper wrapper that lets you skip bytes in compressed files where you
Expand Down Expand Up @@ -67,11 +69,18 @@ impl SkipReader {
Ok(())
}

async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result<usize> {
/// Reads a line and peeks into the reader's buffer. Returns the number of
/// bytes read and `true` if the end of the file is reached.
async fn read_line_and_peek<'a>(&mut self, buf: &'a mut String) -> io::Result<(usize, bool)> {
if self.num_bytes_to_skip > 0 {
self.skip().await?;
}
self.reader.read_line(buf).await
let line_size = self.reader.read_line(buf).await?;
if line_size == 0 {
return Ok((0, true));
}
let next_bytes = self.reader.fill_buf().await?;
Ok((line_size, next_bytes.is_empty()))
}
}

Expand Down Expand Up @@ -122,24 +131,27 @@ impl DocFileReader {
/// is reached.
pub async fn next_record(&mut self) -> anyhow::Result<Option<FileRecord>> {
let mut buf = String::new();
let res = self.reader.read_line(&mut buf).await?;
if res == 0 {
let (bytes_read, is_last) = self.reader.read_line_and_peek(&mut buf).await?;
if bytes_read == 0 {
Ok(None)
} else {
self.next_offset += res as u64;
self.next_offset += bytes_read as u64;
Ok(Some(FileRecord {
next_offset: self.next_offset,
doc: Bytes::from(buf),
is_last,
}))
}
}
}

#[async_trait]
pub trait BatchReader: Send {
/// Read a batch from an underlying reader. Marks progress on the provided
/// handle when slow reads are possible.
async fn read_batch(
&mut self,
source_ctx: &SourceContext,
source_progress: &Progress,
source_type: SourceType,
) -> anyhow::Result<BatchBuilder>;

Expand Down Expand Up @@ -188,22 +200,29 @@ impl ObjectUriBatchReader {
impl BatchReader for ObjectUriBatchReader {
async fn read_batch(
&mut self,
source_ctx: &SourceContext,
source_progress: &Progress,
source_type: SourceType,
) -> anyhow::Result<BatchBuilder> {
let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize;
let mut new_offset = self.current_offset;
let mut batch_builder = BatchBuilder::new(source_type);
while new_offset < limit_num_bytes {
if let Some(record) = source_ctx.protect_future(self.reader.next_record()).await? {
if let Some(record) = source_progress
.protect_future(self.reader.next_record())
.await?
{
new_offset = record.next_offset as usize;
batch_builder.add_doc(record.doc);
if record.is_last {
self.is_eof = true;
break;
}
} else {
self.is_eof = true;
break;
}
}
if new_offset > self.current_offset {
if batch_builder.num_bytes > 0 {
let to_position = if self.is_eof {
Position::eof(new_offset)
} else {
Expand Down Expand Up @@ -243,13 +262,13 @@ impl StdinBatchReader {
impl BatchReader for StdinBatchReader {
async fn read_batch(
&mut self,
source_ctx: &SourceContext,
source_progress: &Progress,
source_type: SourceType,
) -> anyhow::Result<BatchBuilder> {
let mut batch_builder = BatchBuilder::new(source_type);
while batch_builder.num_bytes < BATCH_NUM_BYTES_LIMIT {
let mut buf = String::new();
let bytes_read = source_ctx
let bytes_read = source_progress
.protect_future(self.reader.read_line(&mut buf))
.await?;
if bytes_read > 0 {
Expand Down Expand Up @@ -328,11 +347,16 @@ pub mod file_test_helpers {
(file, size)
}

/// Generates a file with increasing padded numbers. Each line is 8 bytes
/// including the newline char.
///
/// 0000000\n0000001\n0000002\n...
pub async fn generate_index_doc_file(gzip: bool, lines: usize) -> NamedTempFile {
assert!(lines < 9999999, "each line is 7 digits + newline");
let mut documents_bytes = Vec::new();
for i in 0..lines {
documents_bytes
.write_all(format!("{i}\n").as_bytes())
.write_all(format!("{:0>7}\n", i).as_bytes())
.unwrap();
}
write_to_tmp(documents_bytes, gzip).await
Expand All @@ -345,6 +369,7 @@ mod tests {
use std::str::FromStr;

use file_test_helpers::generate_index_doc_file;
use quickwit_metastore::checkpoint::SourceCheckpointDelta;

use super::*;

Expand All @@ -354,49 +379,55 @@ mod tests {
// Skip 0 bytes.
let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 0);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
let (bytes_read, eof) = reader.read_line_and_peek(&mut buf).await.unwrap();
assert_eq!(buf, "hello");
assert!(eof);
assert_eq!(bytes_read, 5)
}
{
// Skip 2 bytes.
let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 2);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
let (bytes_read, eof) = reader.read_line_and_peek(&mut buf).await.unwrap();
assert_eq!(buf, "llo");
assert!(eof);
assert_eq!(bytes_read, 3)
}
{
let input = "hello";
let cursor = Cursor::new(input);
let mut reader = SkipReader::new(Box::new(cursor), 5);
let mut buf = String::new();
assert!(reader.read_line(&mut buf).await.is_ok());
let (bytes_read, eof) = reader.read_line_and_peek(&mut buf).await.unwrap();
assert!(eof);
assert_eq!(bytes_read, 0)
}
{
let input = "hello";
let cursor = Cursor::new(input);
let mut reader = SkipReader::new(Box::new(cursor), 10);
let mut buf = String::new();
assert!(reader.read_line(&mut buf).await.is_err());
assert!(reader.read_line_and_peek(&mut buf).await.is_err());
}
{
let input = "hello world".repeat(10000);
let cursor = Cursor::new(input.clone());
let mut reader = SkipReader::new(Box::new(cursor), 64000);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
reader.read_line_and_peek(&mut buf).await.unwrap();
assert_eq!(buf, input[64000..]);
}
{
let input = "hello world".repeat(10000);
let cursor = Cursor::new(input.clone());
let mut reader = SkipReader::new(Box::new(cursor), 64001);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
reader.read_line_and_peek(&mut buf).await.unwrap();
assert_eq!(buf, input[64001..]);
}
}

async fn aux_test_full_read(file: impl AsRef<str>, expected_lines: usize) {
async fn aux_test_full_read_record(file: impl AsRef<str>, expected_lines: usize) {
let storage_resolver = StorageResolver::for_test();
let uri = Uri::from_str(file.as_ref()).unwrap();
let mut doc_reader = DocFileReader::from_uri(&storage_resolver, &uri, 0)
Expand All @@ -410,16 +441,16 @@ mod tests {
}

#[tokio::test]
async fn test_full_read() {
aux_test_full_read("data/test_corpus.json", 4).await;
async fn test_full_read_record() {
aux_test_full_read_record("data/test_corpus.json", 4).await;
}

#[tokio::test]
async fn test_full_read_gz() {
aux_test_full_read("data/test_corpus.json.gz", 4).await;
async fn test_full_read_record_gz() {
aux_test_full_read_record("data/test_corpus.json.gz", 4).await;
}

async fn aux_test_resumed_read(
async fn aux_test_resumed_read_record(
file: impl AsRef<str>,
expected_lines: usize,
stop_at_line: usize,
Expand All @@ -439,7 +470,7 @@ mod tests {
.unwrap()
.expect("EOF happened before stop_at_line");
resume_offset = rec.next_offset as usize;
assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
assert_eq!(Bytes::from(format!("{:0>7}\n", parsed_lines)), rec.doc);
parsed_lines += 1;
}
// read the second part of the file
Expand All @@ -448,29 +479,130 @@ mod tests {
.await
.unwrap();
while let Some(rec) = second_part_reader.next_record().await.unwrap() {
assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
assert_eq!(Bytes::from(format!("{:0>7}\n", parsed_lines)), rec.doc);
parsed_lines += 1;
}
assert_eq!(parsed_lines, expected_lines);
}

#[tokio::test]
async fn test_resume_read() {
async fn test_resumed_read_record() {
let dummy_doc_file = generate_index_doc_file(false, 1000).await;
let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
aux_test_resumed_read(dummy_doc_file_uri, 1000, 1).await;
aux_test_resumed_read(dummy_doc_file_uri, 1000, 40).await;
aux_test_resumed_read(dummy_doc_file_uri, 1000, 999).await;
aux_test_resumed_read(dummy_doc_file_uri, 1000, 1000).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 40).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 999).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1000).await;
}

#[tokio::test]
async fn test_resume_read_gz() {
async fn test_resumed_read_record_gz() {
let dummy_doc_file = generate_index_doc_file(true, 1000).await;
let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
aux_test_resumed_read(dummy_doc_file_uri, 1000, 1).await;
aux_test_resumed_read(dummy_doc_file_uri, 1000, 40).await;
aux_test_resumed_read(dummy_doc_file_uri, 1000, 999).await;
aux_test_resumed_read(dummy_doc_file_uri, 1000, 1000).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 40).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 999).await;
aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1000).await;
}

/// Drains an `ObjectUriBatchReader` opened on `file` at position `from` and
/// checks the totals.
///
/// Asserts that the number of docs read equals `expected_lines`, that the
/// number of non-empty batches equals `expected_batches`, and that the
/// accumulated checkpoint delta leaves the partition at
/// `Position::eof(file_size)`.
async fn aux_test_full_read_batch(
    file: impl AsRef<str>,
    expected_lines: usize,
    expected_batches: usize,
    file_size: usize,
    from: Position,
) {
    let progress = Progress::default();
    let storage_resolver = StorageResolver::for_test();
    let uri = Uri::from_str(file.as_ref()).unwrap();
    let partition = PartitionId::from("test");
    let mut batch_reader =
        ObjectUriBatchReader::try_new(&storage_resolver, partition.clone(), &uri, from)
            .await
            .unwrap();

    let mut line_count = 0;
    let mut non_empty_batch_count = 0;
    let mut accumulated_delta = SourceCheckpointDelta::default();
    loop {
        if batch_reader.is_eof() {
            break;
        }
        let batch = batch_reader
            .read_batch(&progress, SourceType::Unspecified)
            .await
            .unwrap();
        line_count += batch.docs.len();
        // A batch only counts when it actually carries bytes.
        non_empty_batch_count += usize::from(batch.num_bytes > 0);
        accumulated_delta.extend(batch.checkpoint_delta).unwrap();
    }

    assert_eq!(line_count, expected_lines);
    assert_eq!(non_empty_batch_count, expected_batches);

    // The merged delta must leave the partition at EOF for the whole file.
    let final_position = accumulated_delta
        .get_source_checkpoint()
        .position_for_partition(&partition)
        .unwrap()
        .clone();
    assert_eq!(final_position, Position::eof(file_size));
}

/// A small file (ten 8-byte lines) is expected to be read as one batch.
#[tokio::test]
async fn test_full_read_single_batch() {
    let line_count = 10;
    let doc_file = generate_index_doc_file(false, line_count).await;
    let doc_file_uri = doc_file.path().to_str().unwrap();
    // Every generated line is 8 bytes long, newline included.
    let expected_file_size = line_count * 8;
    let expected_batches = 1;
    aux_test_full_read_batch(
        doc_file_uri,
        line_count,
        expected_batches,
        expected_file_size,
        Position::Beginning,
    )
    .await;
}

/// A file holding as many 8-byte lines as fit within `BATCH_NUM_BYTES_LIMIT`
/// is still expected to be read as one batch.
#[tokio::test]
async fn test_full_read_single_batch_max_size() {
    let line_count = BATCH_NUM_BYTES_LIMIT as usize / 8;
    let doc_file = generate_index_doc_file(false, line_count).await;
    let doc_file_uri = doc_file.path().to_str().unwrap();
    // Every generated line is 8 bytes long, newline included.
    let expected_file_size = line_count * 8;
    let expected_batches = 1;
    aux_test_full_read_batch(
        doc_file_uri,
        line_count,
        expected_batches,
        expected_file_size,
        Position::Beginning,
    )
    .await;
}

/// Ten lines more than fit within `BATCH_NUM_BYTES_LIMIT`: two batches are
/// expected.
#[tokio::test]
async fn test_full_read_two_batches() {
    let lines_within_limit = BATCH_NUM_BYTES_LIMIT as usize / 8;
    let line_count = lines_within_limit + 10;
    let doc_file = generate_index_doc_file(false, line_count).await;
    let doc_file_uri = doc_file.path().to_str().unwrap();
    // Every generated line is 8 bytes long, newline included.
    let expected_file_size = line_count * 8;
    let expected_batches = 2;
    aux_test_full_read_batch(
        doc_file_uri,
        line_count,
        expected_batches,
        expected_file_size,
        Position::Beginning,
    )
    .await;
}

/// Starts reading halfway through a file three batch-limits long and expects
/// the remaining lines to arrive in two batches, ending at EOF of the file.
#[tokio::test]
async fn test_resume_read_batches() {
    let lines_within_limit = BATCH_NUM_BYTES_LIMIT as usize / 8;
    let total_line_count = lines_within_limit * 3;
    let skipped_line_count = total_line_count / 2;
    let doc_file = generate_index_doc_file(false, total_line_count).await;
    let doc_file_uri = doc_file.path().to_str().unwrap();
    // Every generated line is 8 bytes long, newline included.
    let resume_position = Position::offset(skipped_line_count * 8);
    let remaining_line_count = total_line_count - skipped_line_count;
    let expected_batches = 2;
    aux_test_full_read_batch(
        doc_file_uri,
        remaining_line_count,
        expected_batches,
        total_line_count * 8,
        resume_position,
    )
    .await;
}
}
Loading

0 comments on commit 3a85586

Please sign in to comment.