diff --git a/Cargo.lock b/Cargo.lock
index c0b0bef72a..fd62c251cd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1788,6 +1788,7 @@ name = "daft-local-execution"
 version = "0.2.0-dev0"
 dependencies = [
  "async-stream",
+ "async-trait",
  "common-error",
  "common-tracing",
  "daft-core",
@@ -1802,7 +1803,6 @@ dependencies = [
  "daft-scan",
  "daft-stats",
  "daft-table",
- "dyn-clone",
  "futures",
  "lazy_static",
  "pyo3",
@@ -1880,6 +1880,7 @@ dependencies = [
  "daft-dsl",
  "daft-plan",
  "daft-scan",
+ "log",
  "strum 0.26.2",
 ]
diff --git a/src/daft-core/src/array/growable/mod.rs b/src/daft-core/src/array/growable/mod.rs
index 812f9d4f8c..80b4250a08 100644
--- a/src/daft-core/src/array/growable/mod.rs
+++ b/src/daft-core/src/array/growable/mod.rs
@@ -43,7 +43,7 @@ use crate::datatypes::PythonArray;
 /// * `capacity` - Helps pre-allocate memory to the [`Growable`] by providing a capacity up-front. Note that variable-length types
 /// such as [`ListArray`] only understands this as the "top-level" capacity, but the capacity of nested children arrays cannot be specified
 /// through this [`make_growable`] API. Instead, you may wish to instantiate and use the [`nested_growable::ListGrowable`] directly if
-/// this is important to your use0-case.
+/// this is important to your use-case.
 pub fn make_growable<'a>(
     name: &str,
     dtype: &DataType,
diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml
index 4ac61ccfc0..509b73a0ee 100644
--- a/src/daft-local-execution/Cargo.toml
+++ b/src/daft-local-execution/Cargo.toml
@@ -1,5 +1,6 @@
 [dependencies]
 async-stream = {workspace = true}
+async-trait = {workspace = true}
 common-error = {path = "../common/error", default-features = false}
 common-tracing = {path = "../common/tracing", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
@@ -14,7 +15,6 @@ daft-plan = {path = "../daft-plan", default-features = false}
 daft-scan = {path = "../daft-scan", default-features = false}
 daft-stats = {path = "../daft-stats", default-features = false}
 daft-table = {path = "../daft-table", default-features = false}
-dyn-clone = {workspace = true}
 futures = {workspace = true}
 lazy_static = {workspace = true}
 pyo3 = {workspace = true, optional = true}
diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
index f3d09dc5ae..0d3a5d980c 100644
--- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
+++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -1,23 +1,35 @@
-use std::sync::Arc;
+use std::{env, sync::Arc};
 
 use common_error::DaftResult;
 use daft_micropartition::MicroPartition;
 use tracing::{info_span, instrument};
 
+use async_trait::async_trait;
+
 use crate::{
     channel::{
         create_channel, create_single_channel, spawn_compute_task, MultiReceiver, MultiSender,
         SingleReceiver, SingleSender,
     },
+    pipeline::PipelineNode,
     DEFAULT_MORSEL_SIZE, NUM_CPUS,
 };
 
-pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
+pub trait IntermediateOperator: Send + Sync {
     fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
+    #[allow(dead_code)]
+    fn name(&self) -> &'static str;
 }
 
-dyn_clone::clone_trait_object!(IntermediateOperator);
+/// The number of rows that will trigger an intermediate operator to output its data.
+#[allow(dead_code)]
+fn get_output_threshold() -> usize {
+    env::var("OUTPUT_THRESHOLD")
+        .unwrap_or_else(|_| "1000".to_string())
+        .parse()
+        .expect("OUTPUT_THRESHOLD must be a number")
+}
 
 /// State of an operator task, used to buffer data and output it when a threshold is reached.
 pub struct OperatorTaskState {
@@ -55,6 +67,10 @@ impl OperatorTaskState {
         if self.buffer.is_empty() {
             return None;
         }
+        assert!(
+            !self.buffer.is_empty(),
+            "We cannot run concat with no data"
+        );
         let concated =
             MicroPartition::concat(&self.buffer.iter().map(|x| x.as_ref()).collect::<Vec<_>>())
                 .map(Arc::new);
@@ -70,12 +86,12 @@
 pub struct IntermediateOpActor {
     sender: MultiSender,
     receiver: MultiReceiver,
-    op: Box<dyn IntermediateOperator>,
+    op: Arc<dyn IntermediateOperator>,
 }
 
 impl IntermediateOpActor {
     pub fn new(
-        op: Box<dyn IntermediateOperator>,
+        op: Arc<dyn IntermediateOperator>,
         receiver: MultiReceiver,
         sender: MultiSender,
     ) -> Self {
@@ -91,7 +107,7 @@
     async fn run_single(
         mut receiver: SingleReceiver,
         sender: SingleSender,
-        op: Box<dyn IntermediateOperator>,
+        op: Arc<dyn IntermediateOperator>,
     ) -> DaftResult<()> {
         let mut state = OperatorTaskState::new();
         let span = info_span!("IntermediateOp::execute");
@@ -139,11 +155,51 @@ impl IntermediateOpActor {
     }
 }
 
-pub fn run_intermediate_op(op: Box<dyn IntermediateOperator>, send_to: MultiSender) -> MultiSender {
-    let (sender, receiver) = create_channel(*NUM_CPUS, send_to.in_order());
-    let mut actor = IntermediateOpActor::new(op, receiver, send_to);
-    tokio::spawn(async move {
-        let _ = actor.run_parallel().await;
-    });
-    sender
+pub(crate) struct IntermediateNode {
+    intermediate_op: Arc<dyn IntermediateOperator>,
+    children: Vec<Box<dyn PipelineNode>>,
+}
+
+impl IntermediateNode {
+    pub(crate) fn new(
+        intermediate_op: Arc<dyn IntermediateOperator>,
+        children: Vec<Box<dyn PipelineNode>>,
+    ) -> Self {
+        IntermediateNode {
+            intermediate_op,
+            children,
+        }
+    }
+    pub(crate) fn boxed(self) -> Box<dyn PipelineNode> {
+        Box::new(self)
+    }
+}
+
+#[async_trait]
+impl PipelineNode for IntermediateNode {
+    fn children(&self) -> Vec<&dyn PipelineNode> {
+        self.children.iter().map(|v| v.as_ref()).collect()
+    }
+
+    async fn start(&mut self, destination: MultiSender) -> DaftResult<()> {
+        assert_eq!(
+            self.children.len(),
+            1,
+            "we only support 1 child for Intermediate Node for now"
+        );
+
+        let (sender, receiver) = create_channel(*NUM_CPUS, destination.in_order());
+
+        let child = self
+            .children
+            .get_mut(0)
+            .expect("we should only have 1 child");
+        child.start(sender).await?;
+
+        let mut actor =
+            IntermediateOpActor::new(self.intermediate_op.clone(), receiver, destination);
+        // this should ideally be in the actor
+        spawn_compute_task(async move { actor.run_parallel().await });
+        Ok(())
+    }
 }
diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs
index f9b3f6896c..913186ab81 100644
--- a/src/daft-local-execution/src/lib.rs
+++ b/src/daft-local-execution/src/lib.rs
@@ -1,3 +1,4 @@
+#![feature(let_chains)]
 mod channel;
 mod intermediate_ops;
 mod pipeline;
diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
index 8ba4586337..5dec192650 100644
--- a/src/daft-local-execution/src/pipeline.rs
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -1,142 +1,81 @@
 use std::{collections::HashMap, sync::Arc};
 
-use daft_dsl::Expr;
-use daft_micropartition::MicroPartition;
-use daft_physical_plan::{
-    Concat, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan,
-    Project, Sort, UnGroupedAggregate,
-};
-use daft_plan::populate_aggregation_stages;
-
 use crate::{
-    channel::MultiSender,
     intermediate_ops::{
-        aggregate::AggregateOperator,
-        filter::FilterOperator,
-        intermediate_op::{run_intermediate_op, IntermediateOperator},
+        aggregate::AggregateOperator, filter::FilterOperator, intermediate_op::IntermediateNode,
         project::ProjectOperator,
     },
     sinks::{
         aggregate::AggregateSink,
-        concat::ConcatSink,
-        hash_join::HashJoinSink,
+        blocking_sink::BlockingSinkNode,
+        hash_join::{HashJoinNode, HashJoinOperator},
         limit::LimitSink,
-        sink::{run_double_input_sink, run_single_input_sink, DoubleInputSink, SingleInputSink},
         sort::SortSink,
+        streaming_sink::StreamingSinkNode,
     },
-    sources::{
-        in_memory::InMemorySource,
-        scan_task::ScanTaskSource,
-        source::{run_source, Source},
-    },
+    sources::in_memory::InMemorySource,
 };
 
-pub enum PipelineNode {
-    Source {
-        source: Arc<dyn Source>,
-    },
-    IntermediateOp {
-        intermediate_op: Box<dyn IntermediateOperator>,
-        child: Box<PipelineNode>,
-    },
-    SingleInputSink {
-        sink: Box<dyn SingleInputSink>,
-        child: Box<PipelineNode>,
-    },
-    DoubleInputSink {
-        sink: Box<dyn DoubleInputSink>,
-        left_child: Box<PipelineNode>,
-        right_child: Box<PipelineNode>,
-    },
-}
+use async_trait::async_trait;
+use common_error::DaftResult;
+use daft_dsl::Expr;
+use daft_micropartition::MicroPartition;
+use daft_physical_plan::{
+    Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project, Sort,
+    UnGroupedAggregate,
+};
+use daft_plan::populate_aggregation_stages;
 
-impl PipelineNode {
-    pub fn start(self, sender: MultiSender) {
-        match self {
-            PipelineNode::Source { source } => {
-                run_source(source.clone(), sender);
-            }
-            PipelineNode::IntermediateOp {
-                intermediate_op,
-                child,
-            } => {
-                let sender = run_intermediate_op(intermediate_op.clone(), sender);
-                child.start(sender);
-            }
-            PipelineNode::SingleInputSink { sink, child } => {
-                let sender = run_single_input_sink(sink, sender);
-                child.start(sender);
-            }
-            PipelineNode::DoubleInputSink {
-                sink,
-                left_child,
-                right_child,
-            } => {
-                let (left_sender, right_sender) = run_double_input_sink(sink, sender);
-                left_child.start(left_sender);
-                right_child.start(right_sender);
-            }
-        }
-    }
+use crate::channel::MultiSender;
+
+#[async_trait]
+pub trait PipelineNode: Sync + Send {
+    fn children(&self) -> Vec<&dyn PipelineNode>;
+    async fn start(&mut self, destination: MultiSender) -> DaftResult<()>;
 }
 
 pub fn physical_plan_to_pipeline(
     physical_plan: &LocalPhysicalPlan,
     psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
-) -> PipelineNode {
-    match physical_plan {
+) -> DaftResult<Box<dyn PipelineNode>> {
+    use crate::sources::scan_task::ScanTaskSource;
+    use daft_physical_plan::PhysicalScan;
+    let out: Box<dyn PipelineNode> = match physical_plan {
         LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
             let scan_task_source = ScanTaskSource::new(scan_tasks.clone());
-            PipelineNode::Source {
-                source: Arc::new(scan_task_source),
-            }
+            scan_task_source.boxed().into()
         }
         LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
             let partitions = psets.get(&info.cache_key).expect("Cache key not found");
-            let in_memory_source = InMemorySource::new(partitions.clone());
-            PipelineNode::Source {
-                source: Arc::new(in_memory_source),
-            }
+            InMemorySource::new(partitions.clone()).boxed().into()
         }
         LocalPhysicalPlan::Project(Project {
             input, projection, ..
         }) => {
             let proj_op = ProjectOperator::new(projection.clone());
-            let child_node = physical_plan_to_pipeline(input, psets);
-            PipelineNode::IntermediateOp {
-                intermediate_op: Box::new(proj_op),
-                child: Box::new(child_node),
-            }
+            let child_node = physical_plan_to_pipeline(input, psets)?;
+            IntermediateNode::new(Arc::new(proj_op), vec![child_node]).boxed()
         }
         LocalPhysicalPlan::Filter(Filter {
             input, predicate, ..
         }) => {
             let filter_op = FilterOperator::new(predicate.clone());
-            let child_node = physical_plan_to_pipeline(input, psets);
-            PipelineNode::IntermediateOp {
-                intermediate_op: Box::new(filter_op),
-                child: Box::new(child_node),
-            }
+            let child_node = physical_plan_to_pipeline(input, psets)?;
+            IntermediateNode::new(Arc::new(filter_op), vec![child_node]).boxed()
         }
         LocalPhysicalPlan::Limit(Limit {
             input, num_rows, ..
         }) => {
             let sink = LimitSink::new(*num_rows as usize);
-            let child_node = physical_plan_to_pipeline(input, psets);
-            PipelineNode::SingleInputSink {
-                sink: Box::new(sink),
-                child: Box::new(child_node),
-            }
+            let child_node = physical_plan_to_pipeline(input, psets)?;
+            StreamingSinkNode::new(sink.boxed(), vec![child_node]).boxed()
         }
-        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
-            let sink = ConcatSink::new();
-            let left_child = physical_plan_to_pipeline(input, psets);
-            let right_child = physical_plan_to_pipeline(other, psets);
-            PipelineNode::DoubleInputSink {
-                sink: Box::new(sink),
-                left_child: Box::new(left_child),
-                right_child: Box::new(right_child),
-            }
+        LocalPhysicalPlan::Concat(_) => {
+            todo!("concat")
+            // let sink = ConcatSink::new();
+            // let left_child = physical_plan_to_pipeline(input, psets)?;
+            // let right_child = physical_plan_to_pipeline(other, psets)?;
+            // PipelineNode::double_sink(sink, left_child, right_child)
         }
         LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
             input,
@@ -154,6 +93,11 @@ pub fn physical_plan_to_pipeline(
                     .collect(),
                 vec![],
             );
+
+            let child_node = physical_plan_to_pipeline(input, psets)?;
+            let post_first_agg_node =
+                IntermediateNode::new(Arc::new(first_stage_agg_op), vec![child_node]).boxed();
+
             let second_stage_agg_sink = AggregateSink::new(
                 second_stage_aggs
                     .values()
@@ -162,23 +106,12 @@ pub fn physical_plan_to_pipeline(
                    .collect(),
                 vec![],
             );
-            let final_stage_project = ProjectOperator::new(final_exprs);
+            let second_stage_node =
+                BlockingSinkNode::new(second_stage_agg_sink.boxed(), post_first_agg_node).boxed();
 
-            let child_node = physical_plan_to_pipeline(input, psets);
-            let intermediate_agg_op_node = PipelineNode::IntermediateOp {
-                intermediate_op: Box::new(first_stage_agg_op),
-                child: Box::new(child_node),
-            };
-
-            let sink_node = PipelineNode::SingleInputSink {
-                sink: Box::new(second_stage_agg_sink),
-                child: Box::new(intermediate_agg_op_node),
-            };
+            let final_stage_project = ProjectOperator::new(final_exprs);
 
-            PipelineNode::IntermediateOp {
-                intermediate_op: Box::new(final_stage_project),
-                child: Box::new(sink_node),
-            }
+            IntermediateNode::new(Arc::new(final_stage_project), vec![second_stage_node]).boxed()
         }
         LocalPhysicalPlan::HashAggregate(HashAggregate {
             input,
@@ -197,6 +130,11 @@ pub fn physical_plan_to_pipeline(
                     .collect(),
                 group_by.clone(),
             );
+
+            let child_node = physical_plan_to_pipeline(input, psets)?;
+            let post_first_agg_node =
+                IntermediateNode::new(Arc::new(first_stage_agg_op), vec![child_node]).boxed();
+
             let second_stage_agg_sink = AggregateSink::new(
                 second_stage_aggs
                     .values()
@@ -205,23 +143,12 @@ pub fn physical_plan_to_pipeline(
                     .collect(),
                 group_by.clone(),
             );
-            let final_stage_project = ProjectOperator::new(final_exprs);
-
-            let child_node = physical_plan_to_pipeline(input, psets);
-            let intermediate_agg_op_node = PipelineNode::IntermediateOp {
-                intermediate_op: Box::new(first_stage_agg_op),
-                child: Box::new(child_node),
-            };
+            let second_stage_node =
+                BlockingSinkNode::new(second_stage_agg_sink.boxed(), post_first_agg_node).boxed();
 
-            let sink_node = PipelineNode::SingleInputSink {
-                sink: Box::new(second_stage_agg_sink),
-                child: Box::new(intermediate_agg_op_node),
-            };
+            let final_stage_project = ProjectOperator::new(final_exprs);
 
-            PipelineNode::IntermediateOp {
-                intermediate_op: Box::new(final_stage_project),
-                child: Box::new(sink_node),
-            }
+            IntermediateNode::new(Arc::new(final_stage_project), vec![second_stage_node]).boxed()
         }
         LocalPhysicalPlan::Sort(Sort {
             input,
@@ -230,11 +157,8 @@ pub fn physical_plan_to_pipeline(
             ..
         }) => {
             let sort_sink = SortSink::new(sort_by.clone(), descending.clone());
-            let child_node = physical_plan_to_pipeline(input, psets);
-            PipelineNode::SingleInputSink {
-                sink: Box::new(sort_sink),
-                child: Box::new(child_node),
-            }
+            let child_node = physical_plan_to_pipeline(input, psets)?;
+            BlockingSinkNode::new(sort_sink.boxed(), child_node).boxed()
         }
         LocalPhysicalPlan::HashJoin(HashJoin {
             left,
@@ -244,17 +168,25 @@ pub fn physical_plan_to_pipeline(
             right,
             left_on,
             right_on,
             join_type,
             ..
         }) => {
-            let left_node = physical_plan_to_pipeline(left, psets);
-            let right_node = physical_plan_to_pipeline(right, psets);
-            let sink = HashJoinSink::new(left_on.clone(), right_on.clone(), *join_type);
-            PipelineNode::DoubleInputSink {
-                sink: Box::new(sink),
-                left_child: Box::new(left_node),
-                right_child: Box::new(right_node),
-            }
+            let left_schema = left.schema();
+            let right_schema = right.schema();
+            let left_node = physical_plan_to_pipeline(left, psets)?;
+            let right_node = physical_plan_to_pipeline(right, psets)?;
+
+            // we should move to a builder pattern
+            let sink = HashJoinOperator::new(
+                left_on.clone(),
+                right_on.clone(),
+                *join_type,
+                left_schema,
+                right_schema,
+            )?;
+            HashJoinNode::new(sink, left_node, right_node).boxed()
         }
         _ => {
             unimplemented!("Physical plan not supported: {}", physical_plan.name());
         }
-    }
+    };
+
+    Ok(out)
 }
diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs
index d2c2dce9d1..a30a60720d 100644
--- a/src/daft-local-execution/src/run.rs
+++ b/src/daft-local-execution/src/run.rs
@@ -103,14 +103,15 @@ pub fn run_local(
         .expect("Failed to create tokio runtime");
 
     let res = runtime.block_on(async {
-        let pipeline = physical_plan_to_pipeline(physical_plan, &psets);
+        let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets).unwrap();
+
         let (sender, mut receiver) = create_channel(1, true);
-        pipeline.start(sender);
+        pipeline.start(sender).await?;
         let mut result = vec![];
         while let Some(val) = receiver.recv().await {
             result.push(val);
         }
-        result.into_iter()
+        DaftResult::Ok(result.into_iter())
     });
-    Ok(Box::new(res))
+    Ok(Box::new(res?))
 }
diff --git a/src/daft-local-execution/src/sinks/aggregate.rs b/src/daft-local-execution/src/sinks/aggregate.rs
index 2842c0d54f..912fba108b 100644
--- a/src/daft-local-execution/src/sinks/aggregate.rs
+++ b/src/daft-local-execution/src/sinks/aggregate.rs
@@ -3,15 +3,22 @@ use std::sync::Arc;
 use common_error::DaftResult;
 use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
+use futures::{stream, StreamExt};
 use tracing::instrument;
 
-use super::sink::{SingleInputSink, SinkResultType};
+use crate::sources::source::Source;
+
+use super::blocking_sink::{BlockingSink, BlockingSinkStatus};
+
+enum AggregateState {
+    Accumulating(Vec<Arc<MicroPartition>>),
+    Done(Arc<MicroPartition>),
+}
 
-#[derive(Clone)]
 pub struct AggregateSink {
     agg_exprs: Vec<ExprRef>,
     group_by: Vec<ExprRef>,
-    parts: Vec<Arc<MicroPartition>>,
+    state: AggregateState,
 }
 
 impl AggregateSink {
@@ -19,27 +26,56 @@
         Self {
             agg_exprs,
             group_by,
-            parts: Vec::new(),
+            state: AggregateState::Accumulating(vec![]),
        }
     }
+
+    pub fn boxed(self) -> Box<dyn BlockingSink> {
+        Box::new(self)
+    }
 }
 
-impl SingleInputSink for AggregateSink {
+impl BlockingSink for AggregateSink {
     #[instrument(skip_all, name = "AggregateSink::sink")]
-    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        self.parts.push(input.clone());
-        Ok(SinkResultType::NeedMoreInput)
+    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<BlockingSinkStatus> {
+        if let AggregateState::Accumulating(parts) = &mut self.state {
+            parts.push(input.clone());
+            Ok(BlockingSinkStatus::NeedMoreInput)
+        } else {
+            panic!("sink must be in Accumulating phase")
+        }
     }
 
-    fn in_order(&self) -> bool {
-        true
+    #[instrument(skip_all, name = "AggregateSink::finalize")]
+    fn finalize(&mut self) -> DaftResult<()> {
+        if let AggregateState::Accumulating(parts) = &mut self.state {
+            assert!(
+                !parts.is_empty(),
+                "We cannot finalize AggregateSink with no data"
+            );
+            let concated =
+                MicroPartition::concat(&parts.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
+            let agged = concated.agg(&self.agg_exprs, &self.group_by)?;
+            self.state = AggregateState::Done(Arc::new(agged));
+            Ok(())
+        } else {
+            panic!("finalize must be in Accumulating phase")
+        }
+    }
+    fn name(&self) -> &'static str {
+        "AggregateSink"
+    }
+    fn as_source(&mut self) -> &mut dyn crate::sources::source::Source {
+        self
     }
+}
 
-    #[instrument(skip_all, name = "AggregateSink::finalize")]
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>> {
-        let concated =
-            MicroPartition::concat(&self.parts.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
-        let agged = concated.agg(&self.agg_exprs, &self.group_by)?;
-        Ok(vec![Arc::new(agged)])
+impl Source for AggregateSink {
+    fn get_data(&self, _maintain_order: bool) -> crate::sources::source::SourceStream {
+        if let AggregateState::Done(parts) = &self.state {
+            stream::iter([Ok(parts.clone())]).boxed()
+        } else {
+            panic!("as_source must be in Done phase")
+        }
     }
 }
diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs
new file mode 100644
index 0000000000..580ffea4ac
--- /dev/null
+++ b/src/daft-local-execution/src/sinks/blocking_sink.rs
@@ -0,0 +1,87 @@
+use std::sync::Arc;
+
+use common_error::DaftResult;
+use daft_micropartition::MicroPartition;
+use futures::StreamExt;
+use tracing::{info_span, Instrument};
+
+use crate::{
+    channel::{create_channel, MultiSender},
+    pipeline::PipelineNode,
+    sources::source::Source,
+    NUM_CPUS,
+};
+use async_trait::async_trait;
+pub enum BlockingSinkStatus {
+    NeedMoreInput,
+    #[allow(dead_code)]
+    Finished,
+}
+
+pub trait BlockingSink: Send + Sync {
+    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<BlockingSinkStatus>;
+    fn finalize(&mut self) -> DaftResult<()> {
+        Ok(())
+    }
+    #[allow(dead_code)]
+    fn name(&self) -> &'static str;
+    fn as_source(&mut self) -> &mut dyn Source;
+}
+
+pub(crate) struct BlockingSinkNode {
+    // use a RW lock
+    op: Arc<tokio::sync::Mutex<Box<dyn BlockingSink>>>,
+    child: Box<dyn PipelineNode>,
+}
+
+impl BlockingSinkNode {
+    pub(crate) fn new(op: Box<dyn BlockingSink>, child: Box<dyn PipelineNode>) -> Self {
+        BlockingSinkNode {
+            op: Arc::new(tokio::sync::Mutex::new(op)),
+            child,
+        }
+    }
+    pub(crate) fn boxed(self) -> Box<dyn PipelineNode> {
+        Box::new(self)
+    }
+}
+
+#[async_trait]
+impl PipelineNode for BlockingSinkNode {
+    fn children(&self) -> Vec<&dyn PipelineNode> {
+        vec![self.child.as_ref()]
+    }
+
+    async fn start(&mut self, mut destination: MultiSender) -> DaftResult<()> {
+        let (sender, mut streaming_receiver) = create_channel(*NUM_CPUS, true);
+        // now we can start building the right side
+        let child = self.child.as_mut();
+        child.start(sender).await?;
+        let op = self.op.clone();
+        let sink_build = tokio::spawn(async move {
+            let span = info_span!("BlockingSinkNode::execute");
+            let mut guard = op.lock().await;
+            while let Some(val) = streaming_receiver.recv().await {
+                if let BlockingSinkStatus::Finished = span.in_scope(|| guard.sink(&val?))? {
+                    break;
+                }
+            }
+            info_span!("BlockingSinkNode::finalize").in_scope(|| guard.finalize())?;
+            DaftResult::Ok(())
+        });
+        let op = self.op.clone();
+
+        tokio::spawn(async move {
+            sink_build.await.unwrap()?;
+            let mut guard = op.lock().await;
+            let source = guard.as_source();
+            let mut source_stream = source.get_data(destination.in_order());
+            while let Some(val) = source_stream.next().in_current_span().await {
+                let _ = destination.get_next_sender().send(val).await;
+            }
+            DaftResult::Ok(())
+        });
+
+        Ok(())
+    }
+}
diff --git a/src/daft-local-execution/src/sinks/concat.rs b/src/daft-local-execution/src/sinks/concat.rs
index a8df9fe33d..010bed0aaf 100644
--- a/src/daft-local-execution/src/sinks/concat.rs
+++ b/src/daft-local-execution/src/sinks/concat.rs
@@ -1,54 +1,61 @@
-use std::sync::Arc;
-
-use common_error::DaftResult;
-use daft_micropartition::MicroPartition;
-use tracing::instrument;
-
-use super::sink::{DoubleInputSink, SinkResultType};
-
-#[derive(Clone)]
-pub struct ConcatSink {
-    result_left: Vec<Arc<MicroPartition>>,
-    result_right: Vec<Arc<MicroPartition>>,
-}
-
-impl ConcatSink {
-    pub fn new() -> Self {
-        Self {
-            result_left: Vec::new(),
-            result_right: Vec::new(),
-        }
-    }
-}
-
-impl DoubleInputSink for ConcatSink {
-    #[instrument(skip_all, name = "ConcatSink::sink")]
-    fn sink_left(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        self.result_left.push(input.clone());
-        Ok(SinkResultType::NeedMoreInput)
-    }
-
-    #[instrument(skip_all, name = "ConcatSink::sink")]
-    fn sink_right(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        self.result_right.push(input.clone());
-        Ok(SinkResultType::NeedMoreInput)
-    }
-
-    fn in_order(&self) -> bool {
-        true
-    }
-
-    #[instrument(skip_all, name = "ConcatSink::finalize")]
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>> {
-        Ok(self
-            .result_left
-            .clone()
-            .into_iter()
-            .chain(self.result_right.clone())
-            .collect())
-    }
-
-    fn name(&self) -> &'static str {
-        "Concat"
-    }
-}
+// use std::sync::Arc;
+
+// use common_error::DaftResult;
+// use daft_micropartition::MicroPartition;
+// use tracing::instrument;
+
+// use super::sink::{Sink, SinkResultType};
+
+// #[derive(Clone)]
+// pub struct ConcatSink {
+//     result_left: Vec<Arc<MicroPartition>>,
+//     result_right: Vec<Arc<MicroPartition>>,
+// }
+
+// impl ConcatSink {
+//     pub fn new() -> Self {
+//         Self {
+//             result_left: Vec::new(),
+//             result_right: Vec::new(),
+//         }
+//     }
+
+//     #[instrument(skip_all, name = "ConcatSink::sink")]
+//     fn sink_left(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
+//         self.result_left.push(input.clone());
+//         Ok(SinkResultType::NeedMoreInput)
+//     }
+
+//     #[instrument(skip_all, name = "ConcatSink::sink")]
+//     fn sink_right(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
+//         self.result_right.push(input.clone());
+//         Ok(SinkResultType::NeedMoreInput)
+//     }
+// }
+
+// impl Sink for ConcatSink {
+//     fn sink(&mut self, index: usize, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
+//         match index {
+//             0 => self.sink_left(input),
+//             1 => self.sink_right(input),
+//             _ => panic!("concat only supports 2 inputs, got {index}"),
+//         }
+//     }
+
+//     fn in_order(&self) -> bool {
+//         true
+//     }
+
+//     fn num_inputs(&self) -> usize {
+//         2
+//     }
+
+//     #[instrument(skip_all, name = "ConcatSink::finalize")]
+//     fn finalize(self: Box<Self>) -> DaftResult<Vec<Arc<MicroPartition>>> {
+//         Ok(self
+//             .result_left
+//             .into_iter()
+//             .chain(self.result_right.into_iter())
+//             .collect())
+//     }
+// }
diff --git a/src/daft-local-execution/src/sinks/hash_join.rs b/src/daft-local-execution/src/sinks/hash_join.rs
index d20edc61b2..82dbee00f5 100644
--- a/src/daft-local-execution/src/sinks/hash_join.rs
+++ b/src/daft-local-execution/src/sinks/hash_join.rs
@@ -1,77 +1,323 @@
 use std::sync::Arc;
 
+use crate::{
+    channel::{create_channel, spawn_compute_task, MultiSender},
+    intermediate_ops::intermediate_op::{IntermediateOpActor, IntermediateOperator},
+    pipeline::PipelineNode,
+    sources::source::Source,
+    NUM_CPUS,
+};
+use async_trait::async_trait;
 use common_error::DaftResult;
+use daft_core::{
+    datatypes::Field,
+    schema::{Schema, SchemaRef},
+    utils::supertype,
+};
 use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
 use daft_plan::JoinType;
-use tracing::instrument;
+use futures::{stream, StreamExt};
+use tracing::info_span;
 
-use super::sink::{DoubleInputSink, SinkResultType};
+use super::blocking_sink::{BlockingSink, BlockingSinkStatus};
+use daft_table::{
+    infer_join_schema_mapper, GrowableTable, JoinOutputMapper, ProbeTable, ProbeTableBuilder, Table,
+};
 
-#[derive(Clone)]
-pub struct HashJoinSink {
-    result_left: Vec<Arc<MicroPartition>>,
-    result_right: Vec<Arc<MicroPartition>>,
-    left_on: Vec<ExprRef>,
-    right_on: Vec<ExprRef>,
-    join_type: JoinType,
+enum HashJoinState {
+    Building {
+        probe_table_builder: Option<ProbeTableBuilder>,
+        projection: Vec<ExprRef>,
+        tables: Vec<Table>,
+    },
+    Probing {
+        probe_table: Arc<ProbeTable>,
+        tables: Arc<Vec<Table>>,
+    },
 }
 
-impl HashJoinSink {
-    pub fn new(left_on: Vec<ExprRef>, right_on: Vec<ExprRef>, join_type: JoinType) -> Self {
-        Self {
-            result_left: Vec::new(),
-            result_right: Vec::new(),
-            left_on,
-            right_on,
-            join_type,
+impl HashJoinState {
+    fn new(key_schema: &SchemaRef, projection: Vec<ExprRef>) -> DaftResult<Self> {
+        Ok(Self::Building {
+            probe_table_builder: Some(ProbeTableBuilder::new(key_schema.clone())?),
+            projection,
+            tables: vec![],
+        })
+    }
+
+    fn add_tables(&mut self, input: &Arc<MicroPartition>) -> DaftResult<()> {
+        if let Self::Building {
+            ref mut probe_table_builder,
+            projection,
+            tables,
+        } = self
+        {
+            let probe_table_builder = probe_table_builder.as_mut().unwrap();
+            for table in input.get_tables()?.iter() {
+                tables.push(table.clone());
+                let join_keys = table.eval_expression_list(projection)?;
+
+                probe_table_builder.add_table(&join_keys)?;
+            }
+            Ok(())
+        } else {
+            panic!("add_tables can only be used during the Building Phase")
         }
     }
+    fn finalize(&mut self, join_mapper: &JoinOutputMapper) -> DaftResult<()> {
+        if let Self::Building {
+            probe_table_builder,
+            tables,
+            ..
+        } = self
+        {
+            let ptb = std::mem::take(probe_table_builder).expect("should be set in building mode");
+            let pt = ptb.build();
+            let mapped_tables = tables
+                .iter()
+                .map(|t| join_mapper.map_left(t))
+                .collect::<DaftResult<Vec<_>>>()?;
+
+            *self = Self::Probing {
+                probe_table: Arc::new(pt),
+                tables: Arc::new(mapped_tables),
+            };
+            Ok(())
+        } else {
+            panic!("finalize can only be used during the Building Phase")
+        }
+    }
+}
+
+pub(crate) struct HashJoinOperator {
+    right_on: Vec<ExprRef>,
+    _join_type: JoinType,
+    join_mapper: Arc<JoinOutputMapper>,
+    join_state: HashJoinState,
 }
 
-impl DoubleInputSink for HashJoinSink {
-    #[instrument(skip_all, name = "HashJoin::sink")]
-    fn sink_left(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        self.result_left.push(input.clone());
-        Ok(SinkResultType::NeedMoreInput)
+impl HashJoinOperator {
+    pub(crate) fn new(
+        left_on: Vec<ExprRef>,
+        right_on: Vec<ExprRef>,
+        join_type: JoinType,
+        left_schema: &SchemaRef,
+        right_schema: &SchemaRef,
+    ) -> DaftResult<Self> {
+        let left_key_fields = left_on
+            .iter()
+            .map(|e| e.to_field(left_schema))
+            .collect::<DaftResult<Vec<_>>>()?;
+        let right_key_fields = right_on
+            .iter()
+            .map(|e| e.to_field(right_schema))
+            .collect::<DaftResult<Vec<_>>>()?;
+        let key_schema: SchemaRef = Schema::new(
+            left_key_fields
+                .into_iter()
+                .zip(right_key_fields.into_iter())
+                .map(|(l, r)| {
+                    // TODO we should be using the comparison_op function here instead but i'm just using existing behavior for now
+                    let dtype = supertype::try_get_supertype(&l.dtype, &r.dtype)?;
+                    Ok(Field::new(l.name, dtype))
+                })
+                .collect::<DaftResult<Vec<_>>>()?,
+        )?
+        .into();
+
+        let join_mapper =
+            infer_join_schema_mapper(left_schema, right_schema, &left_on, &right_on, join_type)?;
+
+        let left_on = left_on
+            .into_iter()
+            .zip(key_schema.fields.values())
+            .map(|(e, f)| e.cast(&f.dtype))
+            .collect::<Vec<_>>();
+        let right_on = right_on
+            .into_iter()
+            .zip(key_schema.fields.values())
+            .map(|(e, f)| e.cast(&f.dtype))
+            .collect::<Vec<_>>();
+        assert_eq!(join_type, JoinType::Inner);
+        Ok(Self {
+            right_on,
+            _join_type: join_type,
+            join_mapper: Arc::new(join_mapper),
+            join_state: HashJoinState::new(&key_schema, left_on)?,
+        })
     }
 
-    #[instrument(skip_all, name = "HashJoin::sink")]
-    fn sink_right(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        self.result_right.push(input.clone());
-        Ok(SinkResultType::NeedMoreInput)
+    fn as_sink(&mut self) -> &mut dyn BlockingSink {
+        self
     }
 
-    fn in_order(&self) -> bool {
-        false
+    fn as_intermediate_op(&self) -> Arc<dyn IntermediateOperator> {
+        if let HashJoinState::Probing {
+            probe_table,
+            tables,
+        } = &self.join_state
+        {
+            Arc::new(HashJoinProber {
+                probe_table: probe_table.clone(),
+                tables: tables.clone(),
+                right_on: self.right_on.clone(),
+                join_mapper: self.join_mapper.clone(),
+            })
+        } else {
+            panic!("can't call as_intermediate_op when not in probing state")
+        }
     }
+}
 
-    #[instrument(skip_all, name = "HashJoin::finalize")]
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>> {
-        let concated_left = MicroPartition::concat(
-            &self
-                .result_left
-                .iter()
-                .map(|x| x.as_ref())
-                .collect::<Vec<_>>(),
-        )?;
-        let concated_right = MicroPartition::concat(
-            &self
-                .result_right
-                .iter()
-                .map(|x| x.as_ref())
-                .collect::<Vec<_>>(),
-        )?;
-        let joined = concated_left.hash_join(
-            &concated_right,
-            &self.left_on,
-            &self.right_on,
-            self.join_type,
-        )?;
-        Ok(vec![Arc::new(joined)])
+struct HashJoinProber {
+    probe_table: Arc<ProbeTable>,
+    tables: Arc<Vec<Table>>,
+    right_on: Vec<ExprRef>,
+    join_mapper: Arc<JoinOutputMapper>,
+}
+
+impl IntermediateOperator for HashJoinProber {
+    fn name(&self) -> &'static str {
+        "HashJoinProber"
+    }
+    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
+        let _span = info_span!("HashJoinOperator::execute").entered();
+        let _growables = info_span!("HashJoinOperator::build_growables").entered();
+
+        // Left should only be created once per probe table
+        let mut left_growable =
+            GrowableTable::new(&self.tables.iter().collect::<Vec<_>>(), false, 20)?;
+        // right should only be created once per morsel
+
+        let right_input_tables = input.get_tables()?;
+
+        let right_tables = right_input_tables
+            .iter()
+            .map(|t| self.join_mapper.map_right(t))
+            .collect::<DaftResult<Vec<_>>>()?;
+
+        let mut right_growable =
+            GrowableTable::new(&right_tables.iter().collect::<Vec<_>>(), false, 20)?;
+
+        drop(_growables);
+        {
+            let _loop = info_span!("HashJoinOperator::eval_and_probe").entered();
+            for (r_table_idx, table) in right_input_tables.iter().enumerate() {
+                // we should emit one table at a time when this is streaming
+                let join_keys = table.eval_expression_list(&self.right_on)?;
+                let iter = self.probe_table.probe(&join_keys)?;
+
+                for (l_table_idx, l_row_idx, right_idx) in iter {
+                    left_growable.extend(l_table_idx as usize, l_row_idx as usize, 1);
+                    // we can perform run length compression for this to make this more efficient
+                    right_growable.extend(r_table_idx, right_idx as usize, 1);
+                }
+            }
+        }
+        let left_table = left_growable.build()?;
+        let right_table = right_growable.build()?;
+        let final_table = left_table.union(&right_table)?;
+        Ok(Arc::new(MicroPartition::new_loaded(
+            final_table.schema.clone(),
+            Arc::new(vec![final_table]),
+            None,
+        )))
+    }
+}
+
+impl BlockingSink for HashJoinOperator {
     fn name(&self) -> &'static str {
         "HashJoin"
     }
+
+    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<BlockingSinkStatus> {
+        self.join_state.add_tables(input)?;
+        Ok(BlockingSinkStatus::NeedMoreInput)
+    }
+    fn finalize(&mut self) -> DaftResult<()> {
+        self.join_state.finalize(&self.join_mapper)?;
+        Ok(())
+    }
+    fn as_source(&mut self) -> &mut dyn Source {
+        self
+    }
+}
+
+impl Source for HashJoinOperator {
+    fn get_data(&self, _maintain_order: bool) -> crate::sources::source::SourceStream {
+        stream::empty().boxed()
+    }
+}
+
+pub(crate) struct HashJoinNode {
+    // use a RW lock
+    hash_join: Arc<tokio::sync::Mutex<HashJoinOperator>>,
+    left: Box<dyn PipelineNode>,
+    right: Box<dyn PipelineNode>,
+}
+
+impl HashJoinNode {
+    pub(crate) fn new(
+        op: HashJoinOperator,
+        left: Box<dyn PipelineNode>,
+        right: Box<dyn PipelineNode>,
+    ) -> Self {
+        HashJoinNode {
+            hash_join: Arc::new(tokio::sync::Mutex::new(op)),
+            left,
+            right,
+        }
+    }
+    pub(crate) fn boxed(self) -> Box<dyn PipelineNode> {
+        Box::new(self)
+    }
+}
+
+#[async_trait]
+impl PipelineNode for HashJoinNode {
+    fn children(&self) -> Vec<&dyn PipelineNode> {
+        vec![self.left.as_ref(), self.right.as_ref()]
+    }
+
+    async fn start(&mut self, destination: MultiSender) -> DaftResult<()> {
+        let (sender, mut pt_receiver) = create_channel(*NUM_CPUS, false);
+        self.left.start(sender).await?;
+        let hash_join = self.hash_join.clone();
+
+        let probe_table_build = tokio::spawn(async move {
+            let span = info_span!("ProbeTable::sink");
+            let mut guard = hash_join.lock().await;
+            let sink = guard.as_sink();
+            while let Some(val) = pt_receiver.recv().await {
+                if let BlockingSinkStatus::Finished = span.in_scope(|| sink.sink(&val?))? {
+                    break;
+                }
+            }
+
+            info_span!("ProbeTable::finalize").in_scope(|| sink.finalize())?;
+            DaftResult::Ok(())
+        });
+        // should wrap in context join handle
+
+        let (right_sender, streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order());
+        // now we can start building the right side
+        self.right.start(right_sender).await?;
+
+        probe_table_build.await.unwrap()?;
+
+        let hash_join = self.hash_join.clone();
+        let destination = destination;
+        let probing_op = {
+            let guard = hash_join.lock().await;
+            guard.as_intermediate_op()
+        };
+
+        let mut actor = IntermediateOpActor::new(probing_op, streaming_receiver, destination);
+        // this should ideally be in the actor
+        spawn_compute_task(async move { actor.run_parallel().await });
+
+        Ok(())
+    }
 }
diff --git a/src/daft-local-execution/src/sinks/limit.rs b/src/daft-local-execution/src/sinks/limit.rs
index d2079bca00..2153ab5e0d 100644
--- a/src/daft-local-execution/src/sinks/limit.rs
+++ b/src/daft-local-execution/src/sinks/limit.rs
@@ -4,53 +4,57 @@ use common_error::DaftResult;
 use daft_micropartition::MicroPartition;
 use tracing::instrument;
 
-use super::sink::{SingleInputSink, SinkResultType};
+use super::streaming_sink::{StreamSinkOutput, StreamingSink};
 
 #[derive(Clone)]
 pub struct LimitSink {
+    #[allow(dead_code)]
     limit: usize,
-    num_rows_taken: usize,
-    result: Vec<Arc<MicroPartition>>,
+    remaining: usize,
 }
 
 impl LimitSink {
     pub fn new(limit: usize) -> Self {
         Self {
             limit,
-            num_rows_taken: 0,
-            result: Vec::new(),
+            remaining: limit,
         }
     }
+    pub fn boxed(self) -> Box<dyn StreamingSink> {
+        Box::new(self)
+    }
 }
 
-impl SingleInputSink for LimitSink {
+impl StreamingSink for LimitSink {
     #[instrument(skip_all, name = "LimitSink::sink")]
-    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        let input_num_rows = input.len();
+    fn execute(
+        &mut self,
+        index: usize,
+        input: &Arc<MicroPartition>,
+    ) -> DaftResult<StreamSinkOutput> {
+        assert_eq!(index, 0);
 
-        if self.num_rows_taken == self.limit {
-            return Ok(SinkResultType::Finished);
-        }
+        let input_num_rows = input.len();
 
-        if self.num_rows_taken + input_num_rows <= self.limit {
-            self.num_rows_taken += input_num_rows;
-            self.result.push(input.clone());
-            Ok(SinkResultType::NeedMoreInput)
-        } else {
-            let num_rows_to_take = self.limit - self.num_rows_taken;
-            let taken = input.head(num_rows_to_take)?;
-            self.num_rows_taken = self.limit;
-            self.result.push(Arc::new(taken));
-            Ok(SinkResultType::Finished)
+        use std::cmp::Ordering::*;
+        match input_num_rows.cmp(&self.remaining) {
+            Less => {
+                self.remaining -= input_num_rows;
+                Ok(StreamSinkOutput::NeedMoreInput(Some(input.clone())))
+            }
+            Equal => {
+                self.remaining = 0;
+                Ok(StreamSinkOutput::Finished(Some(input.clone())))
+            }
+            Greater => {
+                let taken = input.head(self.remaining)?;
+                self.remaining -= taken.len();
+                Ok(StreamSinkOutput::Finished(Some(Arc::new(taken))))
+            }
         }
     }
 
-    fn in_order(&self) -> bool {
-        false
-    }
-
-    #[instrument(skip_all, name = "LimitSink::finalize")]
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>> {
-        Ok(self.result.clone())
+    fn name(&self) -> &'static str {
+        "Limit"
     }
 }
diff --git a/src/daft-local-execution/src/sinks/mod.rs b/src/daft-local-execution/src/sinks/mod.rs
index 6923310612..865c6df167 100644
--- a/src/daft-local-execution/src/sinks/mod.rs
+++ b/src/daft-local-execution/src/sinks/mod.rs
@@ -1,6 +1,7 @@
 pub mod aggregate;
+pub mod blocking_sink;
 pub mod concat;
 pub mod hash_join;
 pub mod limit;
-pub mod sink;
 pub mod sort;
+pub mod streaming_sink;
diff --git a/src/daft-local-execution/src/sinks/sink.rs b/src/daft-local-execution/src/sinks/sink.rs
deleted file mode 100644
index d1bd0c5415..0000000000
--- a/src/daft-local-execution/src/sinks/sink.rs
+++ /dev/null
@@ -1,157 +0,0 @@
-use std::sync::Arc;
-
-use common_error::DaftResult;
-use daft_micropartition::MicroPartition;
-use tracing::{info_span, instrument};
-
-use crate::{
-    channel::{create_channel, MultiReceiver, MultiSender},
-    NUM_CPUS,
-};
-
-pub enum SinkResultType {
-    NeedMoreInput,
-    Finished,
-}
-
-pub trait SingleInputSink: Send + Sync {
-    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType>;
-    fn in_order(&self) -> bool;
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>>;
-}
-
-pub struct SingleInputSinkActor {
-    sink: Box<dyn SingleInputSink>,
-    receiver: MultiReceiver,
-    sender: MultiSender,
-}
-
-impl SingleInputSinkActor {
-    pub fn new(
-        sink: Box<dyn SingleInputSink>,
-        receiver: MultiReceiver,
-        sender: MultiSender,
-    ) -> Self {
-        Self {
-            sink,
-            receiver,
-            sender,
-        }
-    }
-
-    #[instrument(level = "info", skip(self), name = "SingleInputSinkActor::run")]
-    pub async fn run(&mut self) -> DaftResult<()> {
-        while let Some(val) = self.receiver.recv().await {
-            let _sink_span = info_span!("Sink::sink").entered();
-
-            let sink_result = self.sink.sink(&val?)?;
-            match sink_result {
-                SinkResultType::NeedMoreInput => {
-                    continue;
-                }
-                SinkResultType::Finished => {
-                    break;
-                }
-            }
-        }
-        let final_span = info_span!("Sink::finalize");
-
-        let finalized_values = final_span.in_scope(|| self.sink.finalize())?;
-        for val in finalized_values {
-            let _ = self.sender.get_next_sender().send(Ok(val)).await;
-        }
-        Ok(())
-    }
-}
-
-pub fn run_single_input_sink(sink: Box<dyn SingleInputSink>, send_to: MultiSender) -> MultiSender {
-    let (sender, receiver) = create_channel(*NUM_CPUS, sink.in_order());
-    let mut actor = SingleInputSinkActor::new(sink, receiver, send_to);
-    tokio::spawn(async move {
-        let _ = actor.run().await;
-    });
-    sender
-}
-
-pub trait DoubleInputSink: Send + Sync + dyn_clone::DynClone {
-    fn sink_left(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType>;
-    fn sink_right(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType>;
-    fn in_order(&self) -> bool;
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>>;
-    fn name(&self) -> &'static str;
-}
-
-dyn_clone::clone_trait_object!(DoubleInputSink);
-
-pub struct DoubleInputSinkActor {
-    sink: Box<dyn DoubleInputSink>,
-    left_receiver: MultiReceiver,
-    right_receiver: MultiReceiver,
-    sender: MultiSender,
-}
-
-impl DoubleInputSinkActor {
-    pub fn new(
-        sink: Box<dyn DoubleInputSink>,
-        left_receiver: MultiReceiver,
-        right_receiver: MultiReceiver,
-        sender: MultiSender,
-    ) -> Self {
-        Self {
-            sink,
-            left_receiver,
-            right_receiver,
-            sender,
-        }
-    }
-
-    #[instrument(level = "info", skip(self), name = "DoubleInputSinkActor::run")]
-    pub async fn run(&mut self) -> DaftResult<()> {
-        while let Some(val) = self.left_receiver.recv().await {
-            let _sink_span = info_span!("Sink::sink").entered();
-            let sink_result = self.sink.sink_left(&val?)?;
-            match sink_result {
-                SinkResultType::NeedMoreInput => {
-                    continue;
-                }
-                SinkResultType::Finished => {
-                    break;
-                }
-            }
-        }
-
-        while let Some(val) = self.right_receiver.recv().await {
-            let _sink_span = info_span!("Sink::sink").entered();
-            let sink_result = self.sink.sink_right(&val?)?;
-            match sink_result {
-                SinkResultType::NeedMoreInput => {
-                    continue;
-                }
-                SinkResultType::Finished => {
-                    break;
-                }
-            }
-        }
-
-        let final_span = info_span!("Sink::finalize");
-
-        let finalized_values = final_span.in_scope(|| self.sink.finalize())?;
-        for val in finalized_values {
-            let _ = self.sender.get_next_sender().send(Ok(val)).await;
-        }
-        Ok(())
-    }
-}
-
-pub fn run_double_input_sink(
-    sink: Box<dyn DoubleInputSink>,
-    send_to: MultiSender,
-) -> (MultiSender, MultiSender) {
-    let (left_sender, left_receiver) = create_channel(*NUM_CPUS, sink.in_order());
-    let (right_sender, right_receiver) = create_channel(*NUM_CPUS, sink.in_order());
-    let mut actor = DoubleInputSinkActor::new(sink, left_receiver, right_receiver, send_to);
-    tokio::spawn(async move {
-        let _ = actor.run().await;
-    });
-    (left_sender, right_sender)
-}
diff --git a/src/daft-local-execution/src/sinks/sort.rs b/src/daft-local-execution/src/sinks/sort.rs
index 4118d67867..2af6d9eb59 100644
--- a/src/daft-local-execution/src/sinks/sort.rs
+++ b/src/daft-local-execution/src/sinks/sort.rs
@@ -3,15 +3,22 @@ use std::sync::Arc;
 use common_error::DaftResult;
 use daft_dsl::ExprRef;
 use daft_micropartition::MicroPartition;
+use futures::{stream, StreamExt};
 use tracing::instrument;
 
-use super::sink::{SingleInputSink, SinkResultType};
+use crate::sources::source::Source;
+
+use super::blocking_sink::{BlockingSink, BlockingSinkStatus};
 
-#[derive(Clone)]
 pub struct SortSink {
     sort_by: Vec<ExprRef>,
     descending: Vec<bool>,
-    parts: Vec<Arc<MicroPartition>>,
+    state: SortState,
+}
+
+enum SortState {
+    Building(Vec<Arc<MicroPartition>>),
+    Done(Arc<MicroPartition>),
 }
 
 impl SortSink {
@@ -19,27 +26,54 @@ impl SortSink {
         Self {
             sort_by,
             descending,
-            parts: Vec::new(),
+            state: SortState::Building(vec![]),
         }
     }
+    pub fn boxed(self) -> Box<dyn BlockingSink> {
+        Box::new(self)
+    }
 }
 
-impl SingleInputSink for SortSink {
+impl BlockingSink for SortSink {
     #[instrument(skip_all, name = "SortSink::sink")]
-    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType> {
-        self.parts.push(input.clone());
-        Ok(SinkResultType::NeedMoreInput)
+    fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<BlockingSinkStatus> {
+        if let SortState::Building(parts) = &mut self.state {
+            parts.push(input.clone());
+        } else {
+            panic!("sink should be in building phase");
+        }
+        Ok(BlockingSinkStatus::NeedMoreInput)
     }
-
-    fn in_order(&self) -> bool {
-        false
+    fn name(&self) -> &'static str {
+        "Sort"
     }
 
-    #[instrument(skip_all, name = "SortSink::finalize")]
-    fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>> {
-        let concated =
-            MicroPartition::concat(&self.parts.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
-        let sorted = concated.sort(&self.sort_by, &self.descending)?;
-        Ok(vec![Arc::new(sorted)])
+    fn finalize(&mut self) -> DaftResult<()> {
+        if let SortState::Building(parts) = &mut self.state {
+            assert!(
+                !parts.is_empty(),
+                "We cannot finalize SortSink with no data"
+            );
+            let concated =
+                MicroPartition::concat(&parts.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
+            let sorted = concated.sort(&self.sort_by, &self.descending)?;
+            self.state = SortState::Done(Arc::new(sorted));
+            Ok(())
+        } else {
+            panic!("finalize should be in building phase");
+        }
+    }
+    fn as_source(&mut self) -> &mut dyn crate::sources::source::Source {
+        self
+    }
+}
+
+impl Source for SortSink {
+    fn get_data(&self, _maintain_order: bool) -> crate::sources::source::SourceStream {
+        if let SortState::Done(parts) = &self.state {
+            stream::iter([Ok(parts.clone())]).boxed()
+        } else {
+            panic!("get_data should be in done phase");
+        }
     }
 }
diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs
new file mode 100644
index 0000000000..f4ad9a9fd5
--- /dev/null
+++ b/src/daft-local-execution/src/sinks/streaming_sink.rs
@@ -0,0 +1,100 @@
+use std::sync::Arc;
+
+use common_error::DaftResult;
+use daft_micropartition::MicroPartition;
+use tracing::info_span;
+
+use crate::{
+    channel::{create_channel, MultiSender},
+    pipeline::PipelineNode,
+    NUM_CPUS,
+};
+use async_trait::async_trait;
+pub enum StreamSinkOutput {
+    NeedMoreInput(Option<Arc<MicroPartition>>),
+    #[allow(dead_code)]
+    HasMoreOutput(Arc<MicroPartition>),
+    Finished(Option<Arc<MicroPartition>>),
+}
+
+pub trait StreamingSink: Send + Sync {
+    fn execute(
+        &mut self,
+        index: usize,
+        input: &Arc<MicroPartition>,
+    ) -> DaftResult<StreamSinkOutput>;
+    #[allow(dead_code)]
+    fn name(&self) -> &'static str;
+}
+
+pub(crate) struct StreamingSinkNode {
+    // use a RW lock
+    op: Arc<tokio::sync::Mutex<Box<dyn StreamingSink>>>,
+    children: Vec<Box<dyn PipelineNode>>,
+}
+
+impl StreamingSinkNode {
+    pub(crate) fn new(op: Box<dyn StreamingSink>, children: Vec<Box<dyn PipelineNode>>) -> Self {
+        StreamingSinkNode {
+            op: Arc::new(tokio::sync::Mutex::new(op)),
+            children,
+        }
+    }
+    pub(crate) fn boxed(self) -> Box<dyn PipelineNode> {
+        Box::new(self)
+    }
+}
+
+#[async_trait]
+impl PipelineNode for StreamingSinkNode {
+    fn children(&self) -> Vec<&dyn PipelineNode> {
+        self.children.iter().map(|v| v.as_ref()).collect()
+    }
+
+    async fn start(&mut self, mut destination: MultiSender) -> DaftResult<()> {
+        let (sender, mut streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order());
+        // now we can start building the right side
+        let child = self
+            .children
+            .get_mut(0)
+            .expect("we should only have 1 child");
+        child.start(sender).await?;
+        let op = self.op.clone();
+        tokio::spawn(async move {
+            // this should be a RWLock and run in concurrent workers
+            let span = info_span!("StreamingSink::execute");
+
+            let mut sink = op.lock().await;
+            let mut is_active = true;
+            while is_active && let Some(val) = streaming_receiver.recv().await {
+                let val = val?;
+                loop {
+                    let result = span.in_scope(|| sink.execute(0, &val))?;
+                    match result {
+                        StreamSinkOutput::HasMoreOutput(mp) => {
+                            let sender = destination.get_next_sender();
+                            sender.send(Ok(mp)).await.unwrap();
+                        }
+                        StreamSinkOutput::NeedMoreInput(mp) => {
+                            if let Some(mp) = mp {
+                                let sender = destination.get_next_sender();
+                                sender.send(Ok(mp)).await.unwrap();
+                            }
+                            break;
+                        }
+                        StreamSinkOutput::Finished(mp) => {
+                            if let Some(mp) = mp {
+                                let sender = destination.get_next_sender();
+                                sender.send(Ok(mp)).await.unwrap();
+                            }
+                            is_active = false;
+                            break;
+                        }
+                    }
+                }
+            }
+            DaftResult::Ok(())
+        });
+        Ok(())
+    }
+}
diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs
index 259040ac11..55283b04f7 100644
--- a/src/daft-local-execution/src/sources/in_memory.rs
+++ b/src/daft-local-execution/src/sources/in_memory.rs
@@ -14,6 +14,9 @@ impl InMemorySource {
     pub fn new(data: Vec<Arc<MicroPartition>>) -> Self {
         Self { data }
     }
+    pub fn boxed(self) -> Box<dyn Source> {
+        Box::new(self) as Box<dyn Source>
+    }
 }
 
 impl Source for InMemorySource {
diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs
index 21723cf97c..f4576513fe 100644
--- a/src/daft-local-execution/src/sources/scan_task.rs
+++ b/src/daft-local-execution/src/sources/scan_task.rs
@@ -59,6 +59,9 @@ impl ScanTaskSource {
             }
         }
     }
+    pub fn boxed(self) -> Box<dyn Source> {
+        Box::new(self) as Box<dyn Source>
+    }
 }
 
 impl Source for ScanTaskSource {
diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs
index a209ea0876..a63fef9b4f 100644
--- a/src/daft-local-execution/src/sources/source.rs
+++ b/src/daft-local-execution/src/sources/source.rs
@@ -3,9 +3,11 @@ use std::sync::Arc;
 use common_error::DaftResult;
 use daft_micropartition::MicroPartition;
 use futures::{stream::BoxStream, StreamExt};
-use tracing::{instrument, Instrument};
 
-use crate::channel::MultiSender;
+use async_trait::async_trait;
+use tracing::Instrument;
+
+use crate::{channel::MultiSender, pipeline::PipelineNode};
 
 pub type SourceStream<'a> = BoxStream<'a, DaftResult<Arc<MicroPartition>>>;
 
@@ -13,32 +15,33 @@ pub trait Source: Send + Sync {
     fn get_data(&self, maintain_order: bool) -> SourceStream;
 }
 
-pub struct SourceActor {
-    source: Arc<dyn Source>,
-    sender: MultiSender,
+struct SourceNode {
+    source_op: Arc<tokio::sync::Mutex<Box<dyn Source>>>,
 }
 
-impl SourceActor {
-    pub fn new(source: Arc<dyn Source>, sender: MultiSender) -> Self {
-        Self { source, sender }
+#[async_trait]
+impl PipelineNode for SourceNode {
+    fn children(&self) -> Vec<&dyn PipelineNode> {
+        vec![]
     }
-
-    #[instrument(level = "info", skip(self), name = "SourceActor::run")]
-    pub async fn run(&mut self, maintain_order: bool) -> DaftResult<()> {
-        let mut source_stream = self.source.get_data(maintain_order);
-        while let Some(val) = source_stream.next().in_current_span().await {
-            let _ = self.sender.get_next_sender().send(val).await;
-        }
+    async fn start(&mut self, mut destination: MultiSender) -> DaftResult<()> {
+        let op = self.source_op.clone();
+        tokio::spawn(async move {
+            let guard = op.lock().await;
+            let mut source_stream = guard.get_data(destination.in_order());
+            while let Some(val) = source_stream.next().in_current_span().await {
+                let _ = destination.get_next_sender().send(val).await;
+            }
+            DaftResult::Ok(())
+        });
         Ok(())
     }
 }
 
-pub fn run_source(source: Arc<dyn Source>, sender: MultiSender) {
-    let maintain_order = sender.in_order();
-    let mut actor = SourceActor::new(source, sender);
-    tokio::spawn(
-        async move {
-            let _ = actor.run(maintain_order).in_current_span().await;
-        }
-        .in_current_span(),
-    );
+
+impl From<Box<dyn Source>> for Box<dyn PipelineNode> {
+    fn from(value: Box<dyn Source>) -> Self {
+        Box::new(SourceNode {
+            source_op: Arc::new(tokio::sync::Mutex::new(value)),
+        })
+    }
 }
diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index 3c84acac4e..2904df926a 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -720,7 +720,10 @@ impl MicroPartition {
             TableState::Loaded(tables) => Ok(tables.clone()),
         }
     }
-
+    pub fn get_tables(&self) -> crate::Result<Arc<Vec<Table>>> {
+        let tables = self.tables_or_read(IOStatsContext::new("get tables"))?;
+        Ok(tables)
+    }
     pub fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> {
         let tables = self.tables_or_read(io_stats)?;
         if tables.len() <= 1 {
diff --git a/src/daft-physical-plan/Cargo.toml b/src/daft-physical-plan/Cargo.toml
index ccfe99f160..02d8275e9f 100644
--- a/src/daft-physical-plan/Cargo.toml
+++ b/src/daft-physical-plan/Cargo.toml
@@ -4,6 +4,7 @@ daft-core = {path = "../daft-core", default-features = false}
 daft-dsl = {path = "../daft-dsl", default-features = false}
 daft-plan = {path = "../daft-plan", default-features = false}
 daft-scan = {path = "../daft-scan", default-features = false}
+log = {workspace = true}
 strum = {version = "0.26", features = ["derive"]}
 
 [package]
diff --git a/src/daft-physical-plan/src/local_plan.rs b/src/daft-physical-plan/src/local_plan.rs
index 6caa84c425..7b0e7e58c0 100644
--- a/src/daft-physical-plan/src/local_plan.rs
+++ b/src/daft-physical-plan/src/local_plan.rs
@@ -191,7 +191,7 @@ impl LocalPhysicalPlan {
             .arced()
     }
 
-    pub(crate) fn schema(&self) -> &SchemaRef {
+    pub fn schema(&self) -> &SchemaRef {
         match self {
             LocalPhysicalPlan::PhysicalScan(PhysicalScan { schema, .. })
             | LocalPhysicalPlan::Filter(Filter { schema, .. })
diff --git a/src/daft-physical-plan/src/translate.rs b/src/daft-physical-plan/src/translate.rs
index 6d33d8453a..a14f62ee22 100644
--- a/src/daft-physical-plan/src/translate.rs
+++ b/src/daft-physical-plan/src/translate.rs
@@ -102,6 +102,10 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
             let other = translate(&concat.other)?;
             Ok(LocalPhysicalPlan::concat(input, other))
         }
+        LogicalPlan::Repartition(repartition) => {
+            log::warn!("Repartition is not supported for the local executor; this will be a no-op");
+            translate(&repartition.input)
+        }
         _ => todo!("{} not yet implemented", plan.name()),
     }
 }
diff --git a/src/daft-table/src/probe_table/mod.rs b/src/daft-table/src/probe_table/mod.rs
index d9bb37a7b2..d03d962871 100644
--- a/src/daft-table/src/probe_table/mod.rs
+++ b/src/daft-table/src/probe_table/mod.rs
@@ -18,6 +18,8 @@ pub struct ProbeTable {
     hash_table: HashMap<IndexHash, Vec<u64>, IdentityBuildHasher>,
     tables: Vec<Table>,
     compare_fn: MultiDynArrayComparator,
+    num_groups: usize,
+    num_rows: usize,
 }
 
 impl ProbeTable {
@@ -39,6 +41,8 @@ impl ProbeTable {
             hash_table,
             tables: vec![],
            compare_fn,
+            num_groups: 0,
+            num_rows: 0,
         })
     }
 
@@ -140,12 +144,14 @@ impl ProbeTable {
                     },
                     vec![idx as u64],
                 );
+                self.num_groups += 1;
             }
             RawEntryMut::Occupied(mut entry) => {
                 entry.get_mut().push(idx as u64);
            }
         }
     }
+        self.num_rows += table.len();
         Ok(())
     }
 }
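Reviewer notes follow. Every sketch below is illustrative only: it uses simplified stand-in types (a `Vec<i64>` "morsel" in place of `Arc<MicroPartition>`, std threads and channels in place of tokio tasks and the crate's `MultiSender`/`MultiReceiver`), and every name introduced in the sketches is hypothetical, not part of this patch.

The core structural change is replacing the old `PipelineNode` enum with a `PipelineNode` trait: each node owns its children, and `start` first starts the child with the sending half of a channel, then spawns its own worker that reads from that channel and writes into the destination handed down from the parent. A minimal sketch of that wiring, with a by-value `start` standing in for the crate's `async fn start(&mut self, destination: MultiSender)`:

```rust
// Hypothetical stand-ins: Morsel, SourceNode, MapNode. Channels flow from
// leaves toward the root, and each node spawns its own worker thread.
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

type Morsel = Vec<i64>;

trait PipelineNode: Send {
    fn start(self: Box<Self>, destination: Sender<Morsel>);
}

struct SourceNode {
    data: Vec<Morsel>,
}

impl PipelineNode for SourceNode {
    fn start(self: Box<Self>, destination: Sender<Morsel>) {
        thread::spawn(move || {
            for m in self.data {
                let _ = destination.send(m);
            }
        });
    }
}

struct MapNode {
    child: Box<dyn PipelineNode>,
    f: fn(Morsel) -> Morsel,
}

impl PipelineNode for MapNode {
    fn start(self: Box<Self>, destination: Sender<Morsel>) {
        // Wire the child to feed this node, then transform and forward.
        let (sender, receiver): (Sender<Morsel>, Receiver<Morsel>) = channel();
        self.child.start(sender);
        let f = self.f;
        thread::spawn(move || {
            while let Ok(m) = receiver.recv() {
                let _ = destination.send(f(m));
            }
        });
    }
}

fn main() {
    let plan: Box<dyn PipelineNode> = Box::new(MapNode {
        child: Box::new(SourceNode { data: vec![vec![1, 2], vec![3]] }),
        f: |m| m.into_iter().map(|x| x * 10).collect(),
    });
    let (sender, receiver) = channel();
    plan.start(sender);
    for out in receiver {
        println!("{out:?}"); // [10, 20] then [30]
    }
}
```

In the patch itself, `physical_plan_to_pipeline` builds such a tree bottom-up and `run_local` drives the root by handing it the final sender and draining the receiver.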
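`OperatorTaskState` buffers an intermediate operator's output and only emits a concatenated chunk once enough rows accumulate, with `get_output_threshold` reading the `OUTPUT_THRESHOLD` env var (default 1000). A sketch of that buffering contract under the same stand-in types; `TaskState`, `push`, and `clear` are hypothetical names:

```rust
use std::env;

struct TaskState {
    buffer: Vec<Vec<i64>>,
    rows_buffered: usize,
    threshold: usize,
}

impl TaskState {
    // Mirrors get_output_threshold() in the diff: env var with a default.
    fn new() -> Self {
        let threshold = env::var("OUTPUT_THRESHOLD")
            .unwrap_or_else(|_| "1000".to_string())
            .parse()
            .expect("OUTPUT_THRESHOLD must be a number");
        Self::with_threshold(threshold)
    }

    fn with_threshold(threshold: usize) -> Self {
        Self { buffer: vec![], rows_buffered: 0, threshold }
    }

    // Buffer a morsel; emit one concatenated chunk once the threshold is hit.
    fn push(&mut self, part: Vec<i64>) -> Option<Vec<i64>> {
        self.rows_buffered += part.len();
        self.buffer.push(part);
        if self.rows_buffered >= self.threshold {
            self.clear()
        } else {
            None
        }
    }

    // Flush whatever is buffered; used at end of stream.
    fn clear(&mut self) -> Option<Vec<i64>> {
        if self.buffer.is_empty() {
            return None;
        }
        self.rows_buffered = 0;
        Some(self.buffer.drain(..).flatten().collect())
    }
}

fn main() {
    let _default = TaskState::new(); // threshold from OUTPUT_THRESHOLD, or 1000
    let mut state = TaskState::with_threshold(4);
    assert_eq!(state.push(vec![1, 2]), None); // 2 rows buffered, below threshold
    assert_eq!(state.push(vec![3, 4]), Some(vec![1, 2, 3, 4])); // threshold reached
    assert_eq!(state.push(vec![5]), None);
    assert_eq!(state.clear(), Some(vec![5])); // end-of-stream flush
}
```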
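`BlockingSink` (sort, aggregate, and the hash-join build side) consumes all of its input, finalizes exactly once, and then serves its result back out through `as_source`. The sketch below mirrors the `SortSink` state machine under the same stand-in types; `SinkStatus` plays the role of `BlockingSinkStatus`:

```rust
enum SortState {
    Building(Vec<Vec<i64>>),
    Done(Vec<i64>),
}

enum SinkStatus {
    NeedMoreInput,
    #[allow(dead_code)]
    Finished,
}

struct SortSink {
    state: SortState,
}

impl SortSink {
    // Accumulate input morsels while in the Building phase.
    fn sink(&mut self, input: Vec<i64>) -> SinkStatus {
        match &mut self.state {
            SortState::Building(parts) => {
                parts.push(input);
                SinkStatus::NeedMoreInput
            }
            SortState::Done(_) => panic!("sink must be in the Building phase"),
        }
    }

    // Concatenate everything buffered, sort once, and flip to Done.
    fn finalize(&mut self) {
        if let SortState::Building(parts) = &mut self.state {
            let mut all: Vec<i64> = parts.drain(..).flatten().collect();
            all.sort();
            self.state = SortState::Done(all);
        } else {
            panic!("finalize must be in the Building phase");
        }
    }

    // After finalize, the sink acts as a source of its result.
    fn get_data(&self) -> &[i64] {
        match &self.state {
            SortState::Done(rows) => rows,
            SortState::Building(_) => panic!("get_data must be in the Done phase"),
        }
    }
}

fn main() {
    let mut sink = SortSink { state: SortState::Building(vec![]) };
    sink.sink(vec![3, 1]);
    sink.sink(vec![2]);
    sink.finalize();
    assert_eq!(sink.get_data(), &[1, 2, 3][..]);
}
```

`BlockingSinkNode::start` enforces the same ordering at the task level: the sink task is awaited to completion before a second task drains the finalized source stream into the destination.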
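`StreamingSinkNode` drives a `StreamingSink` with a nested loop: one morsel in, possibly several outputs back, steered by the three `StreamSinkOutput` variants. The patch writes the outer loop with the nightly `let_chains` feature (hence the new `#![feature(let_chains)]` in lib.rs); this stable sketch uses `let`-`else` and std channels instead, and `drive`/`emit` are hypothetical names:

```rust
use std::sync::mpsc::Receiver;

enum StreamSinkOutput {
    NeedMoreInput(Option<Vec<i64>>),
    #[allow(dead_code)]
    HasMoreOutput(Vec<i64>),
    Finished(Option<Vec<i64>>),
}

fn drive(
    receiver: Receiver<Vec<i64>>,
    mut execute: impl FnMut(&Vec<i64>) -> StreamSinkOutput,
    mut emit: impl FnMut(Vec<i64>),
) {
    let mut is_active = true;
    while is_active {
        let Ok(val) = receiver.recv() else { break };
        // One input morsel may produce several outputs, so loop until the
        // sink asks for more input or declares itself finished.
        loop {
            match execute(&val) {
                StreamSinkOutput::HasMoreOutput(mp) => emit(mp),
                StreamSinkOutput::NeedMoreInput(mp) => {
                    if let Some(mp) = mp {
                        emit(mp);
                    }
                    break;
                }
                StreamSinkOutput::Finished(mp) => {
                    if let Some(mp) = mp {
                        emit(mp);
                    }
                    is_active = false;
                    break;
                }
            }
        }
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send(vec![1, 2, 3]).unwrap();
    drop(tx);
    // Trivial pass-through sink: forward each morsel and keep going.
    drive(
        rx,
        |m| StreamSinkOutput::NeedMoreInput(Some(m.clone())),
        |m| println!("{m:?}"),
    );
}
```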
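`LimitSink` is the one streaming sink in the patch: instead of buffering rows like the old `SingleInputSink`, it tracks how many rows remain and truncates the morsel that crosses the limit. A self-contained sketch of that arithmetic; the `cmp::Ordering` match mirrors the diff:

```rust
use std::cmp::Ordering;

enum StreamSinkOutput {
    NeedMoreInput(Option<Vec<i64>>),
    Finished(Option<Vec<i64>>),
}

struct LimitSink {
    remaining: usize,
}

impl LimitSink {
    fn execute(&mut self, input: Vec<i64>) -> StreamSinkOutput {
        match input.len().cmp(&self.remaining) {
            Ordering::Less => {
                // Whole morsel fits under the limit: pass it through.
                self.remaining -= input.len();
                StreamSinkOutput::NeedMoreInput(Some(input))
            }
            Ordering::Equal => {
                // Exactly hits the limit: pass it through and stop.
                self.remaining = 0;
                StreamSinkOutput::Finished(Some(input))
            }
            Ordering::Greater => {
                // Take only the head and stop; upstream can be cancelled.
                let taken = input[..self.remaining].to_vec();
                self.remaining = 0;
                StreamSinkOutput::Finished(Some(taken))
            }
        }
    }
}

fn main() {
    let mut limit = LimitSink { remaining: 5 };
    for morsel in [vec![1, 2, 3], vec![4, 5, 6, 7]] {
        match limit.execute(morsel) {
            StreamSinkOutput::NeedMoreInput(Some(out)) => println!("pass {out:?}"),
            StreamSinkOutput::Finished(Some(out)) => {
                println!("final {out:?}"); // final [4, 5]
                break;
            }
            _ => {}
        }
    }
}
```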
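Finally, the hash join becomes a two-phase state machine: a Building phase that sinks the whole left side into a `ProbeTable` keyed on the join keys, and a Probing phase in which `HashJoinProber` runs as an ordinary `IntermediateOperator` over streaming right-side morsels. A toy sketch of that build/probe split, with a plain `HashMap` standing in for the crate's `ProbeTable` and `(key, payload)` tuples standing in for tables:

```rust
use std::collections::HashMap;

struct HashJoin {
    // key -> indices of matching left rows (the patch stores (table, row) pairs)
    probe_table: HashMap<i64, Vec<usize>>,
    left_rows: Vec<(i64, &'static str)>,
}

impl HashJoin {
    // Build phase: called once per left-side morsel.
    fn add_left(&mut self, rows: &[(i64, &'static str)]) {
        for &(key, payload) in rows {
            self.probe_table.entry(key).or_default().push(self.left_rows.len());
            self.left_rows.push((key, payload));
        }
    }

    // Probe phase: called once per right-side morsel, after the build is done.
    fn probe(&self, rows: &[(i64, &'static str)]) -> Vec<(&'static str, &'static str)> {
        let mut out = vec![];
        for &(key, right_payload) in rows {
            if let Some(idxs) = self.probe_table.get(&key) {
                for &i in idxs {
                    out.push((self.left_rows[i].1, right_payload));
                }
            }
        }
        out
    }
}

fn main() {
    let mut join = HashJoin { probe_table: HashMap::new(), left_rows: vec![] };
    join.add_left(&[(1, "a"), (2, "b")]);
    join.add_left(&[(1, "c")]);
    let matched = join.probe(&[(1, "x"), (3, "y")]);
    assert_eq!(matched, vec![("a", "x"), ("c", "x")]); // inner join on key 1
}
```

`HashJoinNode::start` enforces the phase boundary at the task level: it awaits the probe-table build task before wiring the right child's stream into the probing operator.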