Skip to content

Commit

Permalink
complete msg only for subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
oscartbeaumont committed Jul 19, 2023
1 parent 52bdb3c commit aa997b1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 23 deletions.
20 changes: 12 additions & 8 deletions src/internal/exec/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,12 @@ impl<
}
StreamYield::Finished(f) => {
if let Some(stream) = f.take(conn.streams.as_mut()) {
this.batch.as_mut().insert(exec::Response {
id: stream.id(),
inner: ResponseInner::Complete,
});
if let StreamOrFut::OwnedStream { stream } = stream {
this.batch.as_mut().insert(exec::Response {
id: stream.id,
inner: ResponseInner::Complete,
});
}

PollResult::QueueSend
} else {
Expand All @@ -368,10 +370,12 @@ impl<
// TODO: This can be improved by: https://github.com/jonhoo/streamunordered/pull/5
for (token, _) in conn.steam_to_sub_id.drain() {
if let Some(stream) = conn.streams.as_mut().take(token) {
this.batch.as_mut().insert(exec::Response {
id: stream.id(),
inner: ResponseInner::Complete,
});
if let StreamOrFut::OwnedStream { stream } = stream {
this.batch.as_mut().insert(exec::Response {
id: stream.id,
inner: ResponseInner::Complete,
});
}
}
}
conn.steam_to_sub_id.drain().for_each(drop);
Expand Down
17 changes: 2 additions & 15 deletions src/internal/exec/stream_or_fut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,7 @@ pin_project! {
#[pin]
fut: ExecRequestFut,
},
// TODO: Don't do it like this
Done {
id: u32,
},
}
}

impl<TCtx: 'static> StreamOrFut<TCtx> {
pub fn id(&self) -> u32 {
match self {
StreamOrFut::OwnedStream { stream } => stream.id,
StreamOrFut::ExecRequestFut { fut } => fut.id,
StreamOrFut::Done { id } => *id,
}
Done,
}
}

Expand All @@ -57,7 +44,7 @@ impl<TCtx: 'static> Stream for StreamOrFut<TCtx> {
}))
}
StreamOrFutProj::ExecRequestFut { fut } => fut.poll(cx).map(|v| {
self.set(StreamOrFut::Done { id: v.id });
self.set(StreamOrFut::Done);
Some(v)
}),
StreamOrFutProj::Done { .. } => Poll::Ready(None),
Expand Down

0 comments on commit aa997b1

Please sign in to comment.