Skip to content

Commit

Permalink
[PERF] native streaming parquet (#1193)
Browse files Browse the repository at this point in the history
* Improves perf of parquet reader by performing pipelining between
compute and IO
* Uses rayon and oneshot channels to fan out expensive work to blocking
threads
  • Loading branch information
samster25 committed Jul 29, 2023
1 parent f95b55e commit bacd70e
Show file tree
Hide file tree
Showing 9 changed files with 433 additions and 224 deletions.
84 changes: 84 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ members = [
]

[workspace.dependencies]
bytes = "1.4.0"
futures = "0.3.28"
html-escape = "0.2.13"
num-derive = "0.3.3"
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ aws-credential-types = {version = "0.55.3", features = ["hardcoded-credentials"]
aws-sdk-s3 = "0.28.0"
aws-sig-auth = "0.55.3"
aws-sigv4 = "0.55.3"
bytes = "1.4.0"
bytes = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
futures = {workspace = true}
Expand Down
6 changes: 6 additions & 0 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ impl From<Error> for DaftError {
}
}

impl From<Error> for std::io::Error {
fn from(err: Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, err)
}
}

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Default)]
Expand Down
6 changes: 6 additions & 0 deletions src/daft-parquet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
[dependencies]
arrow2 = {workspace = true, features = ["io_parquet", "io_parquet_compression"]}
async-compat = "0.2.1"
async-stream = "0.3.5"
bytes = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
Expand All @@ -9,8 +12,11 @@ log = {workspace = true}
parquet2 = "0.17.2"
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
rayon = "1.7.0"
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = "0.1.14"
tokio-util = "0.7.8"

[features]
default = ["python"]
Expand Down
Loading

0 comments on commit bacd70e

Please sign in to comment.