
[FEAT] parquet reader refactor, add parquet_stats_reader and parquet_schema_reader (1/2) #1191

Merged
13 commits merged on Jul 27, 2023
10 changes: 9 additions & 1 deletion daft/logical/schema.py
@@ -1,11 +1,15 @@
from __future__ import annotations

from typing import Iterator
from typing import TYPE_CHECKING, Iterator

from daft.daft import PyField as _PyField
from daft.daft import PySchema as _PySchema
from daft.daft import read_parquet_schema as _read_parquet_schema
from daft.datatype import DataType

if TYPE_CHECKING:
from daft.io import IOConfig


class Field:
_field: _PyField
@@ -113,3 +117,7 @@
def __setstate__(self, state: bytes) -> None:
self._schema = _PySchema.__new__(_PySchema)
self._schema.__setstate__(state)

@classmethod
def from_parquet(cls, path: str, io_config: IOConfig | None = None) -> Schema:
return Schema._from_pyschema(_read_parquet_schema(uri=path, io_config=io_config))

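For reference, a minimal usage sketch of the new Schema.from_parquet classmethod (not part of the diff; the file path below is a placeholder):

from daft.logical.schema import Schema

# Reads only the Parquet footer and returns the file's schema as a Daft Schema;
# no row data is materialized. io_config defaults to None for local paths.
schema = Schema.from_parquet("data/example.parquet")
print(schema)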
18 changes: 15 additions & 3 deletions daft/table/table.py
@@ -8,6 +8,7 @@
from daft.arrow_utils import ensure_table
from daft.daft import PyTable as _PyTable
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_statistics as _read_parquet_statistics
from daft.datatype import DataType
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.schema import Schema
@@ -348,10 +349,21 @@
cls,
path: str,
columns: list[str] | None = None,
row_groups: list[int] | None = None,
file_size: None | int = None,
start_offset: int | None = None,
num_rows: int | None = None,
Contributor: Looks good
io_config: IOConfig | None = None,
) -> Table:
return Table._from_pytable(
_read_parquet(uri=path, columns=columns, row_groups=row_groups, size=file_size, io_config=io_config)
_read_parquet(uri=path, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config)
)

@classmethod
def read_parquet_statistics(
cls,
paths: Series | list[str],
io_config: IOConfig | None = None,
) -> Table:
if not isinstance(paths, Series):
paths = Series.from_pylist(paths)


return Table._from_pytable(_read_parquet_statistics(uris=paths._series, io_config=io_config))

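For reference, a minimal usage sketch of the updated Table.read_parquet signature and the new Table.read_parquet_statistics classmethod (not part of the diff; paths and column names below are placeholders):

from daft.table import Table

# Read a 100-row window starting at row offset 100, restricted to two columns.
tbl = Table.read_parquet(
    "data/example.parquet",
    columns=["id", "value"],
    start_offset=100,
    num_rows=100,
)

# Gather footer statistics for a batch of files into a single Table;
# a plain list of paths is converted to a Series internally.
stats = Table.read_parquet_statistics(["data/a.parquet", "data/b.parquet"])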
304 changes: 304 additions & 0 deletions src/daft-parquet/src/file.rs
@@ -0,0 +1,304 @@
use std::{collections::HashSet, sync::Arc};

use arrow2::io::parquet::read::infer_schema;
use common_error::DaftResult;
use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series};
use daft_io::IOClient;
use daft_table::Table;
use parquet2::read::{BasicDecompressor, PageReader};
use snafu::ResultExt;

use crate::{
metadata::read_parquet_metadata,
read_planner::{self, CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass},
UnableToConvertParquetPagesToArrowSnafu, UnableToOpenFileSnafu,
UnableToParseSchemaFromMetadataSnafu,
};
use arrow2::io::parquet::read::column_iter_to_arrays;

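// Builder that fetches a file's Parquet footer and lets callers narrow the read
// to a subset of columns and a row window before constructing a ParquetFileReader.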
pub(crate) struct ParquetReaderBuilder {
uri: String,
metadata: parquet2::metadata::FileMetaData,
arrow_schema: arrow2::datatypes::Schema,
selected_columns: Option<HashSet<String>>,
row_start_offset: usize,
num_rows: usize,
}

impl ParquetReaderBuilder {
pub async fn from_uri(uri: &str, io_client: Arc<daft_io::IOClient>) -> super::Result<Self> {
// TODO(sammy): We actually don't need this since we can do negative offsets when reading the metadata
let size = io_client
.single_url_get_size(uri.into())
.await
.context(UnableToOpenFileSnafu::<String> { path: uri.into() })?;

let metadata = read_parquet_metadata(uri, size, io_client).await?;
let num_rows = metadata.num_rows;
let schema =
infer_schema(&metadata)
.context(UnableToParseSchemaFromMetadataSnafu::<String> { path: uri.into() })?;
Ok(ParquetReaderBuilder {
uri: uri.into(),
metadata,
arrow_schema: schema,
selected_columns: None,
row_start_offset: 0,
num_rows,
})
}

pub fn metadata(&self) -> &parquet2::metadata::FileMetaData {
&self.metadata
}

pub fn arrow_schema(&self) -> &arrow2::datatypes::Schema {
&self.arrow_schema
}

pub fn prune_columns(mut self, columns: &[&str]) -> super::Result<Self> {
let avail_names = self
.arrow_schema
.fields
.iter()
.map(|f| f.name.as_str())
.collect::<HashSet<_>>();
let mut names_to_keep = HashSet::new();
for col_name in columns {
if avail_names.contains(col_name) {
names_to_keep.insert(col_name.to_string());
} else {
return Err(super::Error::FieldNotFound {
field: col_name.to_string(),
available_fields: avail_names.iter().map(|v| v.to_string()).collect(),
path: self.uri,
});
}
}
self.selected_columns = Some(names_to_keep);
Ok(self)
}

pub fn limit(
mut self,
start_offset: Option<usize>,
num_rows: Option<usize>,
) -> super::Result<Self> {
let start_offset = start_offset.unwrap_or(0);
let num_rows = num_rows.unwrap_or(self.metadata.num_rows.saturating_sub(start_offset));
self.row_start_offset = start_offset;
self.num_rows = num_rows;
Ok(self)
}

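// Map the requested row window onto per-row-group ranges: row groups that end
// before the start offset are skipped, and we stop once enough rows are covered.
// Finally, prune the arrow schema down to the selected columns.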
pub fn build(mut self) -> super::Result<ParquetFileReader> {
let mut row_ranges = vec![];

let mut curr_row_index = 0;
let mut rows_to_add = self.num_rows;

for (i, rg) in self.metadata.row_groups.iter().enumerate() {
if (curr_row_index + rg.num_rows()) < self.row_start_offset {
curr_row_index += rg.num_rows();
continue;
} else if rows_to_add > 0 {
let range_to_add = RowGroupRange {
row_group_index: i,
start: self.row_start_offset.saturating_sub(curr_row_index),
num_rows: rg.num_rows().min(rows_to_add),
};
rows_to_add = rows_to_add.saturating_sub(rg.num_rows().min(rows_to_add));
row_ranges.push(range_to_add);
} else {
break;
}
curr_row_index += rg.num_rows();
}

if let Some(names_to_keep) = self.selected_columns {
self.arrow_schema
.fields
.retain(|f| names_to_keep.contains(f.name.as_str()));
}

ParquetFileReader::new(self.uri, self.metadata, self.arrow_schema, row_ranges)
}
}

struct RowGroupRange {
row_group_index: usize,
start: usize,
num_rows: usize,
}

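// Reader that, given the row-group ranges chosen by the builder, plans the byte
// ranges to fetch and decodes the selected columns into a Daft Table.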
pub(crate) struct ParquetFileReader {
uri: String,
metadata: parquet2::metadata::FileMetaData,
arrow_schema: arrow2::datatypes::Schema,
row_ranges: Vec<RowGroupRange>,
}

impl ParquetFileReader {
fn new(
uri: String,
metadata: parquet2::metadata::FileMetaData,
arrow_schema: arrow2::datatypes::Schema,
row_ranges: Vec<RowGroupRange>,
) -> super::Result<Self> {
Ok(ParquetFileReader {
uri,
metadata,
arrow_schema,
row_ranges,
})
}

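// Record one byte range per column chunk of every retained field in every
// selected row group; later passes merge or split these raw ranges.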
fn naive_read_plan(&self) -> super::Result<ReadPlanner> {
let arrow_fields = &self.arrow_schema.fields;

let mut read_planner = ReadPlanner::new(&self.uri);

for row_group_range in self.row_ranges.iter() {
let rg = self
.metadata
.row_groups
.get(row_group_range.row_group_index)
.unwrap();

let columns = rg.columns();
for field in arrow_fields.iter() {
let field_name = field.name.clone();
let filtered_cols = columns
.iter()
.filter(|x| x.descriptor().path_in_schema[0] == field_name)
.collect::<Vec<_>>();

for col in filtered_cols {
let (start, len) = col.byte_range();
let end = start + len;

read_planner.add_range(start as usize, end as usize);
}
}
}

Ok(read_planner)
}

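// Optimize the naive plan (split oversized requests, coalesce nearby ranges)
// and issue all reads up front, returning the buffered byte ranges.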
pub async fn prebuffer_ranges(&self, io_client: Arc<IOClient>) -> DaftResult<RangesContainer> {
let mut read_planner = self.naive_read_plan()?;
// TODO(sammy) these values should be populated by io_client
read_planner.add_pass(Box::new(SplitLargeRequestPass {
max_request_size: 16 * 1024 * 1024,
split_threshold: 24 * 1024 * 1024,
}));

read_planner.add_pass(Box::new(CoalescePass {
max_hole_size: 1024 * 1024,
max_request_size: 16 * 1024 * 1024,
}));

read_planner.run_passes()?;
read_planner.collect(io_client).await
}

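// Decode the prebuffered bytes field by field and row group by row group, trim
// each decoded array to the requested row window, cast to Daft series, and
// assemble the final Table.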
pub fn read_from_ranges(self, ranges: RangesContainer) -> DaftResult<Table> {
let all_series = self
.arrow_schema
.fields
.iter()
.map(|field| {
let field_series = self
.row_ranges
.iter()
.map(|row_range| {
let rg = self
.metadata
.row_groups
.get(row_range.row_group_index)
.expect("Row Group index should be in bounds");
let columns = rg.columns();
let field_name = &field.name;
let filtered_cols = columns
.iter()
.filter(|x| &x.descriptor().path_in_schema[0] == field_name)
.collect::<Vec<_>>();

let mut decompressed_iters = Vec::with_capacity(filtered_cols.len());
let mut ptypes = Vec::with_capacity(filtered_cols.len());

for col in filtered_cols {
let (start, len) = col.byte_range();
let end = start + len;

// should stream this instead
let range_reader: read_planner::MultiRead<'_> =
ranges.get_range_reader(start as usize..end as usize)?;
let pages = PageReader::new(
range_reader,
col,
Arc::new(|_, _| true),
vec![],
4 * 1024 * 1024,
);

decompressed_iters.push(BasicDecompressor::new(pages, vec![]));

ptypes.push(&col.descriptor().descriptor.primitive_type);
}

let arr_iter = column_iter_to_arrays(
decompressed_iters,
ptypes,
field.clone(),
Some(4096),
rg.num_rows().min(row_range.start + row_range.num_rows),
)
.context(
UnableToConvertParquetPagesToArrowSnafu::<String> {
path: self.uri.clone(),
},
)?;

let mut all_arrays = vec![];

let mut curr_index = 0;

for arr in arr_iter {
let arr = arr?;

if (curr_index + arr.len()) < row_range.start {
// skip arrays that end before the requested start offset
curr_index += arr.len();
continue;
} else if curr_index < row_range.start {
let offset = row_range.start.saturating_sub(curr_index);
all_arrays.push(arr.sliced(offset, arr.len() - offset));
curr_index += arr.len();
} else {
curr_index += arr.len();
all_arrays.push(arr);
}
}

all_arrays
.into_iter()
.map(|a| {
Series::try_from((
field.name.as_str(),
cast_array_for_daft_if_needed(a),
))
})
.collect::<DaftResult<Vec<Series>>>()
})
.collect::<DaftResult<Vec<_>>>()?;

Series::concat(&field_series.iter().flatten().collect::<Vec<_>>())
})
.collect::<DaftResult<Vec<_>>>()?;

let daft_schema = daft_core::schema::Schema::try_from(&self.arrow_schema)?;

Table::new(daft_schema, all_series)
}
}