Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Store DataFile in FileScanTask instead #607

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl ArrowReader {
|(file_scan_task, file_io, tx)| async move {
match file_scan_task {
Ok(task) => {
let file_path = task.data_file_path().to_string();
let file_path = task.data_file().file_path.to_string();

spawn(async move {
Self::process_file_scan_task(
Expand Down Expand Up @@ -171,7 +171,7 @@ impl ArrowReader {
) -> Result<()> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(task.data_file_path())?;
let parquet_file = file_io.new_input(&task.data_file().file_path)?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
Expand Down
81 changes: 13 additions & 68 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use serde::{Deserialize, Serialize};

use crate::arrow::ArrowReaderBuilder;
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
Expand All @@ -36,8 +35,8 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
SchemaRef, SnapshotRef, TableMetadataRef,
DataContentType, DataFile, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
Expand Down Expand Up @@ -529,9 +528,7 @@ impl ManifestEntryContext {
/// created from it
fn into_file_scan_task(self) -> FileScanTask {
FileScanTask {
data_file_path: self.manifest_entry.file_path().to_string(),
start: 0,
length: self.manifest_entry.file_size_in_bytes(),
data_file: self.manifest_entry.data_file().clone(),
project_field_ids: self.field_ids.to_vec(),
predicate: self
.bound_predicates
Expand Down Expand Up @@ -852,21 +849,18 @@ impl ExpressionEvaluatorCache {
}

/// A task to scan part of a file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct FileScanTask {
data_file_path: String,
start: u64,
length: u64,
data_file: DataFile,
project_field_ids: Vec<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
predicate: Option<BoundPredicate>,
schema: SchemaRef,
}

impl FileScanTask {
/// Returns the data file path of this file scan task.
pub fn data_file_path(&self) -> &str {
&self.data_file_path
/// Returns the data file of this file scan task.
pub fn data_file(&self) -> &DataFile {
&self.data_file
}

/// Returns the projected field ids of this file scan task.
Expand Down Expand Up @@ -902,14 +896,12 @@ mod tests {
use uuid::Uuid;

use crate::arrow::ArrowReaderBuilder;
use crate::expr::{BoundPredicate, Reference};
use crate::expr::Reference;
use crate::io::{FileIO, OutputFile};
use crate::scan::FileScanTask;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
EMPTY_SNAPSHOT_ID,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
};
use crate::table::Table;
use crate::TableIdent;
Expand Down Expand Up @@ -1219,17 +1211,17 @@ mod tests {

assert_eq!(tasks.len(), 2);

tasks.sort_by_key(|t| t.data_file_path().to_string());
tasks.sort_by_key(|t| t.data_file().file_path.to_string());

// Check first task is added data file
assert_eq!(
tasks[0].data_file_path(),
tasks[0].data_file().file_path,
format!("{}/1.parquet", &fixture.table_location)
);

// Check second task is existing data file
assert_eq!(
tasks[1].data_file_path(),
tasks[1].data_file().file_path,
format!("{}/3.parquet", &fixture.table_location)
);
}
Expand Down Expand Up @@ -1554,51 +1546,4 @@ mod tests {
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Apache");
}

#[test]
fn test_file_scan_task_serialize_deserialize() {
    // Round-trips a task through JSON and verifies that every field of the
    // restored task equals the corresponding field of the original.
    let assert_round_trip = |original: FileScanTask| {
        let json = serde_json::to_string(&original).unwrap();
        let restored: FileScanTask = serde_json::from_str(&json).unwrap();

        assert_eq!(original.data_file_path, restored.data_file_path);
        assert_eq!(original.start, restored.start);
        assert_eq!(original.length, restored.length);
        assert_eq!(original.project_field_ids, restored.project_field_ids);
        assert_eq!(original.predicate, restored.predicate);
        assert_eq!(original.schema, restored.schema);
    };

    // A single-column schema shared by both tasks below.
    let binary_field = NestedField::required(1, "x", Type::Primitive(PrimitiveType::Binary));
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![Arc::new(binary_field)])
            .build()
            .unwrap(),
    );

    // Case 1: no predicate attached to the task.
    let task_without_predicate = FileScanTask {
        data_file_path: "data_file_path".to_string(),
        start: 0,
        length: 100,
        project_field_ids: vec![1, 2, 3],
        predicate: None,
        schema: schema.clone(),
    };
    assert_round_trip(task_without_predicate);

    // Case 2: a bound predicate is attached to the task.
    let task_with_predicate = FileScanTask {
        data_file_path: "data_file_path".to_string(),
        start: 0,
        length: 100,
        project_field_ids: vec![1, 2, 3],
        predicate: Some(BoundPredicate::AlwaysTrue),
        schema,
    };
    assert_round_trip(task_with_predicate);
}
}
Loading