diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
index 68601c7e58..0efa083323 100644
--- a/src/daft-local-execution/src/pipeline.rs
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -12,8 +12,8 @@ use daft_core::{
 use daft_dsl::{col, join::get_common_join_keys, Expr};
 use daft_local_plan::{
     ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan,
-    Limit, LocalPhysicalPlan, PhysicalWrite, Pivot, Project, Sample, Sort, UnGroupedAggregate,
-    Unpivot,
+    Limit, LocalPhysicalPlan, MonotonicallyIncreasingId, PhysicalWrite, Pivot, Project, Sample,
+    Sort, UnGroupedAggregate, Unpivot,
 };
 use daft_logical_plan::JoinType;
 use daft_micropartition::MicroPartition;
@@ -38,6 +38,7 @@ use crate::{
         concat::ConcatSink,
         hash_join_build::{HashJoinBuildSink, ProbeStateBridge},
         limit::LimitSink,
+        monotonically_increasing_id::MonotonicallyIncreasingIdSink,
         outer_hash_join_probe::OuterHashJoinProbeSink,
         pivot::PivotSink,
         sort::SortSink,
@@ -281,7 +282,18 @@ pub fn physical_plan_to_pipeline(
             let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
             BlockingSinkNode::new(Arc::new(sort_sink), child_node).boxed()
         }
-
+        LocalPhysicalPlan::MonotonicallyIncreasingId(MonotonicallyIncreasingId {
+            input,
+            column_name,
+            schema,
+            ..
+        }) => {
+            let child_node = physical_plan_to_pipeline(input, psets, cfg)?;
+            let monotonically_increasing_id_sink =
+                MonotonicallyIncreasingIdSink::new(column_name.clone(), schema.clone());
+            StreamingSinkNode::new(Arc::new(monotonically_increasing_id_sink), vec![child_node])
+                .boxed()
+        }
         LocalPhysicalPlan::HashJoin(HashJoin {
             left,
             right,
diff --git a/src/daft-local-execution/src/sinks/mod.rs b/src/daft-local-execution/src/sinks/mod.rs
index 768427ce62..474c9fe33f 100644
--- a/src/daft-local-execution/src/sinks/mod.rs
+++ b/src/daft-local-execution/src/sinks/mod.rs
@@ -3,6 +3,7 @@ pub mod blocking_sink;
 pub mod concat;
 pub mod hash_join_build;
 pub mod limit;
+pub mod monotonically_increasing_id;
 pub mod outer_hash_join_probe;
 pub mod pivot;
 pub mod sort;
diff --git a/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs b/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs
new file mode 100644
index 0000000000..4c5f268bed
--- /dev/null
+++ b/src/daft-local-execution/src/sinks/monotonically_increasing_id.rs
@@ -0,0 +1,126 @@
+use std::sync::Arc;
+
+use common_runtime::RuntimeRef;
+use daft_core::prelude::SchemaRef;
+use daft_micropartition::MicroPartition;
+use tracing::instrument;
+
+use super::streaming_sink::{
+    StreamingSink, StreamingSinkExecuteResult, StreamingSinkFinalizeResult, StreamingSinkOutput,
+    StreamingSinkState,
+};
+use crate::{
+    dispatcher::{DispatchSpawner, UnorderedDispatcher},
+    ExecutionRuntimeContext,
+};
+
+struct MonotonicallyIncreasingIdState {
+    id_offset: u64,
+}
+
+impl MonotonicallyIncreasingIdState {
+    fn fetch_and_increment_offset(&mut self, increment: u64) -> u64 {
+        let id_offset = self.id_offset;
+        self.id_offset += increment;
+        id_offset
+    }
+}
+
+impl StreamingSinkState for MonotonicallyIncreasingIdState {
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+}
+
+struct MonotonicallyIncreasingIdParams {
+    column_name: String,
+    output_schema: SchemaRef,
+}
+
+pub struct MonotonicallyIncreasingIdSink {
+    params: Arc<MonotonicallyIncreasingIdParams>,
+}
+impl MonotonicallyIncreasingIdSink {
+    pub fn new(column_name: String, output_schema: SchemaRef) -> Self {
+        Self {
+            params: Arc::new(MonotonicallyIncreasingIdParams {
+                column_name,
+                output_schema,
+            }),
+        }
+    }
+}
+
+impl StreamingSink for MonotonicallyIncreasingIdSink {
+    #[instrument(skip_all, name = "MonotonicallyIncreasingIdSink::sink")]
+    fn execute(
+        &self,
+        input: Arc<MicroPartition>,
+        mut state: Box<dyn StreamingSinkState>,
+        runtime_ref: &RuntimeRef,
+    ) -> StreamingSinkExecuteResult {
+        let params = self.params.clone();
+        runtime_ref
+            .spawn(async move {
+                let mut id_offset = state
+                    .as_any_mut()
+                    .downcast_mut::<MonotonicallyIncreasingIdState>()
+                    .expect("MonotonicallyIncreasingIdOperator should have MonotonicallyIncreasingIdState")
+                    .fetch_and_increment_offset(input.len() as u64);
+
+                let tables = input.get_tables()?;
+                let mut results = Vec::with_capacity(tables.len());
+                for t in tables.iter() {
+                    let len = t.len() as u64;
+                    results.push(t.add_monotonically_increasing_id(
+                        0,
+                        id_offset,
+                        &params.column_name,
+                    )?);
+                    id_offset += len;
+                }
+
+                let out = MicroPartition::new_loaded(
+                    params.output_schema.clone(),
+                    results.into(),
+                    None,
+                );
+
+                Ok((
+                    state,
+                    StreamingSinkOutput::NeedMoreInput(Some(Arc::new(out))),
+                ))
+            })
+            .into()
+    }
+
+    fn name(&self) -> &'static str {
+        "MonotonicallyIncreasingId"
+    }
+
+    fn finalize(
+        &self,
+        _states: Vec<Box<dyn StreamingSinkState>>,
+        _runtime_ref: &RuntimeRef,
+    ) -> StreamingSinkFinalizeResult {
+        Ok(None).into()
+    }
+
+    fn make_state(&self) -> Box<dyn StreamingSinkState> {
+        Box::new(MonotonicallyIncreasingIdState { id_offset: 0 })
+    }
+
+    // Monotonically increasing id is a memory-bound operation, so there's no performance
+    // benefit to parallelizing it. Furthermore, it is much simpler to implement as a
+    // single-threaded operation, since we can just keep track of the current id offset
+    // without synchronization.
+    fn max_concurrency(&self) -> usize {
+        1
+    }
+
+    fn dispatch_spawner(
+        &self,
+        _runtime_handle: &ExecutionRuntimeContext,
+        _maintain_order: bool,
+    ) -> Arc<dyn DispatchSpawner> {
+        Arc::new(UnorderedDispatcher::new(None))
+    }
+}
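
Reviewer note: the `max_concurrency` comment above is what lets the sink keep a bare `u64` offset with no synchronization. A minimal standalone sketch of that bookkeeping, with illustrative names rather than Daft's actual API (`IdState` here stands in for the sink state, and batch lengths stand in for micropartitions):

```rust
// Sketch: with a single-threaded sink, reserving a range of ids is just a
// read-then-bump on a plain counter, mirroring fetch_and_increment_offset.
struct IdState {
    id_offset: u64,
}

impl IdState {
    // Reserve `n` consecutive ids and return the first one.
    fn fetch_and_increment(&mut self, n: u64) -> u64 {
        let start = self.id_offset;
        self.id_offset += n;
        start
    }
}

fn main() {
    let mut state = IdState { id_offset: 0 };
    // Two incoming batches of 5 and 3 rows receive ids 0..5 and 5..8:
    // consecutive, gap-free, and monotonically increasing across batches.
    for len in [5u64, 3] {
        let start = state.fetch_and_increment(len);
        println!("batch gets ids {}..{}", start, start + len);
    }
}
```
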
diff --git a/src/daft-local-plan/src/lib.rs b/src/daft-local-plan/src/lib.rs
index 67d1eecb2a..53a760e982 100644
--- a/src/daft-local-plan/src/lib.rs
+++ b/src/daft-local-plan/src/lib.rs
@@ -6,7 +6,7 @@ mod translate;
 pub use plan::CatalogWrite;
 pub use plan::{
     ActorPoolProject, Concat, EmptyScan, Explode, Filter, HashAggregate, HashJoin, InMemoryScan,
-    Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, PhysicalScan, PhysicalWrite, Pivot, Project,
-    Sample, Sort, UnGroupedAggregate, Unpivot,
+    Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, MonotonicallyIncreasingId, PhysicalScan,
+    PhysicalWrite, Pivot, Project, Sample, Sort, UnGroupedAggregate, Unpivot,
 };
 pub use translate::translate;
diff --git a/src/daft-local-plan/src/plan.rs b/src/daft-local-plan/src/plan.rs
index 12dd17238a..6e6aaa9e3c 100644
--- a/src/daft-local-plan/src/plan.rs
+++ b/src/daft-local-plan/src/plan.rs
@@ -21,7 +21,7 @@ pub enum LocalPhysicalPlan {
     Sort(Sort),
     // Split(Split),
     Sample(Sample),
-    // MonotonicallyIncreasingId(MonotonicallyIncreasingId),
+    MonotonicallyIncreasingId(MonotonicallyIncreasingId),
     // Coalesce(Coalesce),
     // Flatten(Flatten),
     // FanoutRandom(FanoutRandom),
@@ -256,6 +256,20 @@ impl LocalPhysicalPlan {
             .arced()
     }
 
+    pub(crate) fn monotonically_increasing_id(
+        input: LocalPhysicalPlanRef,
+        column_name: String,
+        schema: SchemaRef,
+    ) -> LocalPhysicalPlanRef {
+        Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId {
+            input,
+            column_name,
+            schema,
+            plan_stats: PlanStats {},
+        })
+        .arced()
+    }
+
     pub(crate) fn hash_join(
         left: LocalPhysicalPlanRef,
         right: LocalPhysicalPlanRef,
@@ -426,6 +440,14 @@ pub struct Sample {
     pub plan_stats: PlanStats,
 }
 
+#[derive(Debug)]
+pub struct MonotonicallyIncreasingId {
+    pub input: LocalPhysicalPlanRef,
+    pub column_name: String,
+    pub schema: SchemaRef,
+    pub plan_stats: PlanStats,
+}
+
 #[derive(Debug)]
 pub struct UnGroupedAggregate {
     pub input: LocalPhysicalPlanRef,
diff --git a/src/daft-local-plan/src/translate.rs b/src/daft-local-plan/src/translate.rs
index b04bea550e..884bec04de 100644
--- a/src/daft-local-plan/src/translate.rs
+++ b/src/daft-local-plan/src/translate.rs
@@ -160,6 +160,14 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
             log::warn!("Repartition Not supported for Local Executor!; This will be a No-Op");
             translate(&repartition.input)
         }
+        LogicalPlan::MonotonicallyIncreasingId(monotonically_increasing_id) => {
+            let input = translate(&monotonically_increasing_id.input)?;
+            Ok(LocalPhysicalPlan::monotonically_increasing_id(
+                input,
+                monotonically_increasing_id.column_name.clone(),
+                monotonically_increasing_id.schema.clone(),
+            ))
+        }
         LogicalPlan::Sink(sink) => {
             use daft_logical_plan::SinkInfo;
             let input = translate(&sink.input)?;
diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs
index 39bc7ad5c5..845fe12263 100644
--- a/src/daft-micropartition/src/python.rs
+++ b/src/daft-micropartition/src/python.rs
@@ -940,21 +940,23 @@ pub fn read_pyfunc_into_table_iter(
     let scan_task_filters = scan_task.pushdowns.filters.clone();
     let res = table_iterators
         .into_iter()
-        .filter_map(|iter| {
-            Python::with_gil(|py| {
-                iter.downcast_bound::<PyIterator>(py)
-                    .expect("Function must return an iterator of tables")
-                    .clone()
-                    .next()
-                    .map(|result| {
-                        result
-                            .map(|tbl| {
-                                tbl.extract::<PyTable>()
-                                    .expect("Must be a PyTable")
-                                    .table
-                            })
-                            .with_context(|_| PyIOSnafu)
-                    })
-            })
-        })
+        .flat_map(move |iter| {
+            std::iter::from_fn(move || {
+                Python::with_gil(|py| {
+                    iter.downcast_bound::<PyIterator>(py)
+                        .expect("Function must return an iterator of tables")
+                        .clone()
+                        .next()
+                        .map(|result| {
+                            result
+                                .map(|tbl| {
+                                    tbl.extract::<PyTable>()
+                                        .expect("Must be a PyTable")
+                                        .table
+                                })
+                                .with_context(|_| PyIOSnafu)
+                        })
+                })
+            })
+        })
         .scan(0, move |rows_seen_so_far, table| {
diff --git a/tests/dataframe/test_monotonically_increasing_id.py b/tests/dataframe/test_monotonically_increasing_id.py
index 4f1929ce72..2d52ec4869 100644
--- a/tests/dataframe/test_monotonically_increasing_id.py
+++ b/tests/dataframe/test_monotonically_increasing_id.py
@@ -3,13 +3,10 @@
 import pytest
 
 from daft.datatype import DataType
+from daft.io._generator import read_generator
+from daft.table.table import Table
 from tests.conftest import get_tests_daft_runner_name
 
-pytestmark = pytest.mark.skipif(
-    get_tests_daft_runner_name() == "native",
-    reason="Native executor fails for these tests",
-)
-
 
 def test_monotonically_increasing_id_single_partition(make_df) -> None:
     data = {"a": [1, 2, 3, 4, 5]}
@@ -31,6 +28,10 @@ def test_monotonically_increasing_id_empty_table(make_df) -> None:
     assert df.to_pydict() == {"id": [], "a": []}
 
 
+@pytest.mark.skipif(
+    get_tests_daft_runner_name() == "native",
+    reason="Native runner does not support repartitioning",
+)
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 20, 50, 100])
 def test_monotonically_increasing_id_multiple_partitions_with_into_partition(make_df, repartition_nparts) -> None:
     ITEMS = [i for i in range(100)]
@@ -54,6 +55,38 @@ def test_monotonically_increasing_id_multiple_partitions_with_into_partition(make_df, repartition_nparts) -> None:
     assert df.to_pydict() == {"id": ids, "a": ITEMS}
 
 
+def test_monotonically_increasing_id_from_generator() -> None:
+    ITEMS = list(range(10))
+    table = Table.from_pydict({"a": ITEMS})
+
+    num_tables = 3
+    num_generators = 3
+
+    def generator():
+        for _ in range(num_tables):
+            yield table
+
+    def generators():
+        for _ in range(num_generators):
+            yield generator
+
+    df = read_generator(generators(), schema=table.schema())._add_monotonically_increasing_id().collect()
+
+    assert len(df) == 90
+    assert set(df.column_names) == {"id", "a"}
+    assert df.schema()["id"].dtype == DataType.uint64()
+
+    if get_tests_daft_runner_name() == "native":
+        # On the native runner, there are no partitions, so the ids are just the row numbers.
+        assert df.to_pydict() == {"id": list(range(90)), "a": ITEMS * 9}
+    else:
+        # On the ray / py runner, the ids are generated from the partition number and the row
+        # number within the partition: the partition number is put in the upper 28 bits and
+        # the row number in the lower 36 bits. There are num_generators partitions, and each
+        # partition has num_tables * len(ITEMS) rows.
+        ids = [(p << 36) | c for p in range(num_generators) for c in range(num_tables * len(ITEMS))]
+        assert df.to_pydict() == {"id": ids, "a": ITEMS * 9}
+
+
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 20, 50, 100])
 def test_monotonically_increasing_id_multiple_partitions_with_repartition(make_df, repartition_nparts) -> None:
     ITEMS = [i for i in range(100)]
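
For reference, a standalone sketch of the partitioned id scheme that the test comment above describes; the 28/36 bit split is inferred from the test's `(p << 36) | c` expression, and `make_id` is an illustrative helper, not Daft's code:

```rust
// Partition index in the upper 28 bits, per-partition row index in the
// lower 36 bits of a u64 id.
const ROW_BITS: u32 = 36;

fn make_id(partition_idx: u64, row_idx: u64) -> u64 {
    assert!(partition_idx < (1u64 << 28));
    assert!(row_idx < (1u64 << ROW_BITS));
    (partition_idx << ROW_BITS) | row_idx
}

fn main() {
    // Three partitions of 30 rows each, as in the generator test above
    // (num_generators = 3, num_tables * len(ITEMS) = 30).
    assert_eq!(make_id(0, 29), 29);
    assert_eq!(make_id(1, 0), 1u64 << 36); // 68_719_476_736
    assert_eq!(make_id(2, 5), (2u64 << 36) | 5);
    println!("id scheme checks out");
}
```

The native runner's single-threaded sink sidesteps this packing entirely: with one global offset, ids are plain row numbers, which is exactly what the `native` branch of the test asserts.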