Skip to content

Commit

Permalink
[PERF] Swordfish Dynamic Pipelines (#2599)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Aug 6, 2024
1 parent fd8c940 commit 702ac73
Show file tree
Hide file tree
Showing 24 changed files with 885 additions and 513 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/daft-core/src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::datatypes::PythonArray;
/// * `capacity` - Helps pre-allocate memory to the [`Growable`] by providing a capacity up-front. Note that variable-length types
/// such as [`ListArray`] only understands this as the "top-level" capacity, but the capacity of nested children arrays cannot be specified
/// through this [`make_growable`] API. Instead, you may wish to instantiate and use the [`nested_growable::ListGrowable`] directly if
/// this is important to your use0-case.
/// this is important to your use-case.
pub fn make_growable<'a>(
name: &str,
dtype: &DataType,
Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[dependencies]
async-stream = {workspace = true}
async-trait = {workspace = true}
common-error = {path = "../common/error", default-features = false}
common-tracing = {path = "../common/tracing", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
Expand All @@ -14,7 +15,6 @@ daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
dyn-clone = {workspace = true}
futures = {workspace = true}
lazy_static = {workspace = true}
pyo3 = {workspace = true, optional = true}
Expand Down
82 changes: 69 additions & 13 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
use std::sync::Arc;
use std::{env, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;
use tracing::{info_span, instrument};

use async_trait::async_trait;

use crate::{
channel::{
create_channel, create_single_channel, spawn_compute_task, MultiReceiver, MultiSender,
SingleReceiver, SingleSender,
},
pipeline::PipelineNode,
DEFAULT_MORSEL_SIZE, NUM_CPUS,
};

pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
pub trait IntermediateOperator: Send + Sync {
fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
#[allow(dead_code)]

fn name(&self) -> &'static str;
}

dyn_clone::clone_trait_object!(IntermediateOperator);
/// The number of rows that will trigger an intermediate operator to output its data.
#[allow(dead_code)]
fn get_output_threshold() -> usize {
env::var("OUTPUT_THRESHOLD")
.unwrap_or_else(|_| "1000".to_string())
.parse()
.expect("OUTPUT_THRESHOLD must be a number")
}

/// State of an operator task, used to buffer data and output it when a threshold is reached.
pub struct OperatorTaskState {
Expand Down Expand Up @@ -55,6 +67,10 @@ impl OperatorTaskState {
if self.buffer.is_empty() {
return None;
}
assert!(
!self.buffer.is_empty(),
"We can not run concat with no data"
);
let concated =
MicroPartition::concat(&self.buffer.iter().map(|x| x.as_ref()).collect::<Vec<_>>())
.map(Arc::new);
Expand All @@ -70,12 +86,12 @@ impl OperatorTaskState {
pub struct IntermediateOpActor {
sender: MultiSender,
receiver: MultiReceiver,
op: Box<dyn IntermediateOperator>,
op: Arc<dyn IntermediateOperator>,
}

impl IntermediateOpActor {
pub fn new(
op: Box<dyn IntermediateOperator>,
op: Arc<dyn IntermediateOperator>,
receiver: MultiReceiver,
sender: MultiSender,
) -> Self {
Expand All @@ -91,7 +107,7 @@ impl IntermediateOpActor {
async fn run_single(
mut receiver: SingleReceiver,
sender: SingleSender,
op: Box<dyn IntermediateOperator>,
op: Arc<dyn IntermediateOperator>,
) -> DaftResult<()> {
let mut state = OperatorTaskState::new();
let span = info_span!("IntermediateOp::execute");
Expand Down Expand Up @@ -139,11 +155,51 @@ impl IntermediateOpActor {
}
}

pub fn run_intermediate_op(op: Box<dyn IntermediateOperator>, send_to: MultiSender) -> MultiSender {
let (sender, receiver) = create_channel(*NUM_CPUS, send_to.in_order());
let mut actor = IntermediateOpActor::new(op, receiver, send_to);
tokio::spawn(async move {
let _ = actor.run_parallel().await;
});
sender
pub(crate) struct IntermediateNode {
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
}

impl IntermediateNode {
pub(crate) fn new(
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
) -> Self {
IntermediateNode {
intermediate_op,
children,
}
}
pub(crate) fn boxed(self) -> Box<dyn PipelineNode> {
Box::new(self)
}
}

#[async_trait]
impl PipelineNode for IntermediateNode {
fn children(&self) -> Vec<&dyn PipelineNode> {
self.children.iter().map(|v| v.as_ref()).collect()
}

async fn start(&mut self, destination: MultiSender) -> DaftResult<()> {
assert_eq!(
self.children.len(),
1,
"we only support 1 child for Intermediate Node for now"
);

let (sender, receiver) = create_channel(*NUM_CPUS, destination.in_order());

let child = self
.children
.get_mut(0)
.expect("we should only have 1 child");
child.start(sender).await?;

let mut actor =
IntermediateOpActor::new(self.intermediate_op.clone(), receiver, destination);
// this should ideally be in the actor
spawn_compute_task(async move { actor.run_parallel().await });
Ok(())
}
}
1 change: 1 addition & 0 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(let_chains)]
mod channel;
mod intermediate_ops;
mod pipeline;
Expand Down
Loading

0 comments on commit 702ac73

Please sign in to comment.