
Commit 6a3d3fc: redo

Colin Ho authored and Colin Ho committed Nov 15, 2024
1 parent e751f61
Showing 8 changed files with 230 additions and 26 deletions.
18 changes: 15 additions & 3 deletions src/daft-local-execution/src/pipeline.rs
@@ -12,8 +12,8 @@ use daft_core::{
 use daft_dsl::{col, join::get_common_join_keys, Expr};
 use daft_local_plan::{
     ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan,
-    Limit, LocalPhysicalPlan, PhysicalWrite, Pivot, Project, Sample, Sort, UnGroupedAggregate,
-    Unpivot,
+    Limit, LocalPhysicalPlan, MonotonicallyIncreasingId, PhysicalWrite, Pivot, Project, Sample,
+    Sort, UnGroupedAggregate, Unpivot,
 };
 use daft_logical_plan::JoinType;
 use daft_micropartition::MicroPartition;
@@ -38,6 +38,7 @@ use crate::{
     concat::ConcatSink,
     hash_join_build::{HashJoinBuildSink, ProbeStateBridge},
     limit::LimitSink,
+    monotonically_increasing_id::MonotonicallyIncreasingIdSink,
     outer_hash_join_probe::OuterHashJoinProbeSink,
     pivot::PivotSink,
     sort::SortSink,
@@ -281,7 +282,18 @@ pub fn physical_plan_to_pipeline(
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
             BlockingSinkNode::new(Arc::new(sort_sink), child_node).boxed()
         }
-
+        LocalPhysicalPlan::MonotonicallyIncreasingId(MonotonicallyIncreasingId {
+            input,
+            column_name,
+            schema,
+            ..
+        }) => {
+            let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
+            let monotonically_increasing_id_sink =
+                MonotonicallyIncreasingIdSink::new(column_name.clone(), schema.clone());
+            StreamingSinkNode::new(Arc::new(monotonically_increasing_id_sink), vec![child_node])
+                .boxed()
+        }
         LocalPhysicalPlan::HashJoin(HashJoin {
             left,
             right,
1 change: 1 addition & 0 deletions src/daft-local-execution/src/sinks/mod.rs
@@ -3,6 +3,7 @@ pub mod blocking_sink;
 pub mod concat;
 pub mod hash_join_build;
 pub mod limit;
+pub mod monotonically_increasing_id;
 pub mod outer_hash_join_probe;
 pub mod pivot;
 pub mod sort;
126 changes: 126 additions & 0 deletions src/daft-local-execution/src/sinks/monotonically_increasing_id.rs
@@ -0,0 +1,126 @@
use std::sync::Arc;

use common_runtime::RuntimeRef;
use daft_core::prelude::SchemaRef;
use daft_micropartition::MicroPartition;
use tracing::instrument;

use super::streaming_sink::{
StreamingSink, StreamingSinkExecuteResult, StreamingSinkFinalizeResult, StreamingSinkOutput,
StreamingSinkState,
};
use crate::{
dispatcher::{DispatchSpawner, UnorderedDispatcher},
ExecutionRuntimeContext,
};

struct MonotonicallyIncreasingIdState {
id_offset: u64,
}

impl MonotonicallyIncreasingIdState {
fn fetch_and_increment_offset(&mut self, increment: u64) -> u64 {
let id_offset = self.id_offset;
self.id_offset += increment;
id_offset
}
}

impl StreamingSinkState for MonotonicallyIncreasingIdState {
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}

struct MonotonicallyIncreasingIdParams {
column_name: String,
output_schema: SchemaRef,
}

pub struct MonotonicallyIncreasingIdSink {
params: Arc<MonotonicallyIncreasingIdParams>,
}
impl MonotonicallyIncreasingIdSink {
pub fn new(column_name: String, output_schema: SchemaRef) -> Self {
Self {
params: Arc::new(MonotonicallyIncreasingIdParams {
column_name,
output_schema,
}),
}
}
}

impl StreamingSink for MonotonicallyIncreasingIdSink {
#[instrument(skip_all, name = "MonotonicallyIncreasingIdSink::sink")]
fn execute(
&self,
input: Arc<MicroPartition>,
mut state: Box<dyn StreamingSinkState>,
runtime_ref: &RuntimeRef,
) -> StreamingSinkExecuteResult {
let params = self.params.clone();
runtime_ref
.spawn(async move {
let mut id_offset = state
.as_any_mut()
.downcast_mut::<MonotonicallyIncreasingIdState>()
.expect("MonotonicallyIncreasingIdOperator should have MonotonicallyIncreasingIdState")
.fetch_and_increment_offset(input.len() as u64);

let tables = input.get_tables()?;
let mut results = Vec::with_capacity(tables.len());
for t in tables.iter() {
let len = t.len() as u64;
results.push(t.add_monotonically_increasing_id(
0,
id_offset,
&params.column_name,
)?);
id_offset += len;
}

let out = MicroPartition::new_loaded(
params.output_schema.clone(),
results.into(),
None,
);

Ok((
state,
StreamingSinkOutput::NeedMoreInput(Some(Arc::new(out))),
))
})
.into()
}

fn name(&self) -> &'static str {
"MonotonicallyIncreasingId"
}

fn finalize(
&self,
_states: Vec<Box<dyn StreamingSinkState>>,
_runtime_ref: &RuntimeRef,
) -> StreamingSinkFinalizeResult {
Ok(None).into()
}

fn make_state(&self) -> Box<dyn StreamingSinkState> {
Box::new(MonotonicallyIncreasingIdState { id_offset: 0 })
}

// Monotonically increasing id is a memory-bound operation, so there's no performance benefit to parallelizing it.
// Furthermore, it is much simpler to implement as a single-threaded operation, since we can just keep track of the current id offset without synchronization.
fn max_concurrency(&self) -> usize {
1
}

fn dispatch_spawner(
&self,
_runtime_handle: &ExecutionRuntimeContext,
_maintain_order: bool,
) -> Arc<dyn DispatchSpawner> {
Arc::new(UnorderedDispatcher::new(None))
}
}
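Since `max_concurrency` pins this sink to a single thread, the id offset can live in plain mutable state with no synchronization. A standalone sketch of that bookkeeping (not Daft API; `IdState` is a hypothetical name): each batch of `n` rows claims the half-open id range `[offset, offset + n)`, which is what `fetch_and_increment_offset` does per micropartition and what the per-table loop in `execute` does within one micropartition.

    // Standalone sketch (not Daft code): single-threaded offset bookkeeping.
    struct IdState {
        offset: u64,
    }

    impl IdState {
        // Claim ids [offset, offset + n) for a batch of n rows.
        fn claim(&mut self, n: u64) -> std::ops::Range<u64> {
            let start = self.offset;
            self.offset += n;
            start..start + n
        }
    }

    fn main() {
        let mut state = IdState { offset: 0 };
        for batch_len in [3u64, 5, 2] {
            let ids: Vec<u64> = state.claim(batch_len).collect();
            println!("{ids:?}"); // [0, 1, 2], then [3, 4, 5, 6, 7], then [8, 9]
        }
    }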
4 changes: 2 additions & 2 deletions src/daft-local-plan/src/lib.rs
@@ -6,7 +6,7 @@ mod translate;
 pub use plan::CatalogWrite;
 pub use plan::{
     ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan,
-    Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, PhysicalScan, PhysicalWrite, Pivot, Project,
-    Sample, Sort, UnGroupedAggregate, Unpivot,
+    Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, MonotonicallyIncreasingId, PhysicalScan,
+    PhysicalWrite, Pivot, Project, Sample, Sort, UnGroupedAggregate, Unpivot,
 };
 pub use translate::translate;
24 changes: 23 additions & 1 deletion src/daft-local-plan/src/plan.rs
@@ -21,7 +21,7 @@ pub enum LocalPhysicalPlan {
     Sort(Sort),
     // Split(Split),
     Sample(Sample),
-    // MonotonicallyIncreasingId(MonotonicallyIncreasingId),
+    MonotonicallyIncreasingId(MonotonicallyIncreasingId),
     // Coalesce(Coalesce),
     // Flatten(Flatten),
     // FanoutRandom(FanoutRandom),
@@ -256,6 +256,20 @@ impl LocalPhysicalPlan {
         .arced()
     }

+    pub(crate) fn monotonically_increasing_id(
+        input: LocalPhysicalPlanRef,
+        column_name: String,
+        schema: SchemaRef,
+    ) -> LocalPhysicalPlanRef {
+        Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId {
+            input,
+            column_name,
+            schema,
+            plan_stats: PlanStats {},
+        })
+        .arced()
+    }
+
     pub(crate) fn hash_join(
         left: LocalPhysicalPlanRef,
         right: LocalPhysicalPlanRef,
@@ -426,6 +440,14 @@ pub struct Sample {
     pub plan_stats: PlanStats,
 }

+#[derive(Debug)]
+pub struct MonotonicallyIncreasingId {
+    pub input: LocalPhysicalPlanRef,
+    pub column_name: String,
+    pub schema: SchemaRef,
+    pub plan_stats: PlanStats,
+}
+
 #[derive(Debug)]
 pub struct UnGroupedAggregate {
     pub input: LocalPhysicalPlanRef,
8 changes: 8 additions & 0 deletions src/daft-local-plan/src/translate.rs
@@ -160,6 +160,14 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
             log::warn!("Repartition Not supported for Local Executor!; This will be a No-Op");
             translate(&repartition.input)
         }
+        LogicalPlan::MonotonicallyIncreasingId(monotonically_increasing_id) => {
+            let input = translate(&monotonically_increasing_id.input)?;
+            Ok(LocalPhysicalPlan::monotonically_increasing_id(
+                input,
+                monotonically_increasing_id.column_name.clone(),
+                monotonically_increasing_id.schema.clone(),
+            ))
+        }
         LogicalPlan::Sink(sink) => {
             use daft_logical_plan::SinkInfo;
             let input = translate(&sink.input)?;
32 changes: 17 additions & 15 deletions src/daft-micropartition/src/python.rs
@@ -940,21 +940,23 @@ pub fn read_pyfunc_into_table_iter(
     let scan_task_filters = scan_task.pushdowns.filters.clone();
     let res = table_iterators
         .into_iter()
-        .filter_map(|iter| {
-            Python::with_gil(|py| {
-                iter.downcast_bound::<pyo3::types::PyIterator>(py)
-                    .expect("Function must return an iterator of tables")
-                    .clone()
-                    .next()
-                    .map(|result| {
-                        result
-                            .map(|tbl| {
-                                tbl.extract::<daft_table::python::PyTable>()
-                                    .expect("Must be a PyTable")
-                                    .table
-                            })
-                            .with_context(|_| PyIOSnafu)
-                    })
+        .flat_map(move |iter| {
+            std::iter::from_fn(move || {
+                Python::with_gil(|py| {
+                    iter.downcast_bound::<pyo3::types::PyIterator>(py)
+                        .expect("Function must return an iterator of tables")
+                        .clone()
+                        .next()
+                        .map(|result| {
+                            result
+                                .map(|tbl| {
+                                    tbl.extract::<daft_table::python::PyTable>()
+                                        .expect("Must be a PyTable")
+                                        .table
+                                })
+                                .with_context(|_| PyIOSnafu)
+                        })
+                })
             })
         })
         .scan(0, move |rows_seen_so_far, table| {
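The rewrite above is a behavior fix, not just a refactor: the old `filter_map` called `next()` once per Python iterator, so only the first table of each iterator was ever read, while `std::iter::from_fn` inside `flat_map` re-polls the same iterator until it returns `None`. A minimal sketch of the difference on plain Rust iterators (hypothetical data, no pyo3 involved):

    fn main() {
        let make_iters = || vec![vec![1, 2, 3].into_iter(), vec![4, 5].into_iter()];

        // Old shape: one item per inner iterator; the rest is silently dropped.
        let first_only: Vec<i32> = make_iters()
            .into_iter()
            .filter_map(|mut it| it.next())
            .collect();
        assert_eq!(first_only, vec![1, 4]);

        // New shape: from_fn keeps calling next() until the iterator is exhausted.
        let drained: Vec<i32> = make_iters()
            .into_iter()
            .flat_map(|mut it| std::iter::from_fn(move || it.next()))
            .collect();
        assert_eq!(drained, vec![1, 2, 3, 4, 5]);
    }

This draining behavior is what lets the new generator-based test below observe every table emitted by each generator.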
43 changes: 38 additions & 5 deletions tests/dataframe/test_monotonically_increasing_id.py
@@ -3,13 +3,10 @@
 import pytest

 from daft.datatype import DataType
+from daft.io._generator import read_generator
+from daft.table.table import Table
 from tests.conftest import get_tests_daft_runner_name

-pytestmark = pytest.mark.skipif(
-    get_tests_daft_runner_name() == "native",
-    reason="Native executor fails for these tests",
-)
-

 def test_monotonically_increasing_id_single_partition(make_df) -> None:
     data = {"a": [1, 2, 3, 4, 5]}
@@ -31,6 +28,10 @@ def test_monotonically_increasing_id_empty_table(make_df) -> None:
     assert df.to_pydict() == {"id": [], "a": []}


+@pytest.mark.skipif(
+    get_tests_daft_runner_name() == "native",
+    reason="Native runner does not support repartitioning",
+)
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 20, 50, 100])
 def test_monotonically_increasing_id_multiple_partitions_with_into_partition(make_df, repartition_nparts) -> None:
     ITEMS = [i for i in range(100)]
@@ -54,6 +55,38 @@ def test_monotonically_increasing_id_multiple_partitions_with_into_partition(make_df, repartition_nparts) -> None:
     assert df.to_pydict() == {"id": ids, "a": ITEMS}


+def test_monotonically_increasing_id_from_generator() -> None:
+    ITEMS = list(range(10))
+    table = Table.from_pydict({"a": ITEMS})
+
+    num_tables = 3
+    num_generators = 3
+
+    def generator():
+        for _ in range(num_tables):
+            yield table
+
+    def generators():
+        for _ in range(num_generators):
+            yield generator
+
+    df = read_generator(generators(), schema=table.schema())._add_monotonically_increasing_id().collect()
+
+    assert len(df) == 90
+    assert set(df.column_names) == {"id", "a"}
+    assert df.schema()["id"].dtype == DataType.uint64()
+
+    if get_tests_daft_runner_name() == "native":
+        # On the native runner, there are no partitions, so the ids are just the row numbers.
+        assert df.to_pydict() == {"id": list(range(90)), "a": ITEMS * 9}
+    else:
+        # On the ray / py runner, the ids are generated based on the partition number and the row number within the partition.
+        # The partition number is put in the upper 28 bits and the row number is put in the lower 36 bits.
+        # There are num_generators partitions, and each partition has num_tables * len(ITEMS) rows.
+        ids = [(p << 36) | c for p in range(num_generators) for c in range(num_tables * len(ITEMS))]
+        assert df.to_pydict() == {"id": ids, "a": ITEMS * 9}
+
+
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 20, 50, 100])
 def test_monotonically_increasing_id_multiple_partitions_with_repartition(make_df, repartition_nparts) -> None:
     ITEMS = [i for i in range(100)]
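The id layout the test comments describe can be checked in isolation. A small sketch under the stated assumption (partition counter in the upper 28 bits of a 64-bit id, per-partition row number in the lower 36 bits); `encode_id` and `decode_id` are hypothetical helpers, not Daft API:

    ROW_BITS = 36  # lower 36 bits hold the row number within a partition

    def encode_id(partition: int, row: int) -> int:
        # Assumed layout: partition index shifted into the upper bits, OR'd with the row index.
        return (partition << ROW_BITS) | row

    def decode_id(mono_id: int) -> tuple[int, int]:
        return mono_id >> ROW_BITS, mono_id & ((1 << ROW_BITS) - 1)

    assert encode_id(0, 7) == 7        # partition 0: ids are plain row numbers
    assert encode_id(1, 0) == 1 << 36  # partition 1 starts at 2**36
    assert decode_id(encode_id(2, 5)) == (2, 5)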
