
feat: Add more fields in FileScanTask #609

Merged · 1 commit · Sep 8, 2024
12 changes: 6 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -133,7 +133,7 @@ impl ArrowReader {
|(file_scan_task, file_io, tx)| async move {
match file_scan_task {
Ok(task) => {
let file_path = task.data_file_path().to_string();
let file_path = task.data_file_path.to_string();

spawn(async move {
Self::process_file_scan_task(
@@ -171,7 +171,7 @@ impl ArrowReader {
) -> Result<()> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(task.data_file_path())?;
let parquet_file = file_io.new_input(&task.data_file_path)?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
@@ -187,8 +187,8 @@
// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = Self::get_arrow_projection_mask(
task.project_field_ids(),
task.schema(),
&task.project_field_ids,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
)?;
@@ -198,7 +198,7 @@
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

if let Some(predicate) = task.predicate() {
if let Some(predicate) = &task.predicate {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
predicate,
@@ -218,7 +218,7 @@
predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
task.schema(),
&task.schema,
)?;

selected_row_groups = Some(result);
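Note: the reader-side changes above swap the old accessor calls (`task.data_file_path()`, `task.schema()`, `task.predicate()`) for direct field access, since this PR makes `FileScanTask`'s fields public. Below is a minimal sketch of a downstream consumer reading those fields directly; it assumes `FileScanTask` is reachable at `iceberg::scan::FileScanTask`, and the `print_task` helper is hypothetical, not part of the crate.

```rust
use iceberg::scan::FileScanTask;

/// Hypothetical helper: summarize a scan task via the now-public fields.
fn print_task(task: &FileScanTask) {
    println!(
        "scan {} ({:?}, {:?}): bytes {}..{}, {} records",
        task.data_file_path,
        task.data_file_content,
        task.data_file_format,
        task.start,
        task.start + task.length,
        task.record_count
            .map(|n| n.to_string())
            .unwrap_or_else(|| "unknown".to_string()),
    );
}
```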
76 changes: 41 additions & 35 deletions crates/iceberg/src/scan.rs
@@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
SchemaRef, SnapshotRef, TableMetadataRef,
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
@@ -529,14 +529,19 @@ impl ManifestEntryContext {
/// created from it
fn into_file_scan_task(self) -> FileScanTask {
FileScanTask {
data_file_path: self.manifest_entry.file_path().to_string(),
start: 0,
length: self.manifest_entry.file_size_in_bytes(),
record_count: Some(self.manifest_entry.record_count()),

data_file_path: self.manifest_entry.file_path().to_string(),
data_file_content: self.manifest_entry.content_type(),
data_file_format: self.manifest_entry.file_format(),

schema: self.snapshot_schema,
project_field_ids: self.field_ids.to_vec(),
predicate: self
.bound_predicates
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
schema: self.snapshot_schema,
}
}
}
@@ -854,35 +859,30 @@ impl ExpressionEvaluatorCache {
/// A task to scan part of file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
data_file_path: String,
start: u64,
length: u64,
project_field_ids: Vec<i32>,
/// The start offset of the file to scan.
pub start: u64,
/// The length of the file to scan.
pub length: u64,
/// The number of records in the file to scan.
///
/// This is an optional field, and only available if we are
/// reading the entire data file.
pub record_count: Option<u64>,

/// The data file path corresponding to the task.
pub data_file_path: String,
/// The content type of the file to scan.
pub data_file_content: DataContentType,
/// The format of the file to scan.
pub data_file_format: DataFileFormat,

/// The schema of the file to scan.
pub schema: SchemaRef,
/// The field ids to project.
pub project_field_ids: Vec<i32>,
/// The predicate to filter.
#[serde(skip_serializing_if = "Option::is_none")]
predicate: Option<BoundPredicate>,
schema: SchemaRef,
}

impl FileScanTask {
/// Returns the data file path of this file scan task.
pub fn data_file_path(&self) -> &str {
&self.data_file_path
}

/// Returns the project field id of this file scan task.
pub fn project_field_ids(&self) -> &[i32] {
&self.project_field_ids
}

/// Returns the predicate of this file scan task.
pub fn predicate(&self) -> Option<&BoundPredicate> {
self.predicate.as_ref()
}

/// Returns the schema id of this file scan task.
pub fn schema(&self) -> &Schema {
&self.schema
}
pub predicate: Option<BoundPredicate>,
}

#[cfg(test)]
@@ -1219,17 +1219,17 @@ mod tests {

assert_eq!(tasks.len(), 2);

tasks.sort_by_key(|t| t.data_file_path().to_string());
tasks.sort_by_key(|t| t.data_file_path.to_string());

// Check first task is added data file
assert_eq!(
tasks[0].data_file_path(),
tasks[0].data_file_path,
format!("{}/1.parquet", &fixture.table_location)
);

// Check second task is existing data file
assert_eq!(
tasks[1].data_file_path(),
tasks[1].data_file_path,
format!("{}/3.parquet", &fixture.table_location)
);
}
@@ -1582,22 +1582,28 @@ mod tests {
);
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: None,
schema: schema.clone(),
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
};
test_fn(task);

// with predicate
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: Some(BoundPredicate::AlwaysTrue),
schema,
record_count: None,
data_file_format: DataFileFormat::Avro,
};
test_fn(task);
}
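The new tests above construct `FileScanTask` values with and without a predicate and round-trip them through serde. The sketch below shows the same idea as a standalone snippet, assuming `serde_json` as the serialization format, an empty schema from `Schema::builder()`, and the `iceberg::scan` / `iceberg::spec` paths; the `roundtrip` function is illustrative only.

```rust
use std::sync::Arc;

use iceberg::scan::FileScanTask;
use iceberg::spec::{DataContentType, DataFileFormat, Schema};

fn roundtrip() -> serde_json::Result<()> {
    // An empty schema is enough for a serialization sketch
    // (assumes the builder accepts no fields).
    let schema = Arc::new(Schema::builder().build().unwrap());
    let task = FileScanTask {
        start: 0,
        length: 100,
        record_count: Some(100),
        data_file_path: "s3://bucket/data/1.parquet".to_string(),
        data_file_content: DataContentType::Data,
        data_file_format: DataFileFormat::Parquet,
        schema,
        project_field_ids: vec![1, 2, 3],
        predicate: None,
    };

    let json = serde_json::to_string(&task)?;
    // `predicate` is omitted from the JSON because of `skip_serializing_if`.
    let back: FileScanTask = serde_json::from_str(&json)?;
    assert_eq!(back.data_file_path, task.data_file_path);
    Ok(())
}
```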
12 changes: 10 additions & 2 deletions crates/iceberg/src/spec/manifest.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;

use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
use bytes::Bytes;
use serde_derive::{Deserialize, Serialize};
use serde_json::to_vec;
use serde_with::{DeserializeFromStr, SerializeDisplay};
use typed_builder::TypedBuilder;

use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
@@ -866,6 +868,12 @@ impl ManifestEntry {
&self.data_file.file_path
}

/// Data file record count of the manifest entry.
#[inline]
pub fn record_count(&self) -> u64 {
self.data_file.record_count
}

/// Inherit data from manifest list, such as snapshot id, sequence number.
pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
if self.snapshot_id.is_none() {
@@ -1141,7 +1149,7 @@ impl DataFile {
}
/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum DataContentType {
/// value: 0
Data = 0,
@@ -1168,7 +1176,7 @@
}

/// Format of this data.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
/// Avro file format: <https://avro.apache.org/>
Avro,
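For reference, the derives added here are what make the new `FileScanTask` fields serializable: `DataContentType` gets plain `Serialize`/`Deserialize`, while `DataFileFormat` uses `SerializeDisplay`/`DeserializeFromStr`, so it is written as its `Display` string and parsed back through `FromStr`. A small sketch of that behavior follows; the exact string form (e.g. "parquet") is an assumption about the crate's `Display` impl.

```rust
use iceberg::spec::DataFileFormat;

fn format_roundtrip() {
    // SerializeDisplay / DeserializeFromStr route serde through Display and
    // FromStr, so the JSON value is a plain string rather than a variant tag.
    let json = serde_json::to_string(&DataFileFormat::Parquet).unwrap();
    let back: DataFileFormat = serde_json::from_str(&json).unwrap();
    assert_eq!(back, DataFileFormat::Parquet);
}
```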