diff --git a/moq-karp/src/cmaf/error.rs b/moq-karp/src/cmaf/error.rs index 0ff73b95..3c5874f2 100644 --- a/moq-karp/src/cmaf/error.rs +++ b/moq-karp/src/cmaf/error.rs @@ -50,6 +50,9 @@ pub enum Error { #[error("trailing data")] TrailingData, + #[error("closed")] + Closed, + #[error("unsupported track: {0}")] UnsupportedTrack(&'static str), diff --git a/moq-karp/src/cmaf/import.rs b/moq-karp/src/cmaf/import.rs index 6fa9d9ad..79e3f988 100644 --- a/moq-karp/src/cmaf/import.rs +++ b/moq-karp/src/cmaf/import.rs @@ -36,6 +36,10 @@ impl Import { } pub fn parse(&mut self, data: &[u8]) -> Result<()> { + if self.broadcast.is_closed() { + return Err(Error::Closed); + } + if !self.buffer.is_empty() { let mut buffer = std::mem::replace(&mut self.buffer, BytesMut::new()); buffer.extend_from_slice(data); @@ -217,11 +221,18 @@ impl Import { // Read the media from a stream, processing moof and mdat atoms. pub async fn read_from(&mut self, input: &mut T) -> Result<()> { - while let Some(atom) = Option::::read_from(input).await? { - self.process(atom)?; + loop { + tokio::select! { + res = Option::::read_from(input) => { + match res { + Ok(Some(atom)) => self.process(atom)?, + Ok(None) => return Ok(()), + Err(err) => return Err(err.into()), + } + } + _ = self.broadcast.closed() => return Err(Error::Closed), + } } - - Ok(()) } fn process(&mut self, atom: mp4_atom::Any) -> Result<()> { diff --git a/moq-karp/src/media/producer.rs b/moq-karp/src/media/producer.rs index db7d22d8..be861b61 100644 --- a/moq-karp/src/media/producer.rs +++ b/moq-karp/src/media/producer.rs @@ -56,6 +56,14 @@ impl BroadcastProducer { Ok(()) } + + pub async fn closed(&self) { + self.inner.closed().await + } + + pub fn is_closed(&self) -> bool { + self.inner.is_closed() + } } pub struct TrackProducer { diff --git a/moq-transfork/src/model/broadcast.rs b/moq-transfork/src/model/broadcast.rs index f4d7aa4d..2f6957ff 100644 --- a/moq-transfork/src/model/broadcast.rs +++ b/moq-transfork/src/model/broadcast.rs @@ -133,11 +133,11 @@ impl BroadcastProducer { } // Returns when there are no references to the consumer - pub async fn unused(&self) { + pub async fn closed(&self) { self.state.closed().await } - pub fn is_unused(&self) -> bool { + pub fn is_closed(&self) -> bool { !self.state.is_closed() } } diff --git a/moq-transfork/src/session/publisher.rs b/moq-transfork/src/session/publisher.rs index 07ad88c8..9eff8ed6 100644 --- a/moq-transfork/src/session/publisher.rs +++ b/moq-transfork/src/session/publisher.rs @@ -27,9 +27,13 @@ impl Publisher { #[tracing::instrument("publish", skip_all, err, fields(?broadcast))] pub fn publish(&mut self, broadcast: BroadcastConsumer) -> Result<(), Error> { let active = self.announced.insert(broadcast.clone())?; + let session = self.session.clone(); spawn(async move { - broadcast.closed().await.ok(); + tokio::select! { + _ = broadcast.closed() => (), + _ = session.closed() => (), + } drop(active); }); diff --git a/moq-transfork/src/session/subscriber.rs b/moq-transfork/src/session/subscriber.rs index 5dfa9ea1..cb5e97b3 100644 --- a/moq-transfork/src/session/subscriber.rs +++ b/moq-transfork/src/session/subscriber.rs @@ -112,9 +112,14 @@ impl Subscriber { writer.route_tracks(router.1); let this = self.clone(); + let session = self.session.clone(); spawn(async move { - this.run_router(writer, router.0).await; + tokio::select! { + _ = this.run_router(writer, router.0) => (), + _ = session.closed() => (), + }; + drop(served); });