Remove batch logic from DocFileReader
rdettai committed Jun 12, 2024
1 parent 2b0822c commit 46208bf
Showing 2 changed files with 173 additions and 185 deletions.
257 changes: 92 additions & 165 deletions quickwit/quickwit-indexing/src/source/doc_file_reader.rs
@@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

- use std::borrow::Borrow;
use std::ffi::OsStr;
use std::io;
use std::path::Path;
@@ -26,28 +25,16 @@ use anyhow::Context;
use async_compression::tokio::bufread::GzipDecoder;
use bytes::Bytes;
use quickwit_common::uri::Uri;
- use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
- use quickwit_proto::metastore::SourceType;
- use quickwit_proto::types::Position;
use quickwit_storage::StorageResolver;
- use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};

- use super::{BatchBuilder, SourceContext};
- use crate::models::RawDocBatch;

- /// Number of bytes after which a new batch is cut.
- pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64;

- #[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)]
- pub struct DocFileCounters {
- pub previous_offset: u64,
- pub current_offset: u64,
- pub num_lines_processed: u64,
+ pub struct FileRecord {
+ pub next_offset: u64,
+ pub doc: Bytes,
}

/// A helper wrapper that lets you skip bytes in compressed files where you
- /// cannot seek.
+ /// cannot seek (e.g. gzip files).
struct SkipReader {
reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
num_bytes_to_skip: usize,
@@ -61,10 +48,8 @@ impl SkipReader {
}
}

- // This function is only called for GZIP file.
- // Because they cannot be seeked into, we have to scan them to the right initial position.
async fn skip(&mut self) -> io::Result<()> {
- // Allocate once a 64kb buffer.
+ // Allocating 64KB once on the stack should be fine (<1% of the Linux stack size)
let mut buf = [0u8; 64000];
while self.num_bytes_to_skip > 0 {
let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len());
@@ -87,128 +72,62 @@

pub struct DocFileReader {
reader: SkipReader,
- counters: DocFileCounters,
- partition_id: Option<PartitionId>,
- }

- /// Result of a read attempt on a `DocFileReader`
- pub struct ReadBatchResponse {
- pub batch_opt: Option<RawDocBatch>,
- pub is_eof: bool,
+ next_offset: u64,
}

impl DocFileReader {
pub async fn from_path(
- checkpoint: &SourceCheckpoint,
- filepath: &Path,
storage_resolver: &StorageResolver,
+ filepath: &Path,
+ offset: usize,
) -> anyhow::Result<Self> {
- let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
- let offset = checkpoint
- .position_for_partition(&partition_id)
- .map(|position| {
- position
- .as_usize()
- .expect("file offset should be stored as usize")
- })
- .unwrap_or(0);
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
- // If it's a gzip file, we can't seek to a specific offset, we need to start from the
- // beginning of the file, decompress and skip the first `offset` bytes.
+ // If it's a gzip file, we can't seek to a specific offset. `SkipReader`
+ // starts from the beginning of the file, decompresses and skips the
+ // first `offset` bytes.
let reader = if filepath.extension() == Some(OsStr::new("gz")) {
let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
- DocFileReader::new(
- Some(partition_id),
- Box::new(GzipDecoder::new(BufReader::new(stream))),
- offset,
- offset,
- )
+ let decompressed_stream = Box::new(GzipDecoder::new(BufReader::new(stream)));
+ DocFileReader {
+ reader: SkipReader::new(decompressed_stream, offset),
+ next_offset: offset as u64,
+ }
} else {
let stream = storage
.get_slice_stream(file_name, offset..file_size)
.await?;
- DocFileReader::new(Some(partition_id), stream, 0, offset)
+ DocFileReader {
+ reader: SkipReader::new(stream, 0),
+ next_offset: offset as u64,
+ }
};
Ok(reader)
}

pub fn from_stdin() -> Self {
- DocFileReader::new(None, Box::new(tokio::io::stdin()), 0, 0)
- }

- fn new(
- partition_id: Option<PartitionId>,
- reader: Box<dyn AsyncRead + Send + Unpin>,
- num_bytes_to_skip: usize,
- offset: usize,
- ) -> Self {
- Self {
- reader: SkipReader::new(reader, num_bytes_to_skip),
- counters: DocFileCounters {
- num_lines_processed: 0,
- current_offset: offset as u64,
- previous_offset: offset as u64,
- },
- partition_id,
+ DocFileReader {
+ reader: SkipReader::new(Box::new(tokio::io::stdin()), 0),
+ next_offset: 0,
}
}

- /// Builds a new record batch from the reader.
- ///
- /// Sets `is_eof` to true when the underlying reader reaches EOF while
- /// building the batch. This helps detecting early that we are done reading
- /// the file. The returned `batch_opt` is `None` if the reader is already
- /// EOF.
- pub async fn read_batch(&mut self, ctx: &SourceContext) -> anyhow::Result<ReadBatchResponse> {
- let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT;
- let mut reached_eof = false;
- let mut batch_builder = BatchBuilder::new(SourceType::File);

- while self.counters.current_offset < limit_num_bytes {
- let mut doc_line = String::new();
- // guard the zone in case of slow read, such as reading from someone
- // typing to stdin
- let num_bytes = ctx
- .protect_future(self.reader.read_line(&mut doc_line))
- .await
- .map_err(anyhow::Error::from)?;
- if num_bytes == 0 {
- reached_eof = true;
- break;
- }
- batch_builder.add_doc(Bytes::from(doc_line));
- self.counters.current_offset += num_bytes as u64;
- self.counters.num_lines_processed += 1;
- }
- if !batch_builder.docs.is_empty() {
- if let Some(partition_id) = &self.partition_id {
- batch_builder
- .checkpoint_delta
- .record_partition_delta(
- partition_id.clone(),
- Position::offset(self.counters.previous_offset),
- Position::offset(self.counters.current_offset),
- )
- .unwrap();
- }
- self.counters.previous_offset = self.counters.current_offset;
- Ok(ReadBatchResponse {
- batch_opt: Some(batch_builder.build()),
- is_eof: reached_eof,
- })
+ /// Reads the next record from the underlying file. Returns `None` when EOF
+ /// is reached.
+ pub async fn next_record(&mut self) -> anyhow::Result<Option<FileRecord>> {
+ let mut buf = String::new();
+ let res = self.reader.read_line(&mut buf).await?;
+ if res == 0 {
+ Ok(None)
} else {
- Ok(ReadBatchResponse {
- batch_opt: None,
- is_eof: reached_eof,
- })
+ self.next_offset += res as u64;
+ Ok(Some(FileRecord {
+ next_offset: self.next_offset,
+ doc: Bytes::from(buf),
+ }))
}
}

- pub fn counters(&self) -> &DocFileCounters {
- &self.counters
- }
}

pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
@@ -288,10 +207,7 @@ mod tests {
use std::io::Cursor;
use std::path::PathBuf;

- use file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC};
- use quickwit_actors::{ActorContext, Universe};
- use serde_json::json;
- use tokio::sync::watch;
+ use file_test_helpers::generate_index_doc_file;

use super::*;

@@ -343,68 +259,79 @@
}
}

- fn setup_test_source_ctx() -> SourceContext {
- let universe = Universe::with_accelerated_time();
- let (source_mailbox, _source_inbox) = universe.create_test_mailbox();
- let (observable_state_tx, _observable_state_rx) = watch::channel(json!({}));
- ActorContext::for_test(&universe, source_mailbox, observable_state_tx)
- }

- async fn aux_test_batch_reader(
- file: impl Into<PathBuf>,
- expected_lines: usize,
- expected_batches: usize,
- ) {
- let checkpoint = SourceCheckpoint::default();
+ async fn aux_test_full_read(file: impl Into<PathBuf>, expected_lines: usize) {
let storage_resolver = StorageResolver::for_test();
let file_path = file.into();
- let mut doc_batch_reader =
- DocFileReader::from_path(&checkpoint, &file_path, &storage_resolver)
- .await
- .unwrap();
- let ctx = setup_test_source_ctx();
+ let mut doc_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+ .await
+ .unwrap();
let mut parsed_lines = 0;
- let mut parsed_batches = 0;
- loop {
- let ReadBatchResponse { batch_opt, is_eof } =
- doc_batch_reader.read_batch(&ctx).await.unwrap();
- if let Some(batch) = batch_opt {
- parsed_lines += batch.docs.len();
- parsed_batches += 1;
- assert!(parsed_lines > 0);
- assert!(parsed_lines <= expected_lines);
- } else {
- assert!(is_eof);
- }
- if is_eof {
- break;
- }
+ while let Some(_) = doc_reader.next_record().await.unwrap() {
+ parsed_lines += 1;
}
assert_eq!(parsed_lines, expected_lines);
- assert_eq!(parsed_batches, expected_batches);
}

#[tokio::test]
- async fn test_batch_reader_small() {
- aux_test_batch_reader("data/test_corpus.json", 4, 1).await;
+ async fn test_full_read() {
+ aux_test_full_read("data/test_corpus.json", 4).await;
}

#[tokio::test]
- async fn test_batch_reader_small_gz() {
- aux_test_batch_reader("data/test_corpus.json.gz", 4, 1).await;
+ async fn test_full_read_gz() {
+ aux_test_full_read("data/test_corpus.json.gz", 4).await;
}

+ async fn aux_test_resumed_read(
+ file: impl Into<PathBuf>,
+ expected_lines: usize,
+ stop_at_line: usize,
+ ) {
+ let storage_resolver = StorageResolver::for_test();
+ let file_path = file.into();
+ // read the first part of the file
+ let mut first_part_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+ .await
+ .unwrap();
+ let mut resume_offset = 0;
+ let mut parsed_lines = 0;
+ for _ in 0..stop_at_line {
+ let rec = first_part_reader
+ .next_record()
+ .await
+ .unwrap()
+ .expect("EOF happened before stop_at_line");
+ resume_offset = rec.next_offset as usize;
+ assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+ parsed_lines += 1;
+ }
+ // read the second part of the file
+ let mut second_part_reader =
+ DocFileReader::from_path(&storage_resolver, &file_path, resume_offset)
+ .await
+ .unwrap();
+ while let Some(rec) = second_part_reader.next_record().await.unwrap() {
+ assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+ parsed_lines += 1;
+ }
+ assert_eq!(parsed_lines, expected_lines);
+ }

#[tokio::test]
- async fn test_read_multiple_batches() {
- let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10;
- let dummy_doc_file = generate_dummy_doc_file(false, lines).await;
- aux_test_batch_reader(dummy_doc_file.path(), lines, 2).await;
+ async fn test_resume_read() {
+ let dummy_doc_file = generate_index_doc_file(false, 1000).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
}

#[tokio::test]
- async fn test_read_multiple_batches_gz() {
- let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10;
- let dummy_doc_file_gz = generate_dummy_doc_file(true, lines).await;
- aux_test_batch_reader(dummy_doc_file_gz.path(), lines, 2).await;
+ async fn test_resume_read_gz() {
+ let dummy_doc_file = generate_index_doc_file(true, 1000).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
}
}
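
For illustration only (not part of this commit): a minimal sketch of how a caller could drive the simplified reader, resuming from a previously checkpointed byte offset. It relies solely on the API shown in the diff above (`DocFileReader::from_path`, `next_record`, `FileRecord`); the `StorageResolver::for_test()` setup mirrors the unit tests, the `index_doc` helper is a hypothetical placeholder, and the function is assumed to live next to `DocFileReader`, since batching and checkpointing now belong to the caller rather than to the reader.

```rust
use std::path::Path;

use bytes::Bytes;
use quickwit_storage::StorageResolver;

// Hypothetical placeholder for whatever the caller does with a document
// (e.g. push it into a batch builder owned by the file source).
fn index_doc(doc: Bytes) {
    let _ = doc;
}

// Assumed to live in the same module as `DocFileReader`, so the type is in scope.
async fn read_from_offset(filepath: &Path, start_offset: usize) -> anyhow::Result<u64> {
    // Test resolver, as in the unit tests above; a real source would pass its own resolver.
    let storage_resolver = StorageResolver::for_test();
    let mut reader = DocFileReader::from_path(&storage_resolver, filepath, start_offset).await?;
    let mut next_checkpoint = start_offset as u64;
    while let Some(record) = reader.next_record().await? {
        // `record.doc` is a single newline-delimited document.
        index_doc(record.doc);
        // `record.next_offset` is the byte offset right after this record,
        // i.e. the position to resume from after a restart.
        next_checkpoint = record.next_offset;
    }
    Ok(next_checkpoint)
}
```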
