Skip to content

Commit

Permalink
Small API changes to avoid needing the Session upfront. (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Oct 28, 2024
1 parent 7376f73 commit 79c76df
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 58 deletions.
4 changes: 2 additions & 2 deletions moq-clock/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ impl Publisher {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment.write_frame(base.clone().into());
segment.write_frame(base.clone());

loop {
let delta = now.format("%S").to_string();
segment.write_frame(delta.clone().into());
segment.write_frame(delta.clone());

let next = now + chrono::Duration::try_seconds(1).unwrap();
let next = next.with_nanosecond(0).unwrap();
Expand Down
20 changes: 6 additions & 14 deletions moq-karp/src/catalog/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,18 @@ impl Broadcast {
Ok(parsed)
}

pub fn publish(&self, session: &mut moq_transfork::Session, path: Path) -> Result<moq_transfork::TrackProducer> {
let (mut writer, reader) = moq_transfork::Track {
path,
/// Returns the track metadata that should be used for this catalog.
pub fn track(path: Path) -> moq_transfork::Track {
moq_transfork::Track {
path: path.push("catalog.json"),
priority: -1,
group_order: moq_transfork::GroupOrder::Desc,
group_expires: std::time::Duration::ZERO,
}
.produce();

session.publish(reader)?;
self.update(&mut writer)?;
Ok(writer)
}

pub fn update(&self, track: &mut moq_transfork::TrackProducer) -> Result<()> {
let mut group = track.append_group();
let frame = self.to_string()?;
group.write_frame(frame.into());

Ok(())
pub fn is_empty(&self) -> bool {
self.video.is_empty() && self.audio.is_empty()
}
}

Expand Down
24 changes: 13 additions & 11 deletions moq-karp/src/cmaf/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,11 @@ impl Import {

// Read the media from a stream, processing moof and mdat atoms.
pub async fn read_from<T: AsyncRead + Unpin>(&mut self, input: &mut T) -> Result<()> {
loop {
tokio::select! {
res = Option::<mp4_atom::Any>::read_from(input) => {
match res {
Ok(Some(atom)) => self.process(atom)?,
Ok(None) => return Ok(()),
Err(err) => return Err(err.into()),
}
}
_ = self.broadcast.closed() => return Err(Error::Closed),
}
while let Some(atom) = Option::<mp4_atom::Any>::read_from(input).await? {
self.process(atom)?;
}

Ok(())
}

fn process(&mut self, atom: mp4_atom::Any) -> Result<()> {
Expand Down Expand Up @@ -298,4 +291,13 @@ impl Import {
pub fn catalog(&self) -> &catalog::Broadcast {
self.broadcast.catalog()
}

pub fn publish(&mut self, session: &mut moq_transfork::Session) -> Result<()> {
if self.moov.is_none() {
return Err(Error::MissingBox(Moov::KIND));
}
self.broadcast.publish(session)?;

Ok(())
}
}
12 changes: 8 additions & 4 deletions moq-karp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ async fn main() -> anyhow::Result<()> {
}

#[tracing::instrument("publish", skip_all, err, fields(?path))]
async fn publish(session: moq_transfork::Session, path: Path) -> anyhow::Result<()> {
let producer = produce::Resumable::new(session, path);
let broadcast = producer.broadcast();
async fn publish(mut session: moq_transfork::Session, path: Path) -> anyhow::Result<()> {
let broadcast = produce::Resumable::new(path).broadcast();

let mut input = tokio::io::stdin();

Expand All @@ -73,7 +72,12 @@ async fn publish(session: moq_transfork::Session, path: Path) -> anyhow::Result<

tracing::info!(catalog = ?import.catalog());

Ok(import.read_from(&mut input).await?)
import.publish(&mut session)?;

tokio::select! {
res = import.read_from(&mut input) => Ok(res?),
res = session.closed() => Err(res.into()),
}
}

/*
Expand Down
45 changes: 26 additions & 19 deletions moq-karp/src/produce/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use moq_transfork::{Path, Session};
use moq_transfork::{Path, Session, TrackConsumer, TrackProducer};

use crate::{
catalog::{self},
Expand All @@ -8,19 +8,21 @@ use crate::{
use super::{Audio, Video};

pub struct Broadcast {
pub path: Path,
catalog: catalog::Broadcast,
catalog_track: Option<moq_transfork::TrackProducer>,
session: Session,
path: Path,
catalog_producer: TrackProducer, // need to hold the track to keep it open
tracks: Vec<TrackConsumer>,
}

impl Broadcast {
pub fn new(session: Session, path: Path) -> Self {
pub fn new(path: Path) -> Self {
let (producer, consumer) = catalog::Broadcast::track(path.clone()).produce();

Self {
session,
path,
catalog: catalog::Broadcast::default(),
catalog_track: None,
catalog_producer: producer,
tracks: vec![consumer],
}
}

Expand All @@ -32,11 +34,12 @@ impl Broadcast {
}
.produce();

self.session.publish(consumer)?;
let track = Video::new(producer);

self.catalog.video.push(info);
self.publish()?;
self.tracks.push(consumer);

self.update()?;

Ok(track)
}
Expand All @@ -49,11 +52,12 @@ impl Broadcast {
}
.produce();

self.session.publish(consumer)?;
let track = Audio::new(producer);

self.catalog.audio.push(info);
self.publish()?;
self.tracks.push(consumer);

self.update()?;

Ok(track)
}
Expand All @@ -62,18 +66,21 @@ impl Broadcast {
&self.catalog
}

fn publish(&mut self) -> Result<(), Error> {
if let Some(track) = self.catalog_track.as_mut() {
return Ok(self.catalog.update(track)?);
}
fn update(&mut self) -> Result<(), Error> {
let frame = self.catalog.to_string()?;

let path = self.path.clone().push("catalog.json");
self.catalog_track = self.catalog.publish(&mut self.session, path)?.into();
let mut group = self.catalog_producer.append_group();
group.write_frame(frame);

Ok(())
}

pub async fn closed(&self) {
self.session.closed().await;
/// Publish all of the *current* tracks to the session.
pub fn publish(&self, session: &mut Session) -> Result<(), Error> {
for track in &self.tracks {
session.publish(track.clone())?;
}

Ok(())
}
}
9 changes: 4 additions & 5 deletions moq-karp/src/produce/resume.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::time;

use moq_transfork::{Path, Session};
use moq_transfork::Path;

use super::Broadcast;

pub struct Resumable {
session: Session,
path: Path,
}

impl Resumable {
pub fn new(session: Session, path: Path) -> Self {
Self { session, path }
pub fn new(path: Path) -> Self {
Self { path }
}

/// Produce a broadcast using the current time as the ID.
Expand All @@ -23,6 +22,6 @@ impl Resumable {
.as_secs();

let path = self.path.clone().push(id.to_string());
Broadcast::new(self.session.clone(), path)
Broadcast::new(path)
}
}
4 changes: 2 additions & 2 deletions moq-transfork/src/model/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl FrameProducer {
Self { state, info }
}

pub fn write(&mut self, chunk: bytes::Bytes) {
self.state.send_modify(|state| state.chunks.push(chunk));
pub fn write<B: Into<Bytes>>(&mut self, chunk: B) {
self.state.send_modify(|state| state.chunks.push(chunk.into()));
}

/// Close the stream with an error.
Expand Down
3 changes: 2 additions & 1 deletion moq-transfork/src/model/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ impl GroupProducer {
}

// Write a frame in one go
pub fn write_frame(&mut self, frame: bytes::Bytes) {
pub fn write_frame<B: Into<Bytes>>(&mut self, frame: B) {
let frame = frame.into();
self.create_frame(frame.len()).write(frame);
}

Expand Down

0 comments on commit 79c76df

Please sign in to comment.