From 1e4ae6e7deb7ea200c5d274a44dff984646abc63 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Wed, 12 Jun 2024 21:05:58 +0000
Subject: [PATCH] Remove batch logic from DocFileReader

`DocFileReader` now exposes a `next_record()` method that returns one
document at a time together with the offset of the end of that record in
the file. Batching, checkpointing, and the progress counters move to
`FileSource`.

---
 .../src/source/doc_file_reader.rs             | 259 +++++++-----------
 .../src/source/file_source.rs                 | 101 +++++--
 2 files changed, 174 insertions(+), 186 deletions(-)

diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
index 491f940bc8b..a6eb686fc94 100644
--- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
+++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
@@ -17,7 +17,6 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-use std::borrow::Borrow;
 use std::ffi::OsStr;
 use std::io;
 use std::path::Path;
@@ -26,28 +25,16 @@ use anyhow::Context;
 use async_compression::tokio::bufread::GzipDecoder;
 use bytes::Bytes;
 use quickwit_common::uri::Uri;
-use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
-use quickwit_proto::metastore::SourceType;
-use quickwit_proto::types::Position;
 use quickwit_storage::StorageResolver;
-use serde::Serialize;
 use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
 
-use super::{BatchBuilder, SourceContext};
-use crate::models::RawDocBatch;
-
-/// Number of bytes after which a new batch is cut.
-pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64;
-
-#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)]
-pub struct DocFileCounters {
-    pub previous_offset: u64,
-    pub current_offset: u64,
-    pub num_lines_processed: u64,
+pub struct FileRecord {
+    pub next_offset: u64,
+    pub doc: Bytes,
 }
 
 /// A helper wrapper that lets you skip bytes in compressed files where you
-/// cannot seek.
+/// cannot seek (e.g. gzip files).
 struct SkipReader {
     reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
     num_bytes_to_skip: usize,
@@ -61,10 +48,8 @@ impl SkipReader {
         }
     }
 
-    // This function is only called for GZIP file.
-    // Because they cannot be seeked into, we have to scan them to the right initial position.
     async fn skip(&mut self) -> io::Result<()> {
-        // Allocate once a 64kb buffer.
+        // Allocating 64KB once on the stack should be fine (<1% of the Linux stack size).
         let mut buf = [0u8; 64000];
         while self.num_bytes_to_skip > 0 {
             let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len());
@@ -87,128 +72,62 @@ impl SkipReader {
 
 pub struct DocFileReader {
     reader: SkipReader,
-    counters: DocFileCounters,
-    partition_id: Option<PartitionId>,
-}
-
-/// Result of a read attempt on a `DocFileReader`
-pub struct ReadBatchResponse {
-    pub batch_opt: Option<RawDocBatch>,
-    pub is_eof: bool,
+    next_offset: u64,
 }
 
 impl DocFileReader {
     pub async fn from_path(
-        checkpoint: &SourceCheckpoint,
-        filepath: &Path,
         storage_resolver: &StorageResolver,
+        filepath: &Path,
+        offset: usize,
     ) -> anyhow::Result<Self> {
-        let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
-        let offset = checkpoint
-            .position_for_partition(&partition_id)
-            .map(|position| {
-                position
-                    .as_usize()
-                    .expect("file offset should be stored as usize")
-            })
-            .unwrap_or(0);
        let (dir_uri, file_name) = dir_and_filename(filepath)?;
         let storage = storage_resolver.resolve(&dir_uri).await?;
         let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
-        // If it's a gzip file, we can't seek to a specific offset, we need to start from the
-        // beginning of the file, decompress and skip the first `offset` bytes.
+        // If it's a gzip file, we can't seek to a specific offset. `SkipReader`
+        // starts from the beginning of the file, decompresses and skips the
+        // first `offset` bytes.
         let reader = if filepath.extension() == Some(OsStr::new("gz")) {
             let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
-            DocFileReader::new(
-                Some(partition_id),
-                Box::new(GzipDecoder::new(BufReader::new(stream))),
-                offset,
-                offset,
-            )
+            let decompressed_stream = Box::new(GzipDecoder::new(BufReader::new(stream)));
+            DocFileReader {
+                reader: SkipReader::new(decompressed_stream, offset),
+                next_offset: offset as u64,
+            }
         } else {
             let stream = storage
                 .get_slice_stream(file_name, offset..file_size)
                 .await?;
-            DocFileReader::new(Some(partition_id), stream, 0, offset)
+            DocFileReader {
+                reader: SkipReader::new(stream, 0),
+                next_offset: offset as u64,
+            }
         };
         Ok(reader)
     }
 
     pub fn from_stdin() -> Self {
-        DocFileReader::new(None, Box::new(tokio::io::stdin()), 0, 0)
-    }
-
-    fn new(
-        partition_id: Option<PartitionId>,
-        reader: Box<dyn AsyncRead + Send + Unpin>,
-        num_bytes_to_skip: usize,
-        offset: usize,
-    ) -> Self {
-        Self {
-            reader: SkipReader::new(reader, num_bytes_to_skip),
-            counters: DocFileCounters {
-                num_lines_processed: 0,
-                current_offset: offset as u64,
-                previous_offset: offset as u64,
-            },
-            partition_id,
+        DocFileReader {
+            reader: SkipReader::new(Box::new(tokio::io::stdin()), 0),
+            next_offset: 0,
         }
     }
 
-    /// Builds a new record batch from the reader.
-    ///
-    /// Sets `is_eof` to true when the underlying reader reaches EOF while
-    /// building the batch. This helps detecting early that we are done reading
-    /// the file. The returned `batch_opt` is `None` if the reader is already
-    /// EOF.
-    pub async fn read_batch(&mut self, ctx: &SourceContext) -> anyhow::Result<ReadBatchResponse> {
-        let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT;
-        let mut reached_eof = false;
-        let mut batch_builder = BatchBuilder::new(SourceType::File);
-
-        while self.counters.current_offset < limit_num_bytes {
-            let mut doc_line = String::new();
-            // guard the zone in case of slow read, such as reading from someone
-            // typing to stdin
-            let num_bytes = ctx
-                .protect_future(self.reader.read_line(&mut doc_line))
-                .await
-                .map_err(anyhow::Error::from)?;
-            if num_bytes == 0 {
-                reached_eof = true;
-                break;
-            }
-            batch_builder.add_doc(Bytes::from(doc_line));
-            self.counters.current_offset += num_bytes as u64;
-            self.counters.num_lines_processed += 1;
-        }
-        if !batch_builder.docs.is_empty() {
-            if let Some(partition_id) = &self.partition_id {
-                batch_builder
-                    .checkpoint_delta
-                    .record_partition_delta(
-                        partition_id.clone(),
-                        Position::offset(self.counters.previous_offset),
-                        Position::offset(self.counters.current_offset),
-                    )
-                    .unwrap();
-            }
-            self.counters.previous_offset = self.counters.current_offset;
-            Ok(ReadBatchResponse {
-                batch_opt: Some(batch_builder.build()),
-                is_eof: reached_eof,
-            })
+    /// Reads the next record from the underlying file. Returns `None` when EOF
+    /// is reached.
+    pub async fn next_record(&mut self) -> anyhow::Result<Option<FileRecord>> {
+        let mut buf = String::new();
+        let res = self.reader.read_line(&mut buf).await?;
+        if res == 0 {
+            Ok(None)
         } else {
-            Ok(ReadBatchResponse {
-                batch_opt: None,
-                is_eof: reached_eof,
-            })
+            self.next_offset += res as u64;
+            Ok(Some(FileRecord {
+                next_offset: self.next_offset,
+                doc: Bytes::from(buf),
+            }))
         }
     }
-
-    pub fn counters(&self) -> &DocFileCounters {
-        &self.counters
-    }
 }
 
 pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
@@ -288,10 +207,7 @@ mod tests {
     use std::io::Cursor;
     use std::path::PathBuf;
 
-    use file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC};
-    use quickwit_actors::{ActorContext, Universe};
-    use serde_json::json;
-    use tokio::sync::watch;
+    use file_test_helpers::generate_index_doc_file;
 
     use super::*;
 
@@ -343,68 +259,79 @@ mod tests {
         }
     }
 
-    fn setup_test_source_ctx() -> SourceContext {
-        let universe = Universe::with_accelerated_time();
-        let (source_mailbox, _source_inbox) = universe.create_test_mailbox();
-        let (observable_state_tx, _observable_state_rx) = watch::channel(json!({}));
-        ActorContext::for_test(&universe, source_mailbox, observable_state_tx)
-    }
-
-    async fn aux_test_batch_reader(
-        file: impl Into<PathBuf>,
-        expected_lines: usize,
-        expected_batches: usize,
-    ) {
-        let checkpoint = SourceCheckpoint::default();
+    async fn aux_test_full_read(file: impl Into<PathBuf>, expected_lines: usize) {
         let storage_resolver = StorageResolver::for_test();
         let file_path = file.into();
-        let mut doc_batch_reader =
-            DocFileReader::from_path(&checkpoint, &file_path, &storage_resolver)
-                .await
-                .unwrap();
-        let ctx = setup_test_source_ctx();
+        let mut doc_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+            .await
+            .unwrap();
         let mut parsed_lines = 0;
-        let mut parsed_batches = 0;
-        loop {
-            let ReadBatchResponse { batch_opt, is_eof } =
-                doc_batch_reader.read_batch(&ctx).await.unwrap();
-            if let Some(batch) = batch_opt {
-                parsed_lines += batch.docs.len();
-                parsed_batches += 1;
-                assert!(parsed_lines > 0);
-                assert!(parsed_lines <= expected_lines);
-            } else {
-                assert!(is_eof);
-            }
-            if is_eof {
-                break;
-            }
+        while doc_reader.next_record().await.unwrap().is_some() {
+            parsed_lines += 1;
         }
         assert_eq!(parsed_lines, expected_lines);
-        assert_eq!(parsed_batches, expected_batches);
     }
 
     #[tokio::test]
-    async fn test_batch_reader_small() {
-        aux_test_batch_reader("data/test_corpus.json", 4, 1).await;
+    async fn test_full_read() {
+        aux_test_full_read("data/test_corpus.json", 4).await;
     }
 
     #[tokio::test]
-    async fn test_batch_reader_small_gz() {
-        aux_test_batch_reader("data/test_corpus.json.gz", 4, 1).await;
+    async fn test_full_read_gz() {
+        aux_test_full_read("data/test_corpus.json.gz", 4).await;
+    }
+
+    async fn aux_test_resumed_read(
+        file: impl Into<PathBuf>,
+        expected_lines: usize,
+        stop_at_line: usize,
+    ) {
+        let storage_resolver = StorageResolver::for_test();
+        let file_path = file.into();
+        // read the first part of the file
+        let mut first_part_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+            .await
+            .unwrap();
+        let mut resume_offset = 0;
+        let mut parsed_lines = 0;
+        for _ in 0..stop_at_line {
+            let rec = first_part_reader
+                .next_record()
+                .await
+                .unwrap()
+                .expect("EOF happened before stop_at_line");
+            resume_offset = rec.next_offset as usize;
+            assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+            parsed_lines += 1;
+        }
+        // read the second part of the file
+        let mut second_part_reader =
+            DocFileReader::from_path(&storage_resolver, &file_path, resume_offset)
+                .await
+                .unwrap();
+        while let Some(rec) = second_part_reader.next_record().await.unwrap() {
+            assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+            parsed_lines += 1;
+        }
+        assert_eq!(parsed_lines, expected_lines);
     }
 
     #[tokio::test]
-    async fn test_read_multiple_batches() {
-        let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10;
-        let dummy_doc_file = generate_dummy_doc_file(false, lines).await;
-        aux_test_batch_reader(dummy_doc_file.path(), lines, 2).await;
+    async fn test_resume_read() {
+        let dummy_doc_file = generate_index_doc_file(false, 1000).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
     }
 
     #[tokio::test]
-    async fn test_read_multiple_batches_gz() {
-        let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10;
-        let dummy_doc_file_gz = generate_dummy_doc_file(true, lines).await;
-        aux_test_batch_reader(dummy_doc_file_gz.path(), lines, 2).await;
+    async fn test_resume_read_gz() {
+        let dummy_doc_file = generate_index_doc_file(true, 1000).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
     }
 }
diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs
index b008b427759..b739317c1af 100644
--- a/quickwit/quickwit-indexing/src/source/file_source.rs
+++ b/quickwit/quickwit-indexing/src/source/file_source.rs
@@ -17,22 +17,38 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use std::borrow::Borrow;
 use std::fmt;
 use std::time::Duration;
 
 use async_trait::async_trait;
 use quickwit_actors::{ActorExitStatus, Mailbox};
 use quickwit_config::FileSourceParams;
-use quickwit_proto::types::SourceId;
+use quickwit_metastore::checkpoint::PartitionId;
+use quickwit_proto::metastore::SourceType;
+use quickwit_proto::types::{Position, SourceId};
+use serde::Serialize;
 use tracing::info;
 
-use super::doc_file_reader::{DocFileReader, ReadBatchResponse};
+use super::doc_file_reader::DocFileReader;
+use super::BatchBuilder;
 use crate::actors::DocProcessor;
 use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};
 
+/// Number of bytes after which a new batch is cut.
+pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000;
+
+#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)]
+pub struct FileSourceCounters {
+    pub offset: u64,
+    pub num_lines_processed: usize,
+}
+
 pub struct FileSource {
     source_id: SourceId,
     reader: DocFileReader,
+    partition_id: Option<PartitionId>,
+    counters: FileSourceCounters,
 }
 
 impl fmt::Debug for FileSource {
@@ -48,9 +64,35 @@ impl Source for FileSource {
         doc_processor_mailbox: &Mailbox<DocProcessor>,
         ctx: &SourceContext,
     ) -> Result<Duration, ActorExitStatus> {
-        let ReadBatchResponse { batch_opt, is_eof } = self.reader.read_batch(ctx).await?;
-        if let Some(batch) = batch_opt {
-            ctx.send_message(doc_processor_mailbox, batch).await?;
+        let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT;
+        let mut new_offset = self.counters.offset;
+        let mut batch_builder = BatchBuilder::new(SourceType::File);
+        let is_eof = loop {
+            if new_offset >= limit_num_bytes {
+                break false;
+            }
+            if let Some(record) = ctx.protect_future(self.reader.next_record()).await? {
+                new_offset = record.next_offset;
+                self.counters.num_lines_processed += 1;
+                batch_builder.add_doc(record.doc);
+            } else {
+                break true;
+            }
+        };
+        if new_offset > self.counters.offset {
+            if let Some(partition_id) = &self.partition_id {
+                batch_builder
+                    .checkpoint_delta
+                    .record_partition_delta(
+                        partition_id.clone(),
+                        Position::offset(self.counters.offset),
+                        Position::offset(new_offset),
+                    )
+                    .unwrap();
+            }
+            ctx.send_message(doc_processor_mailbox, batch_builder.build())
+                .await?;
+            self.counters.offset = new_offset;
         }
         if is_eof {
             info!("reached end of file");
@@ -65,7 +107,7 @@ impl Source for FileSource {
     }
 
     fn observable_state(&self) -> serde_json::Value {
-        serde_json::to_value(self.reader.counters()).unwrap()
+        serde_json::to_value(&self.counters).unwrap()
     }
 }
 
@@ -80,17 +122,39 @@ impl TypedSourceFactory for FileSourceFactory {
         source_runtime: SourceRuntime,
         params: FileSourceParams,
     ) -> anyhow::Result<FileSource> {
-        let reader: DocFileReader = if let Some(filepath) = &params.filepath {
+        let file_source = if let Some(filepath) = &params.filepath {
+            let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
             let checkpoint = source_runtime.fetch_checkpoint().await?;
-            DocFileReader::from_path(&checkpoint, filepath, &source_runtime.storage_resolver)
-                .await?
+            let offset = checkpoint
+                .position_for_partition(&partition_id)
+                .map(|position| {
+                    position
+                        .as_usize()
+                        .expect("file offset should be stored as usize")
+                })
+                .unwrap_or(0);
+
+            let reader =
+                DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset)
+                    .await?;
+
+            FileSource {
+                source_id: source_runtime.source_id().to_string(),
+                reader,
+                partition_id: Some(partition_id),
+                counters: FileSourceCounters {
+                    offset: offset as u64,
+                    ..Default::default()
+                },
+            }
         } else {
             // We cannot use the checkpoint.
-            DocFileReader::from_stdin()
-        };
-        let file_source = FileSource {
-            source_id: source_runtime.source_id().to_string(),
-            reader,
+            FileSource {
+                source_id: source_runtime.source_id().to_string(),
+                reader: DocFileReader::from_stdin(),
+                partition_id: None,
+                counters: FileSourceCounters::default(),
+            }
         };
         Ok(file_source)
     }
@@ -152,8 +216,7 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "previous_offset": 1030u64,
-                "current_offset": 1030u64,
+                "offset": 1030u64,
                 "num_lines_processed": 4u32
             })
         );
@@ -208,8 +271,7 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "previous_offset": 700_000u64,
-                "current_offset": 700_000u64,
+                "offset": 700_000u64,
                 "num_lines_processed": 20_000u64
             })
         );
@@ -292,8 +354,7 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "previous_offset": 290u64,
-                "current_offset": 290u64,
+                "offset": 290u64,
                 "num_lines_processed": 98u64
             })
         );
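--
For reference, a minimal sketch of how the reworked reader is meant to be
driven (illustrative only: `count_remaining_docs` and the `docs.json` file
name are made up, `StorageResolver::for_test()` is the helper already used
in the tests above, and `DocFileReader` is assumed to be in scope):

    use std::path::Path;

    use quickwit_storage::StorageResolver;

    async fn count_remaining_docs(resume_offset: usize) -> anyhow::Result<u64> {
        let storage_resolver = StorageResolver::for_test();
        // Resume reading from a byte offset previously saved in a source
        // checkpoint. For `.gz` files, `SkipReader` decompresses and discards
        // the first `resume_offset` bytes instead of seeking.
        let mut reader =
            DocFileReader::from_path(&storage_resolver, Path::new("docs.json"), resume_offset)
                .await?;
        let mut num_docs = 0;
        // `next_record()` yields one document per line until EOF (`None`).
        while let Some(record) = reader.next_record().await? {
            // `record.next_offset` is what `FileSource` stores in its
            // checkpoint delta so indexing can restart after this document.
            let _ = (record.doc, record.next_offset);
            num_docs += 1;
        }
        Ok(num_docs)
    }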