Skip to content

Commit

Permalink
Address styling comments
Browse files — browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 14, 2024
1 parent 3555d3a commit 94f94ee
Showing 1 changed file with 34 additions and 36 deletions.
70 changes: 34 additions & 36 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::borrow::Borrow;
use std::fmt;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_config::FileSourceParams;
Expand Down Expand Up @@ -67,18 +68,17 @@ impl Source for FileSource {
let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT;
let mut new_offset = self.counters.offset;
let mut batch_builder = BatchBuilder::new(SourceType::File);
let is_eof = loop {
if new_offset >= limit_num_bytes {
break false;
}
let mut is_eof = false;
while new_offset < limit_num_bytes {
if let Some(record) = ctx.protect_future(self.reader.next_record()).await? {
new_offset = record.next_offset;
self.counters.num_lines_processed += 1;
batch_builder.add_doc(record.doc);
} else {
break true;
is_eof = true;
break;
}
};
}
if new_offset > self.counters.offset {
if let Some(partition_id) = &self.partition_id {
batch_builder
Expand Down Expand Up @@ -122,41 +122,39 @@ impl TypedSourceFactory for FileSourceFactory {
source_runtime: SourceRuntime,
params: FileSourceParams,
) -> anyhow::Result<FileSource> {
let file_source = if let Some(filepath) = &params.filepath {
let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
let checkpoint = source_runtime.fetch_checkpoint().await?;
let offset = checkpoint
.position_for_partition(&partition_id)
.map(|position| {
position
.as_usize()
.expect("file offset should be stored as usize")
})
.unwrap_or(0);

let reader =
DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset)
.await?;

FileSource {
source_id: source_runtime.source_id().to_string(),
reader,
partition_id: Some(partition_id),
counters: FileSourceCounters {
offset: offset as u64,
..Default::default()
},
}
} else {
// We cannot use the checkpoint.
FileSource {
let Some(filepath) = &params.filepath else {
return Ok(FileSource {
source_id: source_runtime.source_id().to_string(),
reader: DocFileReader::from_stdin(),
partition_id: None,
counters: FileSourceCounters::default(),
}
});
};
Ok(file_source)

let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
let checkpoint = source_runtime.fetch_checkpoint().await?;
let offset = checkpoint
.position_for_partition(&partition_id)
.map(|position| {
position
.as_usize()
.context("file offset should be stored as usize")
})
.transpose()?
.unwrap_or(0);

let reader =
DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset).await?;

Ok(FileSource {
source_id: source_runtime.source_id().to_string(),
reader,
partition_id: Some(partition_id),
counters: FileSourceCounters {
offset: offset as u64,
..Default::default()
},
})
}
}

Expand Down

0 comments on commit 94f94ee

Please sign in to comment.