
[FEAT] Include file paths as column from read_parquet/csv/json #2953

Merged: 16 commits, Oct 11, 2024
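
The new `file_path_column` argument threads through `read_parquet`, `read_csv`, and `read_json`; when set, each row's source path is materialized as a Utf8 column with the given name. A minimal usage sketch (the glob paths and column name below are hypothetical):

```python
import daft

# When file_path_column is given, every row carries the path of the file it
# was read from, appended as a Utf8 column with the requested name.
parquet_df = daft.read_parquet("data/*.parquet", file_path_column="source_file")
csv_df = daft.read_csv("data/*.csv", file_path_column="source_file")
json_df = daft.read_json("data/*.jsonl", file_path_column="source_file")

parquet_df.show()  # original columns plus a "source_file" Utf8 column
```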
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
@@ -782,6 +782,7 @@ class ScanOperatorHandle:
storage_config: StorageConfig,
infer_schema: bool,
schema: PySchema | None = None,
file_path_column: str | None = None,
) -> ScanOperatorHandle: ...
@staticmethod
def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
3 changes: 3 additions & 0 deletions daft/io/_csv.py
@@ -30,6 +30,7 @@ def read_csv(
comment: Optional[str] = None,
allow_variable_columns: bool = False,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
@@ -54,6 +55,7 @@ def read_csv(
comment (str): Character to treat as the start of a comment line, or None to not support comments
allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.

@@ -97,5 +99,6 @@ def read_csv(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_json.py
@@ -23,6 +23,7 @@ def read_json(
infer_schema: bool = True,
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
@@ -41,6 +42,7 @@ def read_json(
infer_schema (bool): Whether to infer the schema of the JSON, defaults to True.
schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.

Expand Down Expand Up @@ -74,5 +76,6 @@ def read_json(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_parquet.py
@@ -24,6 +24,7 @@ def read_parquet(
infer_schema: bool = True,
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
@@ -45,6 +46,7 @@ def read_parquet(
infer_schema (bool): Whether to infer the schema of the Parquet, defaults to True.
schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
@@ -93,5 +95,6 @@ def read_parquet(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
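
Because the appended path column is an ordinary Utf8 column, it composes with the rest of the expression API, for example to filter rows back to a single input file. A sketch under hypothetical paths (column name and filter value are illustrative):

```python
import daft

# Read a multi-file dataset and keep track of each row's originating file.
df = daft.read_parquet("events/*.parquet", file_path_column="source_file")

# Filter down to the rows contributed by one specific input file.
subset = df.where(daft.col("source_file") == "events/part-00.parquet")
subset.show()
```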
2 changes: 2 additions & 0 deletions daft/io/common.py
@@ -23,6 +23,7 @@ def get_tabular_files_scan(
schema: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
file_path_column: str | None = None,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
@@ -40,6 +41,7 @@
storage_config,
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
file_path_column=file_path_column,
)

builder = LogicalPlanBuilder.from_tabular_scan(
1 change: 1 addition & 0 deletions src/daft-csv/src/read.rs
@@ -199,6 +199,7 @@ fn tables_concat(mut tables: Vec<Table>) -> DaftResult<Table> {
)
}

#[allow(clippy::too_many_arguments)]
async fn read_csv_single_into_table(
uri: &str,
convert_options: Option<CsvConvertOptions>,
3 changes: 3 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -644,6 +644,7 @@ impl MicroPartition {
field_id_mapping.clone(),
parquet_metadata,
chunk_size,
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)
}
@@ -1121,6 +1122,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
parquet_metadata: Option<Vec<Arc<FileMetaData>>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset
&& so > 0
@@ -1308,6 +1310,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
}),
num_rows,
),
file_path_column.map(|s| s.to_string()),
);

let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
1 change: 1 addition & 0 deletions src/daft-micropartition/src/ops/cast_to_schema.rs
@@ -28,6 +28,7 @@ impl MicroPartition {
schema,
scan_task.storage_config.clone(),
scan_task.pushdowns.clone(),
scan_task.file_path_column.clone(),
))
};
Ok(Self::new_unloaded(
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -619,6 +619,7 @@
None,
None,
None,
None,
)
})?;
Ok(mp.into())
@@ -666,6 +667,7 @@
None,
None,
chunk_size,
None,

)
})?;
Ok(mp.into())
65 changes: 49 additions & 16 deletions src/daft-plan/src/builder.rs
@@ -19,6 +19,7 @@
PhysicalScanInfo, Pushdowns, ScanOperatorRef,
};
use daft_schema::{
dtype::DataType,
field::Field,
schema::{Schema, SchemaRef},
};
@@ -146,7 +147,7 @@
io_config: Option<IOConfig>,
multithreaded_io: bool,
) -> DaftResult<Self> {
use daft_scan::storage_config::PyStorageConfig;
use daft_scan::storage_config::{NativeStorageConfig, PyStorageConfig, StorageConfig};

Python::with_gil(|py| {
let io_config = io_config.unwrap_or_default();
@@ -194,21 +195,45 @@
pushdowns.clone().unwrap_or_default(),
));
// If column selection (projection) pushdown is specified, prune unselected columns from the schema.
let output_schema = if let Some(Pushdowns {
columns: Some(columns),
..
}) = &pushdowns
&& columns.len() < schema.fields.len()
{
let pruned_upstream_schema = schema
.fields
.iter()
.filter(|&(name, _)| columns.contains(name))
.map(|(_, field)| field.clone())
.collect::<Vec<_>>();
Arc::new(Schema::new(pruned_upstream_schema)?)
} else {
schema
// If file path column is specified, add it to the schema.
let output_schema = match (&pushdowns, &scan_operator.0.file_path_column()) {
(
Some(Pushdowns {
columns: Some(columns),
..
}),
file_path_column_opt,
) if columns.len() < schema.fields.len() => {
let pruned_fields = schema
.fields
.iter()
.filter(|(name, _)| columns.contains(name))
.map(|(_, field)| field.clone());

let finalized_fields = match file_path_column_opt {
Some(file_path_column) => pruned_fields
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>(),

None => pruned_fields.collect::<Vec<_>>(),
};
Arc::new(Schema::new(finalized_fields)?)
}
(None, Some(file_path_column)) => {
let schema_with_file_path = schema
.fields
.values()
.cloned()
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>();
Arc::new(Schema::new(schema_with_file_path)?)
}
_ => schema,
};
let logical_plan: LogicalPlan =
logical_ops::Source::new(output_schema, source_info.into()).into();
@@ -585,6 +610,7 @@
pub io_config: Option<IOConfig>,
pub multithreaded: bool,
pub schema: Option<SchemaRef>,
pub file_path_column: Option<String>,
}

impl ParquetScanBuilder {
@@ -605,6 +631,7 @@
multithreaded: true,
schema: None,
io_config: None,
file_path_column: None,

}
}
pub fn infer_schema(mut self, infer_schema: bool) -> Self {
@@ -642,6 +669,11 @@
self
}

pub fn file_path_column(mut self, file_path_column: String) -> Self {
self.file_path_column = Some(file_path_column);
self
}

pub fn finish(self) -> DaftResult<LogicalPlanBuilder> {
let cfg = ParquetSourceConfig {
coerce_int96_timestamp_unit: self.coerce_int96_timestamp_unit,
@@ -658,6 +690,7 @@
))),
self.infer_schema,
self.schema,
self.file_path_column,
)?);

LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None)
5 changes: 5 additions & 0 deletions src/daft-scan/src/anonymous.rs
@@ -42,6 +42,10 @@ impl ScanOperator for AnonymousScanOperator {
&[]
}

fn file_path_column(&self) -> Option<&str> {
None
}

fn can_absorb_filter(&self) -> bool {
false
}
@@ -101,6 +105,7 @@ impl ScanOperator for AnonymousScanOperator {
schema.clone(),
storage_config.clone(),
pushdowns.clone(),
None,
)
.into())
},