Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] daft-connect support for parquet #3236

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

andrewgazelka
Copy link
Member

@andrewgazelka andrewgazelka commented Nov 6, 2024

Closes #3216

@andrewgazelka andrewgazelka marked this pull request as draft November 6, 2024 22:02
@github-actions github-actions bot added the enhancement New feature or request label Nov 6, 2024
@andrewgazelka andrewgazelka changed the base branch from main to andrew/spark-connect November 6, 2024 22:03
Copy link

codspeed-hq bot commented Nov 6, 2024

CodSpeed Performance Report

Merging #3236 will degrade performances by 26.63%

Comparing andrew/daft-connect-parquet (7d453be) with main (16f5a8c)

Summary

⚡ 2 improvements
❌ 1 regressions
✅ 14 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark main andrew/daft-connect-parquet Change
test_count[1 Small File] 3.8 ms 3.4 ms +11.78%
test_iter_rows_first_row[100 Small Files] 302.5 ms 265.1 ms +14.1%
test_show[100 Small Files] 23.7 ms 32.3 ms -26.63%

Comment on lines 21 to 45
pub fn run_local<T: Try>(
logical_plan: &LogicalPlanRef,
mut f: impl FnMut(&Table) -> T,
default: impl FnOnce() -> T,
) -> eyre::Result<T> {
let physical_plan = daft_physical_plan::translate(logical_plan)?;
let cfg = Arc::new(DaftExecutionConfig::default());
let psets = HashMap::new();

let stream = daft_local_execution::run_local(&physical_plan, psets, cfg, None)
.wrap_err("running local execution")?;

for elem in stream {
let elem = elem?;
let tables = elem.get_tables()?;

for table in tables.as_slice() {
if let ControlFlow::Break(x) = f(table).branch() {
return Ok(T::from_residual(x));
}
}
}

Ok(default())
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just return a stream of Table instead of this callback pattern? This seems overly complex to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree the pattern being used right now is not ideal. I think it's being used because I actually tried to implement a stream pattern, but there are issues. In particular, you're consuming values as they're created, rather than calling .next on a stream to gather values. It seemed like there might have been a need for cloning or something. I'm not really sure, but I'm going to re-examine it because I agree it's quite complex and quite ugly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think if we created an async version of run_local, it would solve a lot of the problems you're running into.

It would make it natively a stream instead of converting the iter into one and possible collecting intermediate values.

It would also remove the issue of spawning a runtime inside of a runtime.

@andrewgazelka andrewgazelka force-pushed the andrew/daft-connect-parquet branch 2 times, most recently from eef5b8a to b6dc86e Compare November 8, 2024 18:12
Base automatically changed from andrew/spark-connect to main November 8, 2024 19:36
@andrewgazelka andrewgazelka force-pushed the andrew/daft-connect-parquet branch 7 times, most recently from 7d453be to 82719f3 Compare November 12, 2024 22:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Daft-Connect] Implement Parquet Operations
2 participants