Skip to content

Commit

Permalink
Fix Gzip file source (#4457)
Browse files Browse the repository at this point in the history
Co-authored-by: François Massot <[email protected]>
  • Loading branch information
rdettai and fmassot authored Jan 29, 2024
1 parent be6ec44 commit 59e15aa
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 19 deletions.
52 changes: 40 additions & 12 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,8 @@ mod tests {

async fn test_indexing_pipeline_num_fails_before_success(
mut num_fails: usize,
) -> anyhow::Result<bool> {
test_file: &str,
) -> anyhow::Result<()> {
let universe = Universe::new();
let mut metastore = MetastoreServiceClient::mock();
metastore
Expand Down Expand Up @@ -696,7 +697,7 @@ mod tests {
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")),
source_params: SourceParams::file(PathBuf::from(test_file)),
transform_config: None,
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -728,23 +729,31 @@ mod tests {
let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await;
assert_eq!(pipeline_statistics.generation, 1);
assert_eq!(pipeline_statistics.num_spawn_attempts, 1 + num_fails);
Ok(pipeline_exit_status.is_success())
assert!(pipeline_exit_status.is_success());
Ok(())
}

#[tokio::test]
async fn test_indexing_pipeline_retry_0() -> anyhow::Result<()> {
test_indexing_pipeline_num_fails_before_success(0).await?;
Ok(())
test_indexing_pipeline_num_fails_before_success(0, "data/test_corpus.json").await
}

#[tokio::test]
async fn test_indexing_pipeline_retry_1() -> anyhow::Result<()> {
test_indexing_pipeline_num_fails_before_success(1).await?;
Ok(())
test_indexing_pipeline_num_fails_before_success(1, "data/test_corpus.json").await
}

#[tokio::test]
async fn test_indexing_pipeline_simple() -> anyhow::Result<()> {
async fn test_indexing_pipeline_retry_0_gz() -> anyhow::Result<()> {
    // Same scenario as the plain-JSON retry test, but pointed at the `.gz` corpus:
    // zero injected failures are requested from the shared helper.
    let num_fails = 0;
    let test_file = "data/test_corpus.json.gz";
    test_indexing_pipeline_num_fails_before_success(num_fails, test_file).await
}

/// Runs the shared retry helper against the `.gz` test corpus with `num_fails = 1`.
///
/// The helper itself performs the assertions; this wrapper only selects the inputs
/// and forwards the helper's result.
#[tokio::test]
async fn test_indexing_pipeline_retry_1_gz() -> anyhow::Result<()> {
    let num_fails = 1;
    let test_file = "data/test_corpus.json.gz";
    test_indexing_pipeline_num_fails_before_success(num_fails, test_file).await
}

async fn indexing_pipeline_simple(test_file: &str) -> anyhow::Result<()> {
let mut metastore = MetastoreServiceClient::mock();
metastore
.expect_index_metadata()
Expand Down Expand Up @@ -796,7 +805,7 @@ mod tests {
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")),
source_params: SourceParams::file(PathBuf::from(test_file)),
transform_config: None,
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -833,6 +842,16 @@ mod tests {
Ok(())
}

/// Runs the `indexing_pipeline_simple` scenario against the plain JSON test corpus.
///
/// All checks live in the shared helper; its result is returned unchanged.
#[tokio::test]
async fn test_indexing_pipeline_simple() -> anyhow::Result<()> {
    let test_file = "data/test_corpus.json";
    indexing_pipeline_simple(test_file).await
}

/// Runs the `indexing_pipeline_simple` scenario against the `.gz` test corpus.
///
/// All checks live in the shared helper; its result is returned unchanged.
#[tokio::test]
async fn test_indexing_pipeline_simple_gz() -> anyhow::Result<()> {
    let test_file = "data/test_corpus.json.gz";
    indexing_pipeline_simple(test_file).await
}

#[tokio::test]
async fn test_merge_pipeline_does_not_stop_on_indexing_pipeline_failure() {
let mut mock_metastore = MetastoreServiceClient::mock();
Expand Down Expand Up @@ -930,8 +949,7 @@ mod tests {
panic!("Pipeline was apparently not restarted.");
}

#[tokio::test]
async fn test_indexing_pipeline_all_failures_handling() -> anyhow::Result<()> {
async fn indexing_pipeline_all_failures_handling(test_file: &str) -> anyhow::Result<()> {
let mut metastore = MetastoreServiceClient::mock();
metastore
.expect_index_metadata()
Expand Down Expand Up @@ -981,7 +999,7 @@ mod tests {
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")),
source_params: SourceParams::file(PathBuf::from(test_file)),
transform_config: None,
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -1040,4 +1058,14 @@ mod tests {
universe.assert_quit().await;
Ok(())
}

/// Runs the `indexing_pipeline_all_failures_handling` scenario against the plain
/// JSON test corpus.
///
/// All checks live in the shared helper; its result is returned unchanged.
#[tokio::test]
async fn test_indexing_pipeline_all_failures_handling() -> anyhow::Result<()> {
    let test_file = "data/test_corpus.json";
    indexing_pipeline_all_failures_handling(test_file).await
}

/// Runs the `indexing_pipeline_all_failures_handling` scenario against the `.gz`
/// test corpus.
///
/// All checks live in the shared helper; its result is returned unchanged.
#[tokio::test]
async fn test_indexing_pipeline_all_failures_handling_gz() -> anyhow::Result<()> {
    let test_file = "data/test_corpus.json.gz";
    indexing_pipeline_all_failures_handling(test_file).await
}
}
7 changes: 0 additions & 7 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,6 @@ impl TypedSourceFactory for FileSourceFactory {
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
if offset > file_size {
return Err(anyhow::anyhow!(
"offset {} can't be greater than the file size {}",
offset,
file_size
));
}
// If it's a gzip file, we can't seek to a specific offset, we need to start from the
// beginning of the file, decompress and skip the first `offset` bytes.
if filepath.extension() == Some(OsStr::new("gz")) {
Expand Down

0 comments on commit 59e15aa

Please sign in to comment.