Skip to content

Commit

Permalink
Support closing broadcasts. (#199)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Oct 18, 2024
1 parent bda46cd commit 79d8983
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 8 deletions.
3 changes: 3 additions & 0 deletions moq-karp/src/cmaf/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub enum Error {
#[error("trailing data")]
TrailingData,

#[error("closed")]
Closed,

#[error("unsupported track: {0}")]
UnsupportedTrack(&'static str),

Expand Down
19 changes: 15 additions & 4 deletions moq-karp/src/cmaf/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ impl Import {
}

pub fn parse(&mut self, data: &[u8]) -> Result<()> {
if self.broadcast.is_closed() {
return Err(Error::Closed);
}

if !self.buffer.is_empty() {
let mut buffer = std::mem::replace(&mut self.buffer, BytesMut::new());
buffer.extend_from_slice(data);
Expand Down Expand Up @@ -217,11 +221,18 @@ impl Import {

// Read the media from a stream, processing moof and mdat atoms.
pub async fn read_from<T: AsyncRead + Unpin>(&mut self, input: &mut T) -> Result<()> {
while let Some(atom) = Option::<mp4_atom::Any>::read_from(input).await? {
self.process(atom)?;
loop {
tokio::select! {
res = Option::<mp4_atom::Any>::read_from(input) => {
match res {
Ok(Some(atom)) => self.process(atom)?,
Ok(None) => return Ok(()),
Err(err) => return Err(err.into()),
}
}
_ = self.broadcast.closed() => return Err(Error::Closed),
}
}

Ok(())
}

fn process(&mut self, atom: mp4_atom::Any) -> Result<()> {
Expand Down
8 changes: 8 additions & 0 deletions moq-karp/src/media/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ impl BroadcastProducer {

Ok(())
}

pub async fn closed(&self) {
self.inner.closed().await
}

pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}

pub struct TrackProducer {
Expand Down
4 changes: 2 additions & 2 deletions moq-transfork/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ impl BroadcastProducer {
}

// Returns when there are no references to the consumer
pub async fn unused(&self) {
pub async fn closed(&self) {
self.state.closed().await
}

pub fn is_unused(&self) -> bool {
pub fn is_closed(&self) -> bool {
!self.state.is_closed()
}
}
Expand Down
6 changes: 5 additions & 1 deletion moq-transfork/src/session/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ impl Publisher {
#[tracing::instrument("publish", skip_all, err, fields(?broadcast))]
pub fn publish(&mut self, broadcast: BroadcastConsumer) -> Result<(), Error> {
let active = self.announced.insert(broadcast.clone())?;
let session = self.session.clone();

spawn(async move {
broadcast.closed().await.ok();
tokio::select! {
_ = broadcast.closed() => (),
_ = session.closed() => (),
}
drop(active);
});

Expand Down
7 changes: 6 additions & 1 deletion moq-transfork/src/session/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,14 @@ impl Subscriber {
writer.route_tracks(router.1);

let this = self.clone();
let session = self.session.clone();

spawn(async move {
this.run_router(writer, router.0).await;
tokio::select! {
_ = this.run_router(writer, router.0) => (),
_ = session.closed() => (),
};

drop(served);
});

Expand Down

0 comments on commit 79d8983

Please sign in to comment.