From 79c76df980a2831a0078108807ebdc4b5ba1a61f Mon Sep 17 00:00:00 2001 From: kixelated Date: Mon, 28 Oct 2024 16:33:20 -0700 Subject: [PATCH] Small API changes to avoid needing the Session upfront. (#216) --- moq-clock/src/clock.rs | 4 +-- moq-karp/src/catalog/broadcast.rs | 20 +++++--------- moq-karp/src/cmaf/import.rs | 24 +++++++++-------- moq-karp/src/main.rs | 12 ++++++--- moq-karp/src/produce/broadcast.rs | 45 ++++++++++++++++++------------- moq-karp/src/produce/resume.rs | 9 +++---- moq-transfork/src/model/frame.rs | 4 +-- moq-transfork/src/model/group.rs | 3 ++- 8 files changed, 63 insertions(+), 58 deletions(-) diff --git a/moq-clock/src/clock.rs b/moq-clock/src/clock.rs index 4948ad6b..febf63ed 100644 --- a/moq-clock/src/clock.rs +++ b/moq-clock/src/clock.rs @@ -48,11 +48,11 @@ impl Publisher { // Everything but the second. let base = now.format("%Y-%m-%d %H:%M:").to_string(); - segment.write_frame(base.clone().into()); + segment.write_frame(base.clone()); loop { let delta = now.format("%S").to_string(); - segment.write_frame(delta.clone().into()); + segment.write_frame(delta.clone()); let next = now + chrono::Duration::try_seconds(1).unwrap(); let next = next.with_nanosecond(0).unwrap(); diff --git a/moq-karp/src/catalog/broadcast.rs b/moq-karp/src/catalog/broadcast.rs index 92550f4f..58113395 100644 --- a/moq-karp/src/catalog/broadcast.rs +++ b/moq-karp/src/catalog/broadcast.rs @@ -59,26 +59,18 @@ impl Broadcast { Ok(parsed) } - pub fn publish(&self, session: &mut moq_transfork::Session, path: Path) -> Result { - let (mut writer, reader) = moq_transfork::Track { - path, + /// Returns the track metadata that should be used for this catalog. + pub fn track(path: Path) -> moq_transfork::Track { + moq_transfork::Track { + path: path.push("catalog.json"), priority: -1, group_order: moq_transfork::GroupOrder::Desc, group_expires: std::time::Duration::ZERO, } - .produce(); - - session.publish(reader)?; - self.update(&mut writer)?; - Ok(writer) } - pub fn update(&self, track: &mut moq_transfork::TrackProducer) -> Result<()> { - let mut group = track.append_group(); - let frame = self.to_string()?; - group.write_frame(frame.into()); - - Ok(()) + pub fn is_empty(&self) -> bool { + self.video.is_empty() && self.audio.is_empty() } } diff --git a/moq-karp/src/cmaf/import.rs b/moq-karp/src/cmaf/import.rs index 7d71b5b8..5dcb47e1 100644 --- a/moq-karp/src/cmaf/import.rs +++ b/moq-karp/src/cmaf/import.rs @@ -213,18 +213,11 @@ impl Import { // Read the media from a stream, processing moof and mdat atoms. pub async fn read_from(&mut self, input: &mut T) -> Result<()> { - loop { - tokio::select! { - res = Option::::read_from(input) => { - match res { - Ok(Some(atom)) => self.process(atom)?, - Ok(None) => return Ok(()), - Err(err) => return Err(err.into()), - } - } - _ = self.broadcast.closed() => return Err(Error::Closed), - } + while let Some(atom) = Option::::read_from(input).await? { + self.process(atom)?; } + + Ok(()) } fn process(&mut self, atom: mp4_atom::Any) -> Result<()> { @@ -298,4 +291,13 @@ impl Import { pub fn catalog(&self) -> &catalog::Broadcast { self.broadcast.catalog() } + + pub fn publish(&mut self, session: &mut moq_transfork::Session) -> Result<()> { + if self.moov.is_none() { + return Err(Error::MissingBox(Moov::KIND)); + } + self.broadcast.publish(session)?; + + Ok(()) + } } diff --git a/moq-karp/src/main.rs b/moq-karp/src/main.rs index 9791f67f..2e115af0 100644 --- a/moq-karp/src/main.rs +++ b/moq-karp/src/main.rs @@ -62,9 +62,8 @@ async fn main() -> anyhow::Result<()> { } #[tracing::instrument("publish", skip_all, err, fields(?path))] -async fn publish(session: moq_transfork::Session, path: Path) -> anyhow::Result<()> { - let producer = produce::Resumable::new(session, path); - let broadcast = producer.broadcast(); +async fn publish(mut session: moq_transfork::Session, path: Path) -> anyhow::Result<()> { + let broadcast = produce::Resumable::new(path).broadcast(); let mut input = tokio::io::stdin(); @@ -73,7 +72,12 @@ async fn publish(session: moq_transfork::Session, path: Path) -> anyhow::Result< tracing::info!(catalog = ?import.catalog()); - Ok(import.read_from(&mut input).await?) + import.publish(&mut session)?; + + tokio::select! { + res = import.read_from(&mut input) => Ok(res?), + res = session.closed() => Err(res.into()), + } } /* diff --git a/moq-karp/src/produce/broadcast.rs b/moq-karp/src/produce/broadcast.rs index d56ec90c..ee542642 100644 --- a/moq-karp/src/produce/broadcast.rs +++ b/moq-karp/src/produce/broadcast.rs @@ -1,4 +1,4 @@ -use moq_transfork::{Path, Session}; +use moq_transfork::{Path, Session, TrackConsumer, TrackProducer}; use crate::{ catalog::{self}, @@ -8,19 +8,21 @@ use crate::{ use super::{Audio, Video}; pub struct Broadcast { + pub path: Path, catalog: catalog::Broadcast, - catalog_track: Option, - session: Session, - path: Path, + catalog_producer: TrackProducer, // need to hold the track to keep it open + tracks: Vec, } impl Broadcast { - pub fn new(session: Session, path: Path) -> Self { + pub fn new(path: Path) -> Self { + let (producer, consumer) = catalog::Broadcast::track(path.clone()).produce(); + Self { - session, path, catalog: catalog::Broadcast::default(), - catalog_track: None, + catalog_producer: producer, + tracks: vec![consumer], } } @@ -32,11 +34,12 @@ impl Broadcast { } .produce(); - self.session.publish(consumer)?; let track = Video::new(producer); self.catalog.video.push(info); - self.publish()?; + self.tracks.push(consumer); + + self.update()?; Ok(track) } @@ -49,11 +52,12 @@ impl Broadcast { } .produce(); - self.session.publish(consumer)?; let track = Audio::new(producer); self.catalog.audio.push(info); - self.publish()?; + self.tracks.push(consumer); + + self.update()?; Ok(track) } @@ -62,18 +66,21 @@ impl Broadcast { &self.catalog } - fn publish(&mut self) -> Result<(), Error> { - if let Some(track) = self.catalog_track.as_mut() { - return Ok(self.catalog.update(track)?); - } + fn update(&mut self) -> Result<(), Error> { + let frame = self.catalog.to_string()?; - let path = self.path.clone().push("catalog.json"); - self.catalog_track = self.catalog.publish(&mut self.session, path)?.into(); + let mut group = self.catalog_producer.append_group(); + group.write_frame(frame); Ok(()) } - pub async fn closed(&self) { - self.session.closed().await; + /// Publish all of the *current* tracks to the session. + pub fn publish(&self, session: &mut Session) -> Result<(), Error> { + for track in &self.tracks { + session.publish(track.clone())?; + } + + Ok(()) } } diff --git a/moq-karp/src/produce/resume.rs b/moq-karp/src/produce/resume.rs index 75bf5761..13a28964 100644 --- a/moq-karp/src/produce/resume.rs +++ b/moq-karp/src/produce/resume.rs @@ -1,17 +1,16 @@ use std::time; -use moq_transfork::{Path, Session}; +use moq_transfork::Path; use super::Broadcast; pub struct Resumable { - session: Session, path: Path, } impl Resumable { - pub fn new(session: Session, path: Path) -> Self { - Self { session, path } + pub fn new(path: Path) -> Self { + Self { path } } /// Produce a broadcast using the current time as the ID. @@ -23,6 +22,6 @@ impl Resumable { .as_secs(); let path = self.path.clone().push(id.to_string()); - Broadcast::new(self.session.clone(), path) + Broadcast::new(path) } } diff --git a/moq-transfork/src/model/frame.rs b/moq-transfork/src/model/frame.rs index c6c4f602..5c425f3e 100644 --- a/moq-transfork/src/model/frame.rs +++ b/moq-transfork/src/model/frame.rs @@ -57,8 +57,8 @@ impl FrameProducer { Self { state, info } } - pub fn write(&mut self, chunk: bytes::Bytes) { - self.state.send_modify(|state| state.chunks.push(chunk)); + pub fn write>(&mut self, chunk: B) { + self.state.send_modify(|state| state.chunks.push(chunk.into())); } /// Close the stream with an error. diff --git a/moq-transfork/src/model/group.rs b/moq-transfork/src/model/group.rs index 7cc9923e..3445c36b 100644 --- a/moq-transfork/src/model/group.rs +++ b/moq-transfork/src/model/group.rs @@ -71,7 +71,8 @@ impl GroupProducer { } // Write a frame in one go - pub fn write_frame(&mut self, frame: bytes::Bytes) { + pub fn write_frame>(&mut self, frame: B) { + let frame = frame.into(); self.create_frame(frame.len()).write(frame); }