[FEAT] add daft connect #3182
use crate::convert::expression;

pub fn to_logical_plan(plan: Relation) -> Result<LogicalPlanBuilder> {
    let scope = std::thread::spawn(|| {
why do we need to spawn a thread for this?
fn parse_read(read: Read) -> Result<LogicalPlanBuilder> {
    let Read {
        is_streaming,
        read_type,
    } = read;
tip: you can do this in the function signature
-fn parse_read(read: Read) -> Result<LogicalPlanBuilder> {
-    let Read {
-        is_streaming,
-        read_type,
-    } = read;
+fn parse_read(Read {
+    is_streaming,
+    read_type,
+}: Read) -> Result<LogicalPlanBuilder> {
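To make the tip above concrete, here is a minimal, self-contained sketch showing that a function parameter in Rust is itself a pattern, so a struct can be destructured directly in the signature. The `Read` struct below is a hypothetical stand-in for the protobuf-generated message; its field types and the two `describe_*` functions are assumptions made for illustration and are not part of the PR.

```rust
// Hypothetical stand-in for the protobuf-generated `Read` message.
// The field types are assumptions made for illustration only.
struct Read {
    is_streaming: bool,
    read_type: Option<String>,
}

// Destructure in the body: bind the whole value, then take it apart.
fn describe_in_body(read: Read) -> String {
    let Read {
        is_streaming,
        read_type,
    } = read;
    format!("streaming={is_streaming}, read_type={read_type:?}")
}

// Destructure in the signature: the parameter is a pattern, so the
// fields are bound directly and no intermediate `read` name is needed.
fn describe_in_signature(
    Read {
        is_streaming,
        read_type,
    }: Read,
) -> String {
    format!("streaming={is_streaming}, read_type={read_type:?}")
}
```

Both functions behave identically; the second only saves the intermediate binding, so which one reads better is a matter of taste.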
use uuid::Uuid;

pub struct Session {
Since we know SQL will need a session at some point, it makes sense to have this in a shared crate
let logical_plan = logical_plan.build();

let res = std::thread::spawn(move || {
why not tokio::spawn instead?
It's an issue of the code creating its own runtime inside of a runtime. Basically, this is us using a janky temporary solution.
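For context on the workaround discussed in this thread: Tokio panics when code blocks on a second runtime from a thread that is already driving async tasks, and moving that code onto its own OS thread sidesteps the problem. The sketch below illustrates the pattern with the standard library only. `run_blocking_engine` is a hypothetical stand-in for code that builds and blocks on its own runtime, and `run_isolated` is an assumed name; neither appears in the PR.

```rust
use std::thread;

// Hypothetical stand-in for code that builds and blocks on its own
// runtime. Here it only sums its input so the sketch stays std-only.
fn run_blocking_engine(input: Vec<i64>) -> Result<i64, String> {
    if input.is_empty() {
        return Err("empty input".to_string());
    }
    Ok(input.iter().sum())
}

// Run the engine on a dedicated OS thread and wait for its result.
// The engine never executes on the caller's thread, so it cannot
// collide with whatever runtime the caller is running inside.
fn run_isolated(input: Vec<i64>) -> Result<i64, String> {
    let handle = thread::spawn(move || run_blocking_engine(input));
    // `join` returns Err only if the spawned thread panicked.
    handle
        .join()
        .map_err(|_| "engine thread panicked".to_string())?
}
```

Note that `join` blocks the calling thread until the engine finishes, so inside an async handler this still stalls the executor thread it runs on. In Tokio-based code, `tokio::task::spawn_blocking` is the usual way to offload blocking work, though whether it fits here depends on how the execution code sets up its runtime.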
mod session;
pub mod util;

pub fn start(addr: &str) -> eyre::Result<()> {
if you can start a connection, you should be able to stop it too.
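One possible shape for the start/stop pairing requested here is for `start` to return a handle that owns a shutdown channel and the worker thread. The following is a std-only sketch under that assumption. `ServerHandle`, its fields, and the tick-counting loop are all hypothetical; the real server would run its accept loop in place of the pretend work.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical handle returned by `start`; not part of the PR.
struct ServerHandle {
    shutdown: Sender<()>,
    worker: JoinHandle<u64>,
}

// Start a background loop standing in for the server's accept loop.
fn start() -> ServerHandle {
    let (shutdown, signal) = mpsc::channel::<()>();
    let worker = thread::spawn(move || {
        let mut ticks = 0u64;
        loop {
            match signal.recv_timeout(Duration::from_millis(10)) {
                // A message or a dropped sender both mean "stop".
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
                // No signal yet: do one unit of (pretend) work.
                Err(RecvTimeoutError::Timeout) => ticks += 1,
            }
        }
        ticks
    });
    ServerHandle { shutdown, worker }
}

impl ServerHandle {
    // Signal the loop to exit and wait for the thread to finish.
    fn stop(self) -> Result<u64, String> {
        // Sending fails only if the worker has already exited.
        let _ = self.shutdown.send(());
        self.worker
            .join()
            .map_err(|_| "server thread panicked".to_string())
    }
}
```

A caller would keep the value returned by `start()` and call `stop()` on it when the connection should end. Because `stop` consumes the handle, a stopped server cannot be stopped twice.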
@@ -2,7 +2,7 @@ pub mod catalog;
 pub mod error;
 pub mod functions;
 mod modules;
-mod planner;
+pub mod planner;
it doesn't look like this is used anywhere?
src/spark-connect/Cargo.toml (outdated)
Add a README.md to spark-connect saying that it's all generated code from the protobufs.
uv.lock (outdated)
was this intentionally committed?
pub fn map_to_tables<T: Try>(
    logical_plan: &LogicalPlanRef,
    mut f: impl FnMut(&Table) -> T,
    default: impl FnOnce() -> T,
) -> eyre::Result<T> {
    let physical_plan = daft_physical_plan::translate(logical_plan)?;
    let cfg = Arc::new(DaftExecutionConfig::default());
    let psets = HashMap::new();

    let stream = daft_local_execution::run::run_local(&physical_plan, psets, cfg, None)
        .wrap_err("running local execution")?;

    for elem in stream {
        let elem = elem?;
        let tables = elem.get_tables()?;

        for table in tables.as_slice() {
            if let ControlFlow::Break(x) = f(table).branch() {
                return Ok(T::from_residual(x));
            }
        }
    }

    Ok(default())
}
It's unclear what exactly this function does. Can you add some docstrings and/or comments explaining the intent behind it?
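As a reading aid for the function under review: it appears to visit every table the plan produces, stop at the first callback result that short-circuits, and otherwise fall back to `default()`. The PR version is generic over the `Try` trait, which is unstable on current stable Rust as far as I know, so the sketch below swaps in `std::ops::ControlFlow` to show the same early-exit shape. `visit_batches` and `first_above` are hypothetical names, and nested `Vec`s stand in for the stream of tables.

```rust
use std::ops::ControlFlow;

// Visit every item in every batch and stop at the first `Break`.
// Returns the break value, or `default()` if the callback never breaks.
fn visit_batches<T, B>(
    batches: &[Vec<T>],
    mut f: impl FnMut(&T) -> ControlFlow<B>,
    default: impl FnOnce() -> B,
) -> B {
    for batch in batches {
        for item in batch {
            if let ControlFlow::Break(found) = f(item) {
                return found;
            }
        }
    }
    default()
}

// Example use of the callback: find the first value above a threshold.
fn first_above(batches: &[Vec<i32>], limit: i32) -> Option<i32> {
    visit_batches(
        batches,
        |&x| {
            if x > limit {
                ControlFlow::Break(Some(x))
            } else {
                ControlFlow::Continue(())
            }
        },
        || None,
    )
}
```

With batches `[[1, 2], [5, 9], [3]]` and a limit of 4, the callback breaks on 5 and the remaining items are never visited. With a limit of 100 nothing breaks, so the `default` closure supplies `None`.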
    eprintln!("Daft-Connect server error: {e:?}");
}

println!("done with runtime");
remove println
c08868c to 68692f9
68692f9 to a66e0f2
Context: Discussed offline with Andrew about how to break up this PR and strategy to incrementally add features
Closing this as per my discussion with @samster25. See #3236 for the related PR.
This change adds a new Spark Connect server implementation in Rust, enabling Daft to communicate with Spark Connect clients. Key additions include:
The commit also includes a small change to Utf8Array to use i64 as the default offset type.