From bea5d822c7f1d2d22c6d520ace89114780c203eb Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 14 Jun 2024 07:29:07 +0000 Subject: [PATCH] Address styling comments --- .../src/source/file_source.rs | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index b739317c1af..ac00147bb29 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -21,6 +21,7 @@ use std::borrow::Borrow; use std::fmt; use std::time::Duration; +use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_config::FileSourceParams; @@ -67,18 +68,17 @@ impl Source for FileSource { let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT; let mut new_offset = self.counters.offset; let mut batch_builder = BatchBuilder::new(SourceType::File); - let is_eof = loop { - if new_offset >= limit_num_bytes { - break false; - } + let mut is_eof = false; + while new_offset < limit_num_bytes { if let Some(record) = ctx.protect_future(self.reader.next_record()).await? 
{ new_offset = record.next_offset; self.counters.num_lines_processed += 1; batch_builder.add_doc(record.doc); } else { - break true; + is_eof = true; + break; } - }; + } if new_offset > self.counters.offset { if let Some(partition_id) = &self.partition_id { batch_builder @@ -122,41 +122,39 @@ impl TypedSourceFactory for FileSourceFactory { source_runtime: SourceRuntime, params: FileSourceParams, ) -> anyhow::Result { - let file_source = if let Some(filepath) = &params.filepath { - let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); - let checkpoint = source_runtime.fetch_checkpoint().await?; - let offset = checkpoint - .position_for_partition(&partition_id) - .map(|position| { - position - .as_usize() - .expect("file offset should be stored as usize") - }) - .unwrap_or(0); - - let reader = - DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset) - .await?; - - FileSource { - source_id: source_runtime.source_id().to_string(), - reader, - partition_id: Some(partition_id), - counters: FileSourceCounters { - offset: offset as u64, - ..Default::default() - }, - } - } else { - // We cannot use the checkpoint. - FileSource { + let Some(filepath) = &params.filepath else { + return Ok(FileSource { source_id: source_runtime.source_id().to_string(), reader: DocFileReader::from_stdin(), partition_id: None, counters: FileSourceCounters::default(), - } + }); }; - Ok(file_source) + + let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); + let checkpoint = source_runtime.fetch_checkpoint().await?; + let offset = checkpoint + .position_for_partition(&partition_id) + .map(|position| { + position + .as_usize() + .context("file offset should be stored as usize") + }) + .transpose()? 
+ .unwrap_or(0); + + let reader = + DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset).await?; + + Ok(FileSource { + source_id: source_runtime.source_id().to_string(), + reader, + partition_id: Some(partition_id), + counters: FileSourceCounters { + offset: offset as u64, + ..Default::default() + }, + }) } }