Skip to content

Commit

Permalink
pin project
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Nov 14, 2024
1 parent 27b025d commit e751f61
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<T: Send + 'static> Future for RuntimeTask<T> {
type Output = DaftResult<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.joinset).poll_join_next(cx) {
match self.joinset.poll_join_next(cx) {
Poll::Ready(Some(result)) => {
Poll::Ready(result.map_err(|e| DaftError::External(e.into())))
}
Expand Down
1 change: 1 addition & 0 deletions src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ lazy_static = {workspace = true}
log = {workspace = true}
loole = "0.4.0"
num-format = "0.4.4"
pin-project = "1"
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
tokio = {workspace = true}
Expand Down
17 changes: 8 additions & 9 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@ lazy_static! {
/// It can be either `Ready` or `Pending`.
/// If the output is `Ready`, the value is immediately available.
/// If the output is `Pending`, the value is not yet available and a `RuntimeTask` is returned.
#[pin_project::pin_project(project = OperatorOutputProj)]
pub(crate) enum OperatorOutput<T> {
Ready(Option<T>),
Pending(RuntimeTask<T>),
Pending(#[pin] RuntimeTask<T>),
}

impl<T: Send + Sync + Unpin + 'static> Future for OperatorOutput<T> {
type Output = DaftResult<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

match this {
Self::Ready(value) => {
match self.project() {
OperatorOutputProj::Ready(value) => {
let value = value.take().unwrap();
Poll::Ready(Ok(value))
}
Self::Pending(task) => Pin::new(task).poll(cx),
OperatorOutputProj::Pending(task) => task.poll(cx),
}
}
}
Expand Down Expand Up @@ -89,13 +88,13 @@ impl<T: 'static> TaskSet<T> {
}
}

struct SpawnedTask<T>(tokio::task::JoinHandle<T>);
#[pin_project::pin_project]
struct SpawnedTask<T>(#[pin] tokio::task::JoinHandle<T>);
impl<T> Future for SpawnedTask<T> {
type Output = crate::Result<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Pin::new(&mut this.0).poll(cx).map(|r| r.context(JoinSnafu))
self.project().0.poll(cx).map(|r| r.context(JoinSnafu))
}
}

Expand Down

0 comments on commit e751f61

Please sign in to comment.