Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] add daft connect #3182

Closed
wants to merge 1 commit into from
Closed

Conversation

andrewgazelka
Copy link
Member

@andrewgazelka andrewgazelka commented Nov 5, 2024

This change adds a new Spark Connect server implementation in Rust, enabling Daft to communicate with Spark Connect clients. Key additions include:

  • Session management for handling client connections and query execution
  • Command handling for executing plans, analyzing plans, and managing configs
  • Data conversion utilities for translating between Spark Connect and Daft types
  • Support for range, showString, and other basic Spark operations
  • Config and user context management infrastructure
  • Full protobuf-based RPC service implementation

The commit also includes a small change to Utf8Array to use i64 as the default offset type.

@andrewgazelka andrewgazelka changed the title [FEATURE] add daft connect [FEAT] add daft connect Nov 5, 2024
@github-actions github-actions bot added the enhancement New feature or request label Nov 5, 2024
use crate::convert::expression;

pub fn to_logical_plan(plan: Relation) -> Result<LogicalPlanBuilder> {
let scope = std::thread::spawn(|| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to spawn a thread for this?

Comment on lines +90 to +94
fn parse_read(read: Read) -> Result<LogicalPlanBuilder> {
let Read {
is_streaming,
read_type,
} = read;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tip: you can do this in the function signature

Suggested change
fn parse_read(read: Read) -> Result<LogicalPlanBuilder> {
let Read {
is_streaming,
read_type,
} = read;
fn parse_read(Read {
is_streaming,
read_type,
}: Read ) -> Result<LogicalPlanBuilder>


use uuid::Uuid;

pub struct Session {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we know SQL will need a session at some point, it makes sense to have this in a shared crate


let logical_plan = logical_plan.build();

let res = std::thread::spawn(move || {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not tokio::spawn instead?

Copy link
Member Author

@andrewgazelka andrewgazelka Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's an issue of the code creating its own runtime inside of a runtime. basically us using a jank temp solution.

mod session;
pub mod util;

pub fn start(addr: &str) -> eyre::Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you can start a connection, you should be able to stop it too.

@@ -2,7 +2,7 @@ pub mod catalog;
pub mod error;
pub mod functions;
mod modules;
mod planner;
pub mod planner;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't look like this is used anywhere?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a README.md to spark-connect saying that it's all generated code from the protobufs.

uv.lock Outdated
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this intentionally committed?

Comment on lines +21 to +45
pub fn map_to_tables<T: Try>(
logical_plan: &LogicalPlanRef,
mut f: impl FnMut(&Table) -> T,
default: impl FnOnce() -> T,
) -> eyre::Result<T> {
let physical_plan = daft_physical_plan::translate(logical_plan)?;
let cfg = Arc::new(DaftExecutionConfig::default());
let psets = HashMap::new();

let stream = daft_local_execution::run::run_local(&physical_plan, psets, cfg, None)
.wrap_err("running local execution")?;

for elem in stream {
let elem = elem?;
let tables = elem.get_tables()?;

for table in tables.as_slice() {
if let ControlFlow::Break(x) = f(table).branch() {
return Ok(T::from_residual(x));
}
}
}

Ok(default())
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear what exactly this table does. Can you add some docstrings and/or comments explaining the intent behind this.

eprintln!("Daft-Connect server error: {e:?}");
}

println!("done with runtime");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove println

Copy link
Member

@samster25 samster25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Context: Discussed offline with Andrew about how to break up this PR and strategy to incrementally add features

@andrewgazelka
Copy link
Member Author

closing this as per @samster25 and my discussion. See #3236 for related PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants