From 3a855860c9daae409b1b8fa900cbf94dadd90b72 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Wed, 3 Jul 2024 12:36:59 +0200
Subject: [PATCH] Fix reader edge case

---
 .../src/source/doc_file_reader.rs             | 206 ++++++++++++++----
 .../src/source/file_source.rs                 |  14 +-
 .../src/source/queue_sources/coordinator.rs   |   7 +-
 .../src/source/queue_sources/shared_state.rs  |  15 +-
 4 files changed, 186 insertions(+), 56 deletions(-)

diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
index ea020449793..572ff0ff445 100644
--- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
+++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
@@ -25,17 +25,19 @@ use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
 use bytes::Bytes;
 use quickwit_common::uri::Uri;
+use quickwit_common::Progress;
 use quickwit_metastore::checkpoint::PartitionId;
 use quickwit_proto::metastore::SourceType;
 use quickwit_proto::types::Position;
 use quickwit_storage::StorageResolver;
 use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
 
-use super::{BatchBuilder, SourceContext, BATCH_NUM_BYTES_LIMIT};
+use super::{BatchBuilder, BATCH_NUM_BYTES_LIMIT};
 
 pub struct FileRecord {
     pub next_offset: u64,
     pub doc: Bytes,
+    pub is_last: bool,
 }
 
 /// A helper wrapper that lets you skip bytes in compressed files where you
@@ -67,11 +69,18 @@ impl SkipReader {
         Ok(())
     }
 
-    async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result<usize> {
+    /// Reads a line and peeks into the reader's buffer. Returns the number of
+    /// bytes read and whether the end of the file has been reached.
+    async fn read_line_and_peek<'a>(&mut self, buf: &'a mut String) -> io::Result<(usize, bool)> {
         if self.num_bytes_to_skip > 0 {
             self.skip().await?;
         }
-        self.reader.read_line(buf).await
+        let line_size = self.reader.read_line(buf).await?;
+        if line_size == 0 {
+            return Ok((0, true));
+        }
+        let next_bytes = self.reader.fill_buf().await?;
+        Ok((line_size, next_bytes.is_empty()))
     }
 }
 
@@ -122,14 +131,15 @@ impl DocFileReader {
     /// is reached.
     pub async fn next_record(&mut self) -> anyhow::Result<Option<FileRecord>> {
         let mut buf = String::new();
-        let res = self.reader.read_line(&mut buf).await?;
-        if res == 0 {
+        let (bytes_read, is_last) = self.reader.read_line_and_peek(&mut buf).await?;
+        if bytes_read == 0 {
             Ok(None)
         } else {
-            self.next_offset += res as u64;
+            self.next_offset += bytes_read as u64;
             Ok(Some(FileRecord {
                 next_offset: self.next_offset,
                 doc: Bytes::from(buf),
+                is_last,
             }))
         }
     }
@@ -137,9 +147,11 @@ impl DocFileReader {
 
 #[async_trait]
 pub trait BatchReader: Send {
+    /// Reads a batch from the underlying reader, marking progress on the
+    /// provided handle during potentially slow reads.
     async fn read_batch(
         &mut self,
-        source_ctx: &SourceContext,
+        source_progress: &Progress,
         source_type: SourceType,
     ) -> anyhow::Result<BatchBuilder>;
 
@@ -188,22 +200,29 @@ impl ObjectUriBatchReader {
 impl BatchReader for ObjectUriBatchReader {
     async fn read_batch(
         &mut self,
-        source_ctx: &SourceContext,
+        source_progress: &Progress,
         source_type: SourceType,
     ) -> anyhow::Result<BatchBuilder> {
         let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize;
         let mut new_offset = self.current_offset;
         let mut batch_builder = BatchBuilder::new(source_type);
         while new_offset < limit_num_bytes {
-            if let Some(record) = source_ctx.protect_future(self.reader.next_record()).await? {
+            if let Some(record) = source_progress
+                .protect_future(self.reader.next_record())
+                .await?
+            {
                 new_offset = record.next_offset as usize;
                 batch_builder.add_doc(record.doc);
+                if record.is_last {
+                    self.is_eof = true;
+                    break;
+                }
             } else {
                 self.is_eof = true;
                 break;
             }
         }
-        if new_offset > self.current_offset {
+        if batch_builder.num_bytes > 0 {
             let to_position = if self.is_eof {
                 Position::eof(new_offset)
             } else {
@@ -243,13 +262,13 @@ impl StdinBatchReader {
 impl BatchReader for StdinBatchReader {
     async fn read_batch(
         &mut self,
-        source_ctx: &SourceContext,
+        source_progress: &Progress,
         source_type: SourceType,
     ) -> anyhow::Result<BatchBuilder> {
         let mut batch_builder = BatchBuilder::new(source_type);
         while batch_builder.num_bytes < BATCH_NUM_BYTES_LIMIT {
             let mut buf = String::new();
-            let bytes_read = source_ctx
+            let bytes_read = source_progress
                 .protect_future(self.reader.read_line(&mut buf))
                 .await?;
             if bytes_read > 0 {
@@ -328,11 +347,16 @@ pub mod file_test_helpers {
         (file, size)
     }
 
+    /// Generates a file with increasing padded numbers. Each line is 8 bytes
+    /// including the newline char.
+    ///
+    /// 0000000\n0000001\n0000002\n...
     pub async fn generate_index_doc_file(gzip: bool, lines: usize) -> NamedTempFile {
+        assert!(lines < 9999999, "each line is 7 digits + newline");
         let mut documents_bytes = Vec::new();
         for i in 0..lines {
             documents_bytes
-                .write_all(format!("{i}\n").as_bytes())
+                .write_all(format!("{:0>7}\n", i).as_bytes())
                 .unwrap();
         }
         write_to_tmp(documents_bytes, gzip).await
@@ -345,6 +369,7 @@ mod tests {
     use std::str::FromStr;
 
     use file_test_helpers::generate_index_doc_file;
+    use quickwit_metastore::checkpoint::SourceCheckpointDelta;
 
     use super::*;
 
@@ -354,36 +379,42 @@
         {
             // Skip 0 bytes.
             let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 0);
             let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
+            let (bytes_read, eof) = reader.read_line_and_peek(&mut buf).await.unwrap();
             assert_eq!(buf, "hello");
+            assert!(eof);
+            assert_eq!(bytes_read, 5)
         }
         {
             // Skip 2 bytes.
             let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 2);
             let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
+            let (bytes_read, eof) = reader.read_line_and_peek(&mut buf).await.unwrap();
             assert_eq!(buf, "llo");
+            assert!(eof);
+            assert_eq!(bytes_read, 3)
         }
         {
             let input = "hello";
             let cursor = Cursor::new(input);
             let mut reader = SkipReader::new(Box::new(cursor), 5);
             let mut buf = String::new();
-            assert!(reader.read_line(&mut buf).await.is_ok());
+            let (bytes_read, eof) = reader.read_line_and_peek(&mut buf).await.unwrap();
+            assert!(eof);
+            assert_eq!(bytes_read, 0)
         }
         {
             let input = "hello";
             let cursor = Cursor::new(input);
             let mut reader = SkipReader::new(Box::new(cursor), 10);
             let mut buf = String::new();
-            assert!(reader.read_line(&mut buf).await.is_err());
+            assert!(reader.read_line_and_peek(&mut buf).await.is_err());
         }
         {
             let input = "hello world".repeat(10000);
             let cursor = Cursor::new(input.clone());
             let mut reader = SkipReader::new(Box::new(cursor), 64000);
             let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
+            reader.read_line_and_peek(&mut buf).await.unwrap();
             assert_eq!(buf, input[64000..]);
         }
         {
@@ -391,12 +422,12 @@
             let cursor = Cursor::new(input.clone());
             let mut reader = SkipReader::new(Box::new(cursor), 64001);
             let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
+            reader.read_line_and_peek(&mut buf).await.unwrap();
             assert_eq!(buf, input[64001..]);
         }
     }
 
-    async fn aux_test_full_read(file: impl AsRef<str>, expected_lines: usize) {
+    async fn aux_test_full_read_record(file: impl AsRef<str>, expected_lines: usize) {
         let storage_resolver = StorageResolver::for_test();
         let uri = Uri::from_str(file.as_ref()).unwrap();
         let mut doc_reader = DocFileReader::from_uri(&storage_resolver, &uri, 0)
             .await
             .unwrap();
         let mut parsed_lines = 0;
         while doc_reader.next_record().await.unwrap().is_some() {
             parsed_lines += 1;
         }
         assert_eq!(parsed_lines, expected_lines);
     }
 
     #[tokio::test]
-    async fn test_full_read() {
-        aux_test_full_read("data/test_corpus.json", 4).await;
+    async fn test_full_read_record() {
+        aux_test_full_read_record("data/test_corpus.json", 4).await;
     }
 
     #[tokio::test]
-    async fn test_full_read_gz() {
-        aux_test_full_read("data/test_corpus.json.gz", 4).await;
+    async fn test_full_read_record_gz() {
+        aux_test_full_read_record("data/test_corpus.json.gz", 4).await;
     }
 
-    async fn aux_test_resumed_read(
+    async fn aux_test_resumed_read_record(
         file: impl AsRef<str>,
         expected_lines: usize,
         stop_at_line: usize,
     ) {
         let storage_resolver = StorageResolver::for_test();
         let uri = Uri::from_str(file.as_ref()).unwrap();
         // read the first part of the file
         let mut first_part_reader = DocFileReader::from_uri(&storage_resolver, &uri, 0)
             .await
             .unwrap();
         let mut resume_offset = 0;
         let mut parsed_lines = 0;
         for _ in 0..stop_at_line {
             let rec = first_part_reader
                 .next_record()
                 .await
                 .unwrap()
                 .expect("EOF happened before stop_at_line");
             resume_offset = rec.next_offset as usize;
-            assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+            assert_eq!(Bytes::from(format!("{:0>7}\n", parsed_lines)), rec.doc);
             parsed_lines += 1;
         }
         // read the second part of the file
@@ -448,29 +479,130 @@
         let mut second_part_reader = DocFileReader::from_uri(&storage_resolver, &uri, resume_offset)
             .await
             .unwrap();
         while let Some(rec) = second_part_reader.next_record().await.unwrap() {
-            assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+            assert_eq!(Bytes::from(format!("{:0>7}\n", parsed_lines)), rec.doc);
             parsed_lines += 1;
         }
         assert_eq!(parsed_lines, expected_lines);
     }
 
     #[tokio::test]
-    async fn test_resume_read() {
+    async fn test_resumed_read_record() {
         let dummy_doc_file = generate_index_doc_file(false, 1000).await;
         let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 1).await;
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 40).await;
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 999).await;
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 1000).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 40).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 999).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1000).await;
     }
 
     #[tokio::test]
-    async fn test_resume_read_gz() {
+    async fn test_resumed_read_record_gz() {
         let dummy_doc_file = generate_index_doc_file(true, 1000).await;
         let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 1).await;
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 40).await;
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 999).await;
-        aux_test_resumed_read(dummy_doc_file_uri, 1000, 1000).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 40).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 999).await;
+        aux_test_resumed_read_record(dummy_doc_file_uri, 1000, 1000).await;
+    }
+
+    async fn aux_test_full_read_batch(
+        file: impl AsRef<str>,
+        expected_lines: usize,
+        expected_batches: usize,
+        file_size: usize,
+        from: Position,
+    ) {
+        let progress = Progress::default();
+        let storage_resolver = StorageResolver::for_test();
+        let uri = Uri::from_str(file.as_ref()).unwrap();
+        let partition = PartitionId::from("test");
+        let mut batch_reader =
+            ObjectUriBatchReader::try_new(&storage_resolver, partition.clone(), &uri, from)
+                .await
+                .unwrap();
+
+        let mut parsed_lines = 0;
+        let mut parsed_batches = 0;
+        let mut checkpoint_delta = SourceCheckpointDelta::default();
+        while !batch_reader.is_eof() {
+            let batch = batch_reader
+                .read_batch(&progress, SourceType::Unspecified)
+                .await
+                .unwrap();
+            parsed_lines += batch.docs.len();
+            if batch.num_bytes > 0 {
+                parsed_batches += 1;
+            }
+            checkpoint_delta.extend(batch.checkpoint_delta).unwrap();
+        }
+        assert_eq!(parsed_lines, expected_lines);
+        assert_eq!(parsed_batches, expected_batches);
+        let position = checkpoint_delta
+            .get_source_checkpoint()
+            .position_for_partition(&partition)
+            .unwrap()
+            .clone();
+        assert_eq!(position, Position::eof(file_size))
+    }
+
+    #[tokio::test]
+    async fn test_full_read_single_batch() {
+        let num_lines = 10;
+        let dummy_doc_file = generate_index_doc_file(false, num_lines).await;
+        let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
+        aux_test_full_read_batch(
+            dummy_doc_file_uri,
+            num_lines,
+            1,
+            num_lines * 8,
+            Position::Beginning,
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_full_read_single_batch_max_size() {
+        let num_lines = BATCH_NUM_BYTES_LIMIT as usize / 8;
+        let dummy_doc_file = generate_index_doc_file(false, num_lines).await;
+        let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
+        aux_test_full_read_batch(
+            dummy_doc_file_uri,
+            num_lines,
+            1,
+            num_lines * 8,
+            Position::Beginning,
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_full_read_two_batches() {
+        let num_lines = BATCH_NUM_BYTES_LIMIT as usize / 8 + 10;
+        let dummy_doc_file = generate_index_doc_file(false, num_lines).await;
+        let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
+        aux_test_full_read_batch(
+            dummy_doc_file_uri,
+            num_lines,
+            2,
+            num_lines * 8,
+            Position::Beginning,
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_resume_read_batches() {
+        let total_num_lines = BATCH_NUM_BYTES_LIMIT as usize / 8 * 3;
+        let resume_after_lines = total_num_lines / 2;
+        let dummy_doc_file = generate_index_doc_file(false, total_num_lines).await;
+        let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap();
+        aux_test_full_read_batch(
+            dummy_doc_file_uri,
+            total_num_lines - resume_after_lines,
+            2,
+            total_num_lines * 8,
+            Position::offset(resume_after_lines * 8),
+        )
+        .await;
+    }
 }
diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs
index b05d9473d4f..b94b0ba18a0 100644
--- a/quickwit/quickwit-indexing/src/source/file_source.rs
+++ b/quickwit/quickwit-indexing/src/source/file_source.rs
@@ -88,7 +88,7 @@ impl Source for FileSource {
                 num_bytes_processed,
                 num_lines_processed,
             } => {
-                let batch_builder = reader.read_batch(ctx, self.source_type).await?;
+                let batch_builder = reader.read_batch(ctx.progress(), self.source_type).await?;
                 if batch_builder.num_bytes > 0 {
                     *num_bytes_processed += batch_builder.num_bytes;
                     *num_lines_processed += batch_builder.docs.len() as u64;
@@ -214,6 +214,7 @@ mod tests {
     use std::num::NonZeroUsize;
     use std::str::FromStr;
 
+    use bytes::Bytes;
     use quickwit_actors::{Command, Universe};
     use quickwit_common::uri::Uri;
     use quickwit_config::{FileSourceUri, SourceConfig, SourceInputFormat, SourceParams};
@@ -374,7 +375,7 @@ mod tests {
         let source_checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
             partition_id,
             Position::Beginning,
-            Position::offset(4u64),
+            Position::offset(16usize),
         )
         .unwrap();
 
@@ -398,12 +399,15 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "num_bytes_processed": 286u64,
-                "num_lines_processed": 98u64
+                "num_bytes_processed": (800-16) as u64,
+                "num_lines_processed": (100-2) as u64,
             })
         );
         let indexer_messages: Vec<RawDocBatch> = doc_processor_inbox.drain_for_test_typed();
-        assert!(&indexer_messages[0].docs[0].starts_with(b"2\n"));
+        assert_eq!(
+            indexer_messages[0].docs[0],
+            Bytes::from_static(b"0000002\n")
+        );
     }
 }
diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
index 3b393cc5465..c22b1dfce2c 100644
--- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
+++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
@@ -224,7 +224,7 @@ impl QueueCoordinator {
         // TODO: should we kill the publish lock if the message visibility extension fails?
let batch_builder = in_progress_ref .reader - .read_batch(ctx, self.source_type) + .read_batch(ctx.progress(), self.source_type) .await?; if batch_builder.num_bytes > 0 { self.observable_state.num_lines_processed += batch_builder.docs.len() as u64; @@ -452,10 +452,7 @@ mod tests { "test-index", &[( partition_id.clone(), - ( - "existing_token".to_string(), - Position::Eof(Some(file_size.into())), - ), + ("existing_token".to_string(), Position::eof(file_size)), )], ); let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone()); diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs index d8f386c84b9..cda7409972c 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs @@ -292,10 +292,7 @@ mod tests { async fn test_acquire_shards_with_completed() { let index_id = "test-sqs-index"; let index_uid = IndexUid::new_with_random_ulid(index_id); - let init_state = &[( - "p1".into(), - ("token2".to_string(), Position::Eof(Some(100usize.into()))), - )]; + let init_state = &[("p1".into(), ("token2".to_string(), Position::eof(100usize)))]; let metastore = mock_metastore(init_state, Some(1), Some(0)); let shared_state = QueueSharedState { @@ -308,7 +305,7 @@ mod tests { .acquire_partitions("token1", vec!["p1".into(), "p2".into()]) .await .unwrap(); - assert!(aquired.contains(&("p1".into(), Position::Eof(Some(100usize.into()))))); + assert!(aquired.contains(&("p1".into(), Position::eof(100usize)))); assert!(aquired.contains(&("p2".into(), Position::Beginning))); } @@ -318,7 +315,7 @@ mod tests { let index_uid = IndexUid::new_with_random_ulid(index_id); let init_state = &[( "p1".into(), - ("token2".to_string(), Position::Offset(100usize.into())), + ("token2".to_string(), Position::offset(100usize)), )]; let metastore = mock_metastore(init_state, Some(1), Some(1)); @@ -334,7 +331,7 @@ mod tests { .unwrap(); // TODO: this test should fail once we implement the grace // period before a partition can be re-acquired - assert!(aquired.contains(&("p1".into(), Position::Offset(100usize.into())))); + assert!(aquired.contains(&("p1".into(), Position::offset(100usize)))); assert!(aquired.contains(&("p2".into(), Position::Beginning))); } @@ -349,7 +346,7 @@ mod tests { let init_state = &[( completed_partition_id.clone(), - ("token2".to_string(), Position::Eof(Some(100usize.into()))), + ("token2".to_string(), Position::eof(100usize)), )]; let metastore = mock_metastore(init_state, Some(1), Some(0)); let shared_state = QueueSharedState { @@ -366,7 +363,7 @@ mod tests { .iter() .find(|(msg, _)| msg.partition_id() == completed_partition_id) .unwrap(); - assert_eq!(completed_msg.1, Position::Eof(Some(100usize.into()))); + assert_eq!(completed_msg.1, Position::eof(100usize)); let new_msg = checkpointed_msg .iter() .find(|(msg, _)| msg.partition_id() == new_partition_id)
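
The core of this fix is the peek-after-read pattern in read_line_and_peek:
after a successful read_line, the reader's buffer is probed once more with
fill_buf, and an empty buffer means the line that was just returned is the
last one. The stand-alone sketch below illustrates the pattern using plain
tokio types only, leaving out the quickwit-specific wrappers; the
read_line_and_peek function here mirrors the helper added to SkipReader above.

    use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};

    /// Reads one line and reports whether the reader is exhausted, by peeking
    /// at the buffer after the read: fill_buf returning an empty slice means
    /// that the end of the stream has been reached.
    async fn read_line_and_peek<R: AsyncBufRead + Unpin>(
        reader: &mut R,
        buf: &mut String,
    ) -> std::io::Result<(usize, bool)> {
        let line_size = reader.read_line(buf).await?;
        if line_size == 0 {
            // Nothing was read: we were already at EOF.
            return Ok((0, true));
        }
        // Peek without consuming. An empty buffer here means the line we just
        // read was the last one, so the caller learns about EOF immediately
        // instead of discovering it on the next (wasted) read.
        let next_bytes = reader.fill_buf().await?;
        Ok((line_size, next_bytes.is_empty()))
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Two 8-byte lines, like the ones produced by generate_index_doc_file.
        let mut reader = BufReader::new(&b"0000000\n0000001\n"[..]);
        let mut buf = String::new();
        assert_eq!(read_line_and_peek(&mut reader, &mut buf).await?, (8, false));
        buf.clear();
        // The last line and the EOF information arrive together.
        assert_eq!(read_line_and_peek(&mut reader, &mut buf).await?, (8, true));
        Ok(())
    }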
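
Downstream, the new is_last flag is what allows ObjectUriBatchReader to close
a batch with an EOF position in the same pass that consumed the final line,
instead of waiting for a later read to come back empty. The following
simplified model of that control flow is hypothetical: the Rec and Pos types
stand in for quickwit's FileRecord and Position, and the byte limit plays the
role of BATCH_NUM_BYTES_LIMIT.

    #[derive(Debug, PartialEq)]
    enum Pos {
        Offset(u64),
        Eof(u64),
    }

    struct Rec {
        doc: &'static str,
        next_offset: u64,
        is_last: bool,
    }

    /// Drains records into one batch of at most `limit` bytes. Thanks to
    /// is_last, the read that returns the final record also finalizes the
    /// batch at Pos::Eof; no extra empty read is needed.
    fn read_batch(records: &mut impl Iterator<Item = Rec>, limit: u64) -> (Vec<&'static str>, Pos) {
        let mut docs = Vec::new();
        let mut new_offset = 0u64;
        let mut eof = false;
        while new_offset < limit {
            match records.next() {
                Some(rec) => {
                    new_offset = rec.next_offset;
                    docs.push(rec.doc);
                    if rec.is_last {
                        eof = true;
                        break;
                    }
                }
                None => {
                    eof = true;
                    break;
                }
            }
        }
        if eof {
            (docs, Pos::Eof(new_offset))
        } else {
            (docs, Pos::Offset(new_offset))
        }
    }

    fn main() {
        let mut records = vec![
            Rec { doc: "0000000\n", next_offset: 8, is_last: false },
            Rec { doc: "0000001\n", next_offset: 16, is_last: true },
        ]
        .into_iter();
        let (docs, pos) = read_batch(&mut records, 1024);
        assert_eq!(docs.len(), 2);
        // EOF is known without any call ever returning None.
        assert_eq!(pos, Pos::Eof(16));
    }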