From ea87a8eb814dd17e35f72450551c104098fc6e9f Mon Sep 17 00:00:00 2001 From: kixelated Date: Tue, 29 Oct 2024 14:04:31 -0700 Subject: [PATCH] Karp API improvements (#220) --- moq-karp/src/catalog/broadcast.rs | 1 - moq-karp/src/catalog/video.rs | 16 ----- moq-karp/src/cmaf/import.rs | 67 +++++++-------------- moq-karp/src/consume/broadcast.rs | 10 +-- moq-karp/src/consume/mod.rs | 6 +- moq-karp/src/consume/{audio.rs => track.rs} | 6 +- moq-karp/src/consume/video.rs | 67 --------------------- moq-karp/src/lib.rs | 3 + moq-karp/src/main.rs | 5 +- moq-karp/src/media/timestamp.rs | 12 ++-- moq-karp/src/produce/audio.rs | 40 ------------ moq-karp/src/produce/broadcast.rs | 10 +-- moq-karp/src/produce/mod.rs | 6 +- moq-karp/src/produce/track.rs | 31 ++++++++++ moq-karp/src/produce/video.rs | 38 ------------ moq-native/src/lib.rs | 4 ++ moq-transfork/src/coding/path.rs | 2 +- moq-transfork/src/model/path.rs | 13 +++- moq-transfork/src/session/publisher.rs | 2 +- 19 files changed, 97 insertions(+), 242 deletions(-) rename moq-karp/src/consume/{audio.rs => track.rs} (97%) delete mode 100644 moq-karp/src/consume/video.rs delete mode 100644 moq-karp/src/produce/audio.rs create mode 100644 moq-karp/src/produce/track.rs delete mode 100644 moq-karp/src/produce/video.rs diff --git a/moq-karp/src/catalog/broadcast.rs b/moq-karp/src/catalog/broadcast.rs index 58113395..27a63a20 100644 --- a/moq-karp/src/catalog/broadcast.rs +++ b/moq-karp/src/catalog/broadcast.rs @@ -130,7 +130,6 @@ mod test { width: 1280, height: 720, }, - layers: Default::default(), bitrate: Some(6_000_000), }], audio: vec![Audio { diff --git a/moq-karp/src/catalog/video.rs b/moq-karp/src/catalog/video.rs index 4a30dd1d..a92db43c 100644 --- a/moq-karp/src/catalog/video.rs +++ b/moq-karp/src/catalog/video.rs @@ -27,20 +27,4 @@ pub struct Video { #[serde(default)] pub bitrate: Option, - - // Additional enhancement layers - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub layers: Vec, -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct VideoLayer { - /// The name of the track, relative to the broadcast path. - pub name: String, - - /// The priority of the track, relative to other tracks in the same broadcast. - pub priority: i8, - - /// The resolution of this layer. - pub resolution: Dimensions, } diff --git a/moq-karp/src/cmaf/import.rs b/moq-karp/src/cmaf/import.rs index 5dcb47e1..f9672339 100644 --- a/moq-karp/src/cmaf/import.rs +++ b/moq-karp/src/cmaf/import.rs @@ -1,10 +1,14 @@ -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use mp4_atom::{Any, AsyncReadFrom, Atom, DecodeMaybe, Esds, Moof, Moov, Trak}; use std::collections::HashMap; use tokio::io::AsyncRead; use super::{util, Error, Result}; -use crate::{catalog, media::Timestamp, produce}; +use crate::{ + catalog, + media::{self, Timestamp}, + produce, +}; /// Converts fMP4 -> Karp pub struct Import { @@ -15,8 +19,7 @@ pub struct Import { broadcast: produce::Broadcast, // A lookup to tracks in the broadcast - audio: HashMap, - video: HashMap, + tracks: HashMap, // The moov atom at the start of the file. moov: Option, @@ -30,8 +33,7 @@ impl Import { Self { buffer: BytesMut::new(), broadcast, - audio: HashMap::default(), - video: HashMap::default(), + tracks: HashMap::default(), moov: None, moof: None, } @@ -68,20 +70,20 @@ impl Import { let track_id = trak.tkhd.track_id; let handler = &trak.mdia.hdlr.handler; - match handler.as_ref() { + let track = match handler.as_ref() { b"vide" => { let track = Self::init_video(trak)?; - let video = self.broadcast.create_video(track)?; - self.video.insert(track_id, video); + self.broadcast.create_video(track)? } b"soun" => { let track = Self::init_audio(trak)?; - let audio = self.broadcast.create_audio(track)?; - self.audio.insert(track_id, audio); + self.broadcast.create_audio(track)? } b"sbtl" => return Err(Error::UnsupportedTrack("subtitle")), _ => return Err(Error::UnsupportedTrack("unknown")), }; + + self.tracks.insert(track_id, track); } self.moov = Some(moov); @@ -112,7 +114,6 @@ impl Import { } .into(), description: description.freeze(), - layers: vec![], bitrate: None, } } else if let Some(hev1) = &stsd.hev1 { @@ -159,7 +160,6 @@ impl Import { width: vp09.width, height: vp09.height, }, - layers: vec![], bitrate: None, } } else { @@ -230,30 +230,6 @@ impl Import { self.init(moov)?; } Any::Moof(moof) => { - let track_id = util::frame_track_id(&moof)?; - let keyframe = util::frame_is_key(&moof); - - if keyframe { - let moov = self.moov.as_ref().ok_or(Error::MissingBox(Moov::KIND))?; - let trak = moov - .trak - .iter() - .find(|trak| trak.tkhd.track_id == track_id) - .ok_or(Error::UnknownTrack)?; - - // If this is a video track, start a new group for the keyframe. - if trak.mdia.hdlr.handler == b"vide".into() { - // Start a new group for the keyframe. - for track in self.video.values_mut() { - track.keyframe(); - } - - for track in self.audio.values_mut() { - track.segment(); - } - } - } - if self.moof.is_some() { // Two moof boxes in a row. return Err(Error::DuplicateBox(Moof::KIND)); @@ -269,15 +245,16 @@ impl Import { let timestamp = util::frame_timestamp(&moof)?; let timescale = util::frame_timescale(moov, &moof)?; - let timestamp = Timestamp::from_scale(timestamp, timescale as _); + let timestamp = Timestamp::from_units(timestamp, timescale as _); - if let Some(video) = self.video.get_mut(&track_id) { - video.write(timestamp, mdat.data.into()); - } else if let Some(audio) = self.audio.get_mut(&track_id) { - audio.write(timestamp, mdat.data.into()); - } else { - return Err(Error::UnknownTrack); - } + let frame = media::Frame { + timestamp, + keyframe: util::frame_is_key(&moof), + payload: Bytes::from(mdat.data), + }; + + let track = self.tracks.get_mut(&track_id).ok_or(Error::UnknownTrack)?; + track.write(frame); } _ => { diff --git a/moq-karp/src/consume/broadcast.rs b/moq-karp/src/consume/broadcast.rs index 2eb9a873..e60bf4c2 100644 --- a/moq-karp/src/consume/broadcast.rs +++ b/moq-karp/src/consume/broadcast.rs @@ -3,7 +3,7 @@ use crate::{catalog, Error}; use moq_transfork::{Path, Session}; use tokio::sync::mpsc; -use super::{Audio, Video}; +use super::Track; #[derive(Clone)] pub struct Broadcast { @@ -41,7 +41,7 @@ impl Broadcast { } // This API could be improved - pub fn video(&self, name: &str) -> Result { + pub fn video(&self, name: &str) -> Result { let info = self.find_video(name)?; let track = moq_transfork::Track { @@ -51,10 +51,10 @@ impl Broadcast { }; let track = self.session.subscribe(track); - Ok(Video::new(track)) + Ok(Track::new(track)) } - pub fn audio(&self, name: &str) -> Result { + pub fn audio(&self, name: &str) -> Result { let info = self.find_audio(name)?; let track = moq_transfork::Track { @@ -64,7 +64,7 @@ impl Broadcast { }; let track = self.session.subscribe(track); - Ok(Audio::new(track)) + Ok(Track::new(track)) } fn find_audio(&self, name: &str) -> Result<&catalog::Audio, Error> { diff --git a/moq-karp/src/consume/mod.rs b/moq-karp/src/consume/mod.rs index 7a7739f2..3a7e6848 100644 --- a/moq-karp/src/consume/mod.rs +++ b/moq-karp/src/consume/mod.rs @@ -1,9 +1,7 @@ -mod audio; mod broadcast; mod resume; -mod video; +mod track; -pub use audio::*; pub use broadcast::*; pub use resume::*; -pub use video::*; +pub use track::*; diff --git a/moq-karp/src/consume/audio.rs b/moq-karp/src/consume/track.rs similarity index 97% rename from moq-karp/src/consume/audio.rs rename to moq-karp/src/consume/track.rs index 8c0f20a2..e8213f23 100644 --- a/moq-karp/src/consume/audio.rs +++ b/moq-karp/src/consume/track.rs @@ -1,16 +1,16 @@ +use crate::media::Timestamp; use crate::Error; -use crate::media::Timestamp; use crate::{media::Frame, util::FuturesExt}; use moq_transfork::coding::*; -pub struct Audio { +pub struct Track { track: moq_transfork::TrackConsumer, group: Option, } -impl Audio { +impl Track { pub(super) fn new(track: moq_transfork::TrackConsumer) -> Self { Self { track, group: None } } diff --git a/moq-karp/src/consume/video.rs b/moq-karp/src/consume/video.rs deleted file mode 100644 index 02be9402..00000000 --- a/moq-karp/src/consume/video.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::media::Timestamp; -use crate::Error; - -use crate::{media::Frame, util::FuturesExt}; - -use moq_transfork::coding::*; - -pub struct Video { - track: moq_transfork::TrackConsumer, - group: Option, -} - -impl Video { - pub(super) fn new(track: moq_transfork::TrackConsumer) -> Self { - Self { track, group: None } - } - - pub async fn read(&mut self) -> Result, Error> { - let mut keyframe = false; - - if self.group.is_none() { - self.group = self.track.next_group().await?; - keyframe = true; - - if self.group.is_none() { - return Ok(None); - } - } - - loop { - tokio::select! { - biased; - Some(res) = self.group.as_mut().unwrap().read_frame().transpose() => { - let raw = res?; - let frame = self.decode_frame(raw, keyframe)?; - return Ok(Some(frame)); - }, - Some(res) = self.track.next_group().transpose() => { - let group = res?; - - if group.sequence < self.group.as_ref().unwrap().sequence { - // Ignore old groups - continue; - } - - // TODO use a configurable latency before moving to the next group. - self.group = Some(group); - keyframe = true; - }, - else => return Ok(None), - } - } - } - - fn decode_frame(&self, mut payload: Bytes, keyframe: bool) -> Result { - let micros = u64::decode(&mut payload)?; - let timestamp = Timestamp::from_micros(micros); - - let frame = Frame { - keyframe, - timestamp, - payload, - }; - - Ok(frame) - } -} diff --git a/moq-karp/src/lib.rs b/moq-karp/src/lib.rs index 43be485f..dd9ec548 100644 --- a/moq-karp/src/lib.rs +++ b/moq-karp/src/lib.rs @@ -8,3 +8,6 @@ mod error; pub use error::*; pub(crate) mod util; + +// export the moq-transfork version in use +pub use moq_transfork; diff --git a/moq-karp/src/main.rs b/moq-karp/src/main.rs index 2e115af0..f9f8f439 100644 --- a/moq-karp/src/main.rs +++ b/moq-karp/src/main.rs @@ -6,7 +6,6 @@ use url::Url; use moq_karp::{cmaf, produce}; use moq_native::quic; -use moq_transfork::*; #[derive(Parser, Clone)] struct Cli { @@ -53,7 +52,7 @@ async fn main() -> anyhow::Result<()> { let session = quic.client.connect(&cli.url).await?; let session = moq_transfork::Session::connect(session).await?; - let path = Path::new(cli.path); + let path = moq_transfork::Path::new(cli.path); match cli.command { Command::Publish => publish(session, path).await, @@ -62,7 +61,7 @@ async fn main() -> anyhow::Result<()> { } #[tracing::instrument("publish", skip_all, err, fields(?path))] -async fn publish(mut session: moq_transfork::Session, path: Path) -> anyhow::Result<()> { +async fn publish(mut session: moq_transfork::Session, path: moq_transfork::Path) -> anyhow::Result<()> { let broadcast = produce::Resumable::new(path).broadcast(); let mut input = tokio::io::stdin(); diff --git a/moq-karp/src/media/timestamp.rs b/moq-karp/src/media/timestamp.rs index 2a1ecd4b..70cc86d0 100644 --- a/moq-karp/src/media/timestamp.rs +++ b/moq-karp/src/media/timestamp.rs @@ -1,6 +1,6 @@ -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct Timestamp { - pub micros: u64, + micros: u64, } impl Timestamp { @@ -18,9 +18,9 @@ impl Timestamp { } } - pub fn from_scale(base: u64, scale: u64) -> Self { + pub fn from_units(value: u64, base: u64) -> Self { Self { - micros: base * 1_000_000 / scale, + micros: (value * 1_000_000) / base, } } @@ -36,7 +36,7 @@ impl Timestamp { self.micros / 1_000_000 } - pub fn to_scale(&self, scale: u64) -> u64 { - self.micros * scale / 1_000_000 + pub fn as_units(&self, base: u64) -> u64 { + (self.micros * base) / 1_000_000 } } diff --git a/moq-karp/src/produce/audio.rs b/moq-karp/src/produce/audio.rs deleted file mode 100644 index acf9efff..00000000 --- a/moq-karp/src/produce/audio.rs +++ /dev/null @@ -1,40 +0,0 @@ -use moq_transfork::coding::*; - -use crate::media::Timestamp; - -pub struct Audio { - inner: moq_transfork::TrackProducer, - group: Option, -} - -impl Audio { - pub(super) fn new(inner: moq_transfork::TrackProducer) -> Self { - Self { inner, group: None } - } - - // Terminate an audio segment. - // This should be done at least once per video keyframe, but may be done more frequently. - pub fn segment(&mut self) { - // The take() is important, it means we'll create a new group on the next write. - if let Some(group) = self.group.take() { - tracing::debug!(sequence = group.sequence, frames = group.frame_count(), "keyframe"); - } - } - - pub fn write(&mut self, timestamp: Timestamp, payload: Bytes) { - let timestamp = timestamp.as_micros(); - let mut header = BytesMut::with_capacity(timestamp.encode_size()); - timestamp.encode(&mut header); - - let mut group = match self.group.take() { - Some(group) => group, - None => self.inner.append_group(), - }; - - let mut frame = group.create_frame(header.len() + payload.len()); - frame.write(header.freeze()); - frame.write(payload); - - self.group.replace(group); - } -} diff --git a/moq-karp/src/produce/broadcast.rs b/moq-karp/src/produce/broadcast.rs index ee542642..06c74fc7 100644 --- a/moq-karp/src/produce/broadcast.rs +++ b/moq-karp/src/produce/broadcast.rs @@ -5,7 +5,7 @@ use crate::{ Error, }; -use super::{Audio, Video}; +use super::Track; pub struct Broadcast { pub path: Path, @@ -26,7 +26,7 @@ impl Broadcast { } } - pub fn create_video(&mut self, info: catalog::Video) -> Result { + pub fn create_video(&mut self, info: catalog::Video) -> Result { let (producer, consumer) = moq_transfork::Track { path: self.path.clone().push(&info.track.name), priority: info.track.priority, @@ -34,7 +34,7 @@ impl Broadcast { } .produce(); - let track = Video::new(producer); + let track = Track::new(producer); self.catalog.video.push(info); self.tracks.push(consumer); @@ -44,7 +44,7 @@ impl Broadcast { Ok(track) } - pub fn create_audio(&mut self, info: catalog::Audio) -> Result { + pub fn create_audio(&mut self, info: catalog::Audio) -> Result { let (producer, consumer) = moq_transfork::Track { path: self.path.clone().push(&info.track.name), priority: info.track.priority, @@ -52,7 +52,7 @@ impl Broadcast { } .produce(); - let track = Audio::new(producer); + let track = Track::new(producer); self.catalog.audio.push(info); self.tracks.push(consumer); diff --git a/moq-karp/src/produce/mod.rs b/moq-karp/src/produce/mod.rs index 7a7739f2..3a7e6848 100644 --- a/moq-karp/src/produce/mod.rs +++ b/moq-karp/src/produce/mod.rs @@ -1,9 +1,7 @@ -mod audio; mod broadcast; mod resume; -mod video; +mod track; -pub use audio::*; pub use broadcast::*; pub use resume::*; -pub use video::*; +pub use track::*; diff --git a/moq-karp/src/produce/track.rs b/moq-karp/src/produce/track.rs new file mode 100644 index 00000000..6f1b8025 --- /dev/null +++ b/moq-karp/src/produce/track.rs @@ -0,0 +1,31 @@ +use moq_transfork::coding::*; + +use crate::media::Frame; + +pub struct Track { + inner: moq_transfork::TrackProducer, + group: Option, +} + +impl Track { + pub(super) fn new(inner: moq_transfork::TrackProducer) -> Self { + Self { inner, group: None } + } + + pub fn write(&mut self, frame: Frame) { + let timestamp = frame.timestamp.as_micros(); + let mut header = BytesMut::with_capacity(timestamp.encode_size()); + timestamp.encode(&mut header); + + let mut group = match self.group.take() { + Some(group) if !frame.keyframe => group, + _ => self.inner.append_group(), + }; + + let mut chunked = group.create_frame(header.len() + frame.payload.len()); + chunked.write(header.freeze()); + chunked.write(frame.payload); + + self.group.replace(group); + } +} diff --git a/moq-karp/src/produce/video.rs b/moq-karp/src/produce/video.rs deleted file mode 100644 index d82de5d6..00000000 --- a/moq-karp/src/produce/video.rs +++ /dev/null @@ -1,38 +0,0 @@ -use moq_transfork::coding::*; - -use crate::media::Timestamp; - -pub struct Video { - inner: moq_transfork::TrackProducer, - group: Option, -} - -impl Video { - pub(super) fn new(inner: moq_transfork::TrackProducer) -> Self { - Self { inner, group: None } - } - - pub fn keyframe(&mut self) { - // The take() is important, it means we'll create a new group on the next write. - if let Some(group) = self.group.take() { - tracing::debug!(sequence = group.sequence, frames = group.frame_count(), "keyframe"); - } - } - - pub fn write(&mut self, timestamp: Timestamp, payload: Bytes) { - let timestamp = timestamp.as_micros(); - let mut header = BytesMut::with_capacity(timestamp.encode_size()); - timestamp.encode(&mut header); - - let mut group = match self.group.take() { - Some(group) => group, - None => self.inner.append_group(), - }; - - let mut frame = group.create_frame(header.len() + payload.len()); - frame.write(header.freeze()); - frame.write(payload); - - self.group.replace(group); - } -} diff --git a/moq-native/src/lib.rs b/moq-native/src/lib.rs index 0d772cdb..b3092047 100644 --- a/moq-native/src/lib.rs +++ b/moq-native/src/lib.rs @@ -1,3 +1,7 @@ pub mod log; pub mod quic; pub mod tls; + +// Re-export these crates. +pub use moq_transfork; +pub use web_transport; diff --git a/moq-transfork/src/coding/path.rs b/moq-transfork/src/coding/path.rs index d47fa696..12fb41f3 100644 --- a/moq-transfork/src/coding/path.rs +++ b/moq-transfork/src/coding/path.rs @@ -14,6 +14,6 @@ impl Encode for Path { impl Decode for Path { fn decode(r: &mut R) -> Result { let parts = Vec::::decode(r)?; - Ok(Path::new(parts)) + Ok(Self::new(parts)) } } diff --git a/moq-transfork/src/model/path.rs b/moq-transfork/src/model/path.rs index de46b049..064d81ed 100644 --- a/moq-transfork/src/model/path.rs +++ b/moq-transfork/src/model/path.rs @@ -6,9 +6,10 @@ pub struct Path { } impl Path { - pub fn new(parts: Vec) -> Path { - Path { - parts: parts.into_iter().map(Arc::new).collect(), + /// Creates a new `Path` from any collection of elements that can be converted to strings. + pub fn new>(parts: I) -> Self { + Self { + parts: parts.into_iter().map(|s| Arc::new(s.to_string())).collect(), } } @@ -82,3 +83,9 @@ impl fmt::Debug for Path { write!(f, "]") } } + +impl FromIterator for Path { + fn from_iter>(iter: T) -> Self { + Self::new(iter) + } +} diff --git a/moq-transfork/src/session/publisher.rs b/moq-transfork/src/session/publisher.rs index a9e8fbe7..921f898d 100644 --- a/moq-transfork/src/session/publisher.rs +++ b/moq-transfork/src/session/publisher.rs @@ -61,7 +61,7 @@ impl Publisher { pub fn announce(&mut self, mut announced: AnnouncedConsumer) { let mut downstream = self.announced.clone(); - tokio::spawn(async move { + spawn(async move { while let Some(announced) = announced.next().await { match announced { Announced::Active(path) => downstream.announce(path.clone()),