From aa997b10b1d5f25d1434c1b913361063264c2ef4 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Wed, 19 Jul 2023 10:44:52 +0800 Subject: [PATCH] complete msg only for subscription --- src/internal/exec/connection.rs | 20 ++++++++++++-------- src/internal/exec/stream_or_fut.rs | 17 ++--------------- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/src/internal/exec/connection.rs b/src/internal/exec/connection.rs index 20c268be..f533dfa8 100644 --- a/src/internal/exec/connection.rs +++ b/src/internal/exec/connection.rs @@ -338,10 +338,12 @@ impl< } StreamYield::Finished(f) => { if let Some(stream) = f.take(conn.streams.as_mut()) { - this.batch.as_mut().insert(exec::Response { - id: stream.id(), - inner: ResponseInner::Complete, - }); + if let StreamOrFut::OwnedStream { stream } = stream { + this.batch.as_mut().insert(exec::Response { + id: stream.id, + inner: ResponseInner::Complete, + }); + } PollResult::QueueSend } else { @@ -368,10 +370,12 @@ impl< // TODO: This can be improved by: https://github.com/jonhoo/streamunordered/pull/5 for (token, _) in conn.steam_to_sub_id.drain() { if let Some(stream) = conn.streams.as_mut().take(token) { - this.batch.as_mut().insert(exec::Response { - id: stream.id(), - inner: ResponseInner::Complete, - }); + if let StreamOrFut::OwnedStream { stream } = stream { + this.batch.as_mut().insert(exec::Response { + id: stream.id, + inner: ResponseInner::Complete, + }); + } } } conn.steam_to_sub_id.drain().for_each(drop); diff --git a/src/internal/exec/stream_or_fut.rs b/src/internal/exec/stream_or_fut.rs index 438a765e..e9aa97c4 100644 --- a/src/internal/exec/stream_or_fut.rs +++ b/src/internal/exec/stream_or_fut.rs @@ -23,20 +23,7 @@ pin_project! { #[pin] fut: ExecRequestFut, }, - // TODO: Don't do it like this - Done { - id: u32, - }, - } -} - -impl StreamOrFut { - pub fn id(&self) -> u32 { - match self { - StreamOrFut::OwnedStream { stream } => stream.id, - StreamOrFut::ExecRequestFut { fut } => fut.id, - StreamOrFut::Done { id } => *id, - } + Done, } } @@ -57,7 +44,7 @@ impl Stream for StreamOrFut { })) } StreamOrFutProj::ExecRequestFut { fut } => fut.poll(cx).map(|v| { - self.set(StreamOrFut::Done { id: v.id }); + self.set(StreamOrFut::Done); Some(v) }), StreamOrFutProj::Done { .. } => Poll::Ready(None),