[FEAT] daft-connect support for parquet #3236
base: main
CodSpeed Performance Report
Merging #3236 will degrade performances by 26.63%
28933de to afefdce
src/daft-connect/src/convert.rs
Outdated
pub fn run_local<T: Try>(
    logical_plan: &LogicalPlanRef,
    mut f: impl FnMut(&Table) -> T,
    default: impl FnOnce() -> T,
) -> eyre::Result<T> {
    let physical_plan = daft_physical_plan::translate(logical_plan)?;
    let cfg = Arc::new(DaftExecutionConfig::default());
    let psets = HashMap::new();

    let stream = daft_local_execution::run_local(&physical_plan, psets, cfg, None)
        .wrap_err("running local execution")?;

    for elem in stream {
        let elem = elem?;
        let tables = elem.get_tables()?;

        for table in tables.as_slice() {
            if let ControlFlow::Break(x) = f(table).branch() {
                return Ok(T::from_residual(x));
            }
        }
    }

    Ok(default())
}
Why can't we just return a stream of Table instead of this callback pattern? This seems overly complex to me.
Yeah, I agree the pattern being used right now is not ideal. I think it's there because I actually tried to implement a stream pattern first and ran into issues. In particular, the values are consumed as they're created, rather than gathered by calling .next on a stream. It seemed like a stream might have needed cloning or something similar. I'm not really sure, but I'm going to re-examine it because I agree it's quite complex and quite ugly.
I think if we created an async version of run_local, it would solve a lot of the problems you're running into.
It would make it natively a stream instead of converting the iter into one and possibly collecting intermediate values.
It would also remove the issue of spawning a runtime inside of a runtime.
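For reference, here is a minimal sketch of the "return the tables instead of taking a callback" shape discussed in this thread, using only the standard library. Everything in it is a hypothetical stand-in rather than a daft API: `Table`, `ExecError`, `Batch`, `tables`, and `first_large` are made up for illustration, and a real async version would presumably expose a stream rather than an `Iterator`. The point is only that early exit moves to the caller, so the `Try`-based callback and the `default` closure are no longer needed.

```rust
// Hypothetical stand-ins for the real daft types; only the shape matters here.
#[derive(Debug, Clone, PartialEq)]
struct Table {
    rows: usize,
}

#[derive(Debug, PartialEq)]
struct ExecError(String);

// Assumption: each executed partition yields a batch of tables, or an error.
type Batch = Result<Vec<Table>, ExecError>;

/// Flattens batches into a lazy iterator of tables.
/// A failed batch is surfaced as a single `Err` item.
fn tables<I>(batches: I) -> impl Iterator<Item = Result<Table, ExecError>>
where
    I: IntoIterator<Item = Batch>,
{
    batches.into_iter().flat_map(|batch| match batch {
        Ok(ts) => ts.into_iter().map(Ok).collect::<Vec<_>>(),
        Err(e) => vec![Err(e)],
    })
}

/// Caller-side early exit: return the first table with more than `limit` rows.
/// Errors propagate with `?`; later batches are never pulled after a match.
fn first_large(
    batches: impl IntoIterator<Item = Batch>,
    limit: usize,
) -> Result<Option<Table>, ExecError> {
    for table in tables(batches) {
        let table = table?;
        if table.rows > limit {
            return Ok(Some(table));
        }
    }
    Ok(None)
}
```

With this shape the caller decides when to stop (a `return`, `?`, `take`, or `find`), which is what the callback version encodes through `ControlFlow::Break`.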
eef5b8a to b6dc86e
7d453be to 82719f3
Co-authored-by: Cory Grinstead <[email protected]>
82719f3 to 4c009bf
Closes #3216