Skip to content

Commit

Permalink
[CHORE] Swordfish refactors (#3256)
Browse files Browse the repository at this point in the history
There's a couple outstanding issues / inefficiencies / ugliness in the
swordfish code. I originally intended of breaking these up into smaller
PRs, but during the process of trying to split it up, I realized that
all the changes are quite intertwined, and it may be easier on the
reviewer to just see all of them in one. That being said, I'll try my
best to explain all the changes and rationale in detail.

### Problems
- The `PipelineChannel` abstraction doesn't play well with streaming
sinks, in fact, in only really works for intermediate ops. This is
because the `PipelineChannel` assumes sending/receiving is round robin,
but streaming sinks can send data from the workers, as well as after the
finalize.
- Pipeline nodes currently send across both data and probe tables in the
same channel, which can be confusing. Furthermore, the logic of
dispatching probe tables and data is also different, adding more
complexity. Probe tables need to be broadcasted to all workers, while
data should only be distributed to a single worker.
- If an operator does not require input to be ordered, it shouldn't
dispatch work to workers in round robin, it should dispatch to any
available worker.
- Some operators don't do work in their `execute` or `finalize` methods.
These don't need to be spawned on the compute runtime.

### Proposed Changes
- Pipeline nodes should just send data across, via a simple
`Receiver<Arc<Micropartition>>`.
- Probe tables should be sent separately from data, via a
`ProbeStateBridge`.
- Implement an unordered dispatcher, an optimization for if ordering is
not required. This uses `loole` channels, which are multi-producer
multi-consumer. For consistency, use `loole` channels across all
swordfish. In the future we can think about implementing our custom
channels.
https://users.rust-lang.org/t/loole-a-safe-async-sync-multi-producer-multi-consumer-channel/113785
- Give the compute runtime to the operators, and let them decide whether
they want to spawn.

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
3 people authored Nov 15, 2024
1 parent 5c00dbc commit ca8f25d
Show file tree
Hide file tree
Showing 30 changed files with 1,478 additions and 1,078 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<T: Send + 'static> Future for RuntimeTask<T> {
type Output = DaftResult<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.joinset).poll_join_next(cx) {
match self.joinset.poll_join_next(cx) {
Poll::Ready(Some(result)) => {
Poll::Ready(result.map_err(|e| DaftError::External(e.into())))
}
Expand Down
3 changes: 3 additions & 0 deletions src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ futures = {workspace = true}
indexmap = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
loole = "0.4.0"
num-format = "0.4.4"
pin-project = "1"
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-util = {workspace = true}
tracing = {workspace = true}

[features]
Expand Down
123 changes: 46 additions & 77 deletions src/daft-local-execution/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,115 +1,84 @@
use std::sync::Arc;

use crate::{
pipeline::PipelineResultType,
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
};

pub type Sender<T> = tokio::sync::mpsc::Sender<T>;
pub type Receiver<T> = tokio::sync::mpsc::Receiver<T>;

pub fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
tokio::sync::mpsc::channel(buffer_size)
}

pub struct PipelineChannel {
sender: PipelineSender,
receiver: PipelineReceiver,
}

impl PipelineChannel {
pub fn new(buffer_size: usize, in_order: bool) -> Self {
if in_order {
let (senders, receivers) = (0..buffer_size).map(|_| create_channel(1)).unzip();
let sender = PipelineSender::InOrder(RoundRobinSender::new(senders));
let receiver = PipelineReceiver::InOrder(RoundRobinReceiver::new(receivers));
Self { sender, receiver }
} else {
let (sender, receiver) = create_channel(buffer_size);
let sender = PipelineSender::OutOfOrder(sender);
let receiver = PipelineReceiver::OutOfOrder(receiver);
Self { sender, receiver }
}
}

fn get_next_sender(&mut self) -> Sender<PipelineResultType> {
match &mut self.sender {
PipelineSender::InOrder(rr) => rr.get_next_sender(),
PipelineSender::OutOfOrder(sender) => sender.clone(),
}
}

pub(crate) fn get_next_sender_with_stats(
&mut self,
rt: &Arc<RuntimeStatsContext>,
) -> CountingSender {
CountingSender::new(self.get_next_sender(), rt.clone())
#[derive(Clone)]
pub(crate) struct Sender<T>(loole::Sender<T>);
impl<T> Sender<T> {
pub(crate) async fn send(&self, val: T) -> Result<(), loole::SendError<T>> {
self.0.send_async(val).await
}
}

pub fn get_receiver(self) -> PipelineReceiver {
self.receiver
#[derive(Clone)]
pub(crate) struct Receiver<T>(loole::Receiver<T>);
impl<T> Receiver<T> {
pub(crate) async fn recv(&self) -> Option<T> {
self.0.recv_async().await.ok()
}

pub(crate) fn get_receiver_with_stats(self, rt: &Arc<RuntimeStatsContext>) -> CountingReceiver {
CountingReceiver::new(self.get_receiver(), rt.clone())
pub(crate) fn blocking_recv(&self) -> Option<T> {
self.0.recv().ok()
}
}

pub enum PipelineSender {
InOrder(RoundRobinSender<PipelineResultType>),
OutOfOrder(Sender<PipelineResultType>),
}

pub struct RoundRobinSender<T> {
senders: Vec<Sender<T>>,
curr_sender_idx: usize,
pub(crate) fn create_channel<T: Clone>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = loole::bounded(buffer_size);
(Sender(tx), Receiver(rx))
}

impl<T> RoundRobinSender<T> {
pub fn new(senders: Vec<Sender<T>>) -> Self {
Self {
senders,
curr_sender_idx: 0,
/// A multi-producer, single-consumer channel that is aware of the ordering of the senders.
/// If `ordered` is true, the receiver will try to receive from each sender in a round-robin fashion.
/// This is useful when collecting results from multiple workers in a specific order.
pub(crate) fn create_ordering_aware_receiver_channel<T: Clone>(
ordered: bool,
buffer_size: usize,
) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
match ordered {
true => {
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip();
(
senders,
OrderingAwareReceiver::InOrder(RoundRobinReceiver::new(receiver)),
)
}
false => {
let (sender, receiver) = create_channel::<T>(buffer_size);
(
(0..buffer_size).map(|_| sender.clone()).collect(),
OrderingAwareReceiver::OutOfOrder(receiver),
)
}
}

pub fn get_next_sender(&mut self) -> Sender<T> {
let next_idx = self.curr_sender_idx;
self.curr_sender_idx = (next_idx + 1) % self.senders.len();
self.senders[next_idx].clone()
}
}

pub enum PipelineReceiver {
InOrder(RoundRobinReceiver<PipelineResultType>),
OutOfOrder(Receiver<PipelineResultType>),
pub(crate) enum OrderingAwareReceiver<T> {
InOrder(RoundRobinReceiver<T>),
OutOfOrder(Receiver<T>),
}

impl PipelineReceiver {
pub async fn recv(&mut self) -> Option<PipelineResultType> {
impl<T> OrderingAwareReceiver<T> {
pub(crate) async fn recv(&mut self) -> Option<T> {
match self {
Self::InOrder(rr) => rr.recv().await,
Self::OutOfOrder(r) => r.recv().await,
}
}
}

pub struct RoundRobinReceiver<T> {
/// A round-robin receiver that tries to receive from each receiver in a round-robin fashion.
pub(crate) struct RoundRobinReceiver<T> {
receivers: Vec<Receiver<T>>,
curr_receiver_idx: usize,
is_done: bool,
}

impl<T> RoundRobinReceiver<T> {
pub fn new(receivers: Vec<Receiver<T>>) -> Self {
fn new(receivers: Vec<Receiver<T>>) -> Self {
Self {
receivers,
curr_receiver_idx: 0,
is_done: false,
}
}

pub async fn recv(&mut self) -> Option<T> {
async fn recv(&mut self) -> Option<T> {
if self.is_done {
return None;
}
Expand Down
Loading

0 comments on commit ca8f25d

Please sign in to comment.