From 07ac215d7f7832601d5951716ffc4478e3dded46 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 27 Jul 2023 11:59:37 +0200 Subject: [PATCH] update mediacommon; replace Client.OnData with OnDataH26x, OnDataMPEG4Audio, OnDataOpus --- README.md | 8 +- client.go | 30 +++- client_downloader_primary.go | 5 +- client_downloader_stream.go | 15 +- client_processor_fmp4.go | 184 ++++++++++++++++-------- client_processor_fmp4_track.go | 72 ---------- client_processor_mpegts.go | 240 ++++++++++++++++--------------- client_processor_mpegts_track.go | 63 -------- client_test.go | 14 +- client_timesync_fmp4.go | 8 +- client_timesync_mpegts.go | 12 +- client_track_processor.go | 41 ++++++ examples/client/main.go | 24 +++- go.mod | 2 +- go.sum | 4 +- muxer.go | 13 +- muxer_segment_mpegts.go | 54 ++++--- muxer_segmenter.go | 3 +- muxer_segmenter_fmp4.go | 35 +++++ muxer_segmenter_mpegts.go | 68 ++++++--- muxer_test.go | 24 ++-- pkg/codecs/mpeg4audio.go | 2 +- pkg/fmp4/init.go | 4 +- 23 files changed, 513 insertions(+), 412 deletions(-) delete mode 100644 client_processor_fmp4_track.go delete mode 100644 client_processor_mpegts_track.go create mode 100644 client_track_processor.go diff --git a/README.md b/README.md index 7904513..3f8d78e 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ General features: * [Examples](#examples) * [API Documentation](#api-documentation) * [Standards](#standards) -* [Links](#links) +* [Related projects](#related-projects) ## Examples @@ -72,8 +72,8 @@ https://pkg.go.dev/github.com/bluenviron/gohlslib#pkg-index * [Codec standards](https://github.com/bluenviron/mediacommon#standards) * [Golang project layout](https://github.com/golang-standards/project-layout) -## Links - -Related projects +## Related projects * [MediaMTX](https://github.com/bluenviron/mediamtx) +* [gortsplib](https://github.com/bluenviron/gortsplib) +* [mediacommon](https://github.com/bluenviron/mediacommon) diff --git a/client.go 
b/client.go index 0644d80..14a5b9d 100644 --- a/client.go +++ b/client.go @@ -16,11 +16,17 @@ import ( const ( clientMPEGTSEntryQueueSize = 100 clientFMP4MaxPartTracksPerSegment = 200 - clientLiveStartingInvPosition = 3 - clientLiveMaxInvPosition = 5 + clientLiveInitialDistance = 3 + clientLiveMaxDistanceFromEnd = 5 clientMaxDTSRTCDiff = 10 * time.Second ) +type onDataH26xFunc func(pts time.Duration, dts time.Duration, au [][]byte) + +type onDataMPEG4AudioFunc func(pts time.Duration, dts time.Duration, aus [][]byte) + +type onDataOpusFunc func(pts time.Duration, dts time.Duration, packets [][]byte) + func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) { u, err := url.Parse(relative) if err != nil { @@ -50,7 +56,7 @@ type Client struct { ctx context.Context ctxCancel func() onTracks func([]*Track) error - onData map[*Track]func(time.Duration, interface{}) + onData map[*Track]interface{} playlistURL *url.URL // out @@ -74,7 +80,7 @@ func (c *Client) Start() error { c.ctx, c.ctxCancel = context.WithCancel(context.Background()) - c.onData = make(map[*Track]func(time.Duration, interface{})) + c.onData = make(map[*Track]interface{}) c.outErr = make(chan error, 1) go c.run() @@ -97,9 +103,19 @@ func (c *Client) OnTracks(cb func([]*Track) error) { c.onTracks = cb } -// OnData sets a callback that is called when data arrives. -func (c *Client) OnData(forma *Track, cb func(time.Duration, interface{})) { - c.onData[forma] = cb +// OnDataH26x sets a callback that is called when data from an H26x track is received. +func (c *Client) OnDataH26x(forma *Track, onData onDataH26xFunc) { + c.onData[forma] = onData +} + +// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received. +func (c *Client) OnDataMPEG4Audio(forma *Track, onData onDataMPEG4AudioFunc) { + c.onData[forma] = onData +} + +// OnDataOpus sets a callback that is called when data from an Opus track is received. 
+func (c *Client) OnDataOpus(forma *Track, onData onDataOpusFunc) { + c.onData[forma] = onData } func (c *Client) run() { diff --git a/client_downloader_primary.go b/client_downloader_primary.go index 106b75f..619a65b 100644 --- a/client_downloader_primary.go +++ b/client_downloader_primary.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" "strings" - "time" "github.com/bluenviron/gohlslib/pkg/playlist" ) @@ -104,7 +103,7 @@ type clientDownloaderPrimary struct { httpClient *http.Client log LogFunc onTracks func([]*Track) error - onData map[*Track]func(time.Duration, interface{}) + onData map[*Track]interface{} rp *clientRoutinePool leadingTimeSync clientTimeSync @@ -123,7 +122,7 @@ func newClientDownloaderPrimary( log LogFunc, rp *clientRoutinePool, onTracks func([]*Track) error, - onData map[*Track]func(time.Duration, interface{}), + onData map[*Track]interface{}, ) *clientDownloaderPrimary { return &clientDownloaderPrimary{ primaryPlaylistURL: primaryPlaylistURL, diff --git a/client_downloader_stream.go b/client_downloader_stream.go index 7da4398..5c0b27c 100644 --- a/client_downloader_stream.go +++ b/client_downloader_stream.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" "strconv" - "time" "github.com/bluenviron/gohlslib/pkg/playlist" ) @@ -40,7 +39,7 @@ type clientDownloaderStream struct { onStreamTracks func(context.Context, []*Track) bool onSetLeadingTimeSync func(clientTimeSync) onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) - onData map[*Track]func(time.Duration, interface{}) + onData map[*Track]interface{} curSegmentID *int } @@ -55,7 +54,7 @@ func newClientDownloaderStream( onStreamTracks func(context.Context, []*Track) bool, onSetLeadingTimeSync func(clientTimeSync), onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool), - onData map[*Track]func(time.Duration, interface{}), + onData map[*Track]interface{}, ) *clientDownloaderStream { return &clientDownloaderStream{ isLeading: isLeading, @@ -216,8 +215,8 @@ func (d 
*clientDownloaderStream) fillSegmentQueue( var segPos int if d.curSegmentID == nil { - if !pl.Endlist { // live stream: start from clientLiveStartingInvPosition - seg, segPos = findSegmentWithInvPosition(pl.Segments, clientLiveStartingInvPosition) + if !pl.Endlist { // live stream: start from clientLiveInitialDistance + seg, segPos = findSegmentWithInvPosition(pl.Segments, clientLiveInitialDistance) if seg == nil { return fmt.Errorf("there aren't enough segments to fill the buffer") } @@ -231,12 +230,12 @@ func (d *clientDownloaderStream) fillSegmentQueue( var invPos int seg, segPos, invPos = findSegmentWithID(pl.MediaSequence, pl.Segments, *d.curSegmentID+1) if seg == nil { - return fmt.Errorf("following segment not found or not ready yet") + return fmt.Errorf("next segment not found or not ready yet") } - d.log(LogLevelDebug, "segment inverse position: %d", invPos) + d.log(LogLevelDebug, "distance of next segment from end of playlist: %d", invPos) - if !pl.Endlist && invPos > clientLiveMaxInvPosition { + if !pl.Endlist && invPos > clientLiveMaxDistanceFromEnd { return fmt.Errorf("playback is too late") } } diff --git a/client_processor_fmp4.go b/client_processor_fmp4.go index 1498ca1..e4de6a9 100644 --- a/client_processor_fmp4.go +++ b/client_processor_fmp4.go @@ -31,12 +31,12 @@ type clientProcessorFMP4 struct { rp *clientRoutinePool onSetLeadingTimeSync func(clientTimeSync) onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) - onData map[*Track]func(time.Duration, interface{}) + onData map[*Track]interface{} - tracks []*Track - init fmp4.Init - leadingTrackID int - trackProcs map[int]*clientProcessorFMP4Track + tracks []*Track + init fmp4.Init + leadingTrackID int + prePreProcessFuncs map[int]func(context.Context, *fmp4.PartTrack) error // in subpartProcessed chan struct{} @@ -52,7 +52,7 @@ func newClientProcessorFMP4( onStreamTracks func(context.Context, []*Track) bool, onSetLeadingTimeSync func(clientTimeSync), onGetLeadingTimeSync 
func(context.Context) (clientTimeSync, bool), - onData map[*Track]func(time.Duration, interface{}), + onData map[*Track]interface{}, ) (*clientProcessorFMP4, error) { p := &clientProcessorFMP4{ isLeading: isLeading, @@ -111,41 +111,16 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e processingCount := 0 for _, part := range parts { - for _, track := range part.Tracks { - if p.trackProcs == nil { - var ts *clientTimeSyncFMP4 - - if p.isLeading { - if track.ID != p.leadingTrackID { - continue - } - - timeScale := func() uint32 { - for _, track := range p.init.Tracks { - if track.ID == p.leadingTrackID { - return track.TimeScale - } - } - return 0 - }() - ts = newClientTimeSyncFMP4(timeScale, track.BaseTime) - p.onSetLeadingTimeSync(ts) - } else { - rawTS, ok := p.onGetLeadingTimeSync(ctx) - if !ok { - return fmt.Errorf("terminated") - } - - ts, ok = rawTS.(*clientTimeSyncFMP4) - if !ok { - return fmt.Errorf("stream playlists are mixed MPEGTS/FMP4") - } + for _, partTrack := range part.Tracks { + err := p.initializeTrackProcs(ctx, partTrack) + if err != nil { + if err == errSkipSilently { + continue } - - p.initializeTrackProcs(ts) + return err } - proc, ok := p.trackProcs[track.ID] + prePreProcess, ok := p.prePreProcessFuncs[partTrack.ID] if !ok { continue } @@ -154,11 +129,11 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e return fmt.Errorf("too many part tracks at once") } - select { - case proc.queue <- track: - case <-ctx.Done(): - return fmt.Errorf("terminated") + err = prePreProcess(ctx, partTrack) + if err != nil { + return err } + processingCount++ } } @@ -181,44 +156,127 @@ func (p *clientProcessorFMP4) onPartTrackProcessed(ctx context.Context) { } } -func (p *clientProcessorFMP4) initializeTrackProcs(ts *clientTimeSyncFMP4) { - p.trackProcs = make(map[int]*clientProcessorFMP4Track) +func (p *clientProcessorFMP4) initializeTrackProcs(ctx context.Context, track *fmp4.PartTrack) error { + 
if p.prePreProcessFuncs != nil { + return nil + } - for i, track := range p.tracks { - var cb func(time.Duration, []byte) error + var timeSync *clientTimeSyncFMP4 + isLeadingTrack := (track.ID == p.leadingTrackID) - cb2, ok := p.onData[track] - if !ok { - cb2 = func(time.Duration, interface{}) { + if p.isLeading { + if !isLeadingTrack { + return errSkipSilently + } + + timeScale := func() uint32 { + for _, track := range p.init.Tracks { + if track.ID == p.leadingTrackID { + return track.TimeScale + } } + return 0 + }() + timeSync = newClientTimeSyncFMP4(timeScale, track.BaseTime) + p.onSetLeadingTimeSync(timeSync) + } else { + rawTS, ok := p.onGetLeadingTimeSync(ctx) + if !ok { + return fmt.Errorf("terminated") + } + + timeSync, ok = rawTS.(*clientTimeSyncFMP4) + if !ok { + return fmt.Errorf("stream playlists are mixed MPEG-TS/fMP4") } + } + + p.prePreProcessFuncs = make(map[int]func(context.Context, *fmp4.PartTrack) error) + + for i, track := range p.tracks { + onData := p.onData[track] + + var postProcess func(pts time.Duration, dts time.Duration, payload []byte) error switch track.Codec.(type) { case *codecs.H264, *codecs.H265: - cb = func(pts time.Duration, payload []byte) error { - nalus, err := h264.AVCCUnmarshal(payload) + var onDataCasted onDataH26xFunc = func(pts time.Duration, dts time.Duration, au [][]byte) {} + if onData != nil { + onDataCasted = onData.(onDataH26xFunc) + } + + postProcess = func(pts time.Duration, dts time.Duration, payload []byte) error { + au, err := h264.AVCCUnmarshal(payload) if err != nil { return err } - cb2(pts, nalus) + onDataCasted(pts, dts, au) + return nil + } + + case *codecs.MPEG4Audio: + var onDataCasted onDataMPEG4AudioFunc = func(pts time.Duration, dts time.Duration, aus [][]byte) {} + if onData != nil { + onDataCasted = onData.(onDataMPEG4AudioFunc) + } + + postProcess = func(pts time.Duration, dts time.Duration, payload []byte) error { + onDataCasted(pts, dts, [][]byte{payload}) return nil } - case *codecs.MPEG4Audio, *codecs.Opus: 
- cb = func(pts time.Duration, payload []byte) error { - cb2(pts, payload) + case *codecs.Opus: + var onDataCasted onDataOpusFunc = func(pts time.Duration, dts time.Duration, packets [][]byte) {} + if onData != nil { + onDataCasted = onData.(onDataOpusFunc) + } + + postProcess = func(pts time.Duration, dts time.Duration, payload []byte) error { + onDataCasted(pts, dts, [][]byte{payload}) return nil } } - proc := newClientProcessorFMP4Track( - p.init.Tracks[i].TimeScale, - ts, - p.onPartTrackProcessed, - cb, - ) - p.rp.add(proc) - p.trackProcs[p.init.Tracks[i].ID] = proc + timeScale := p.init.Tracks[i].TimeScale + + preProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error { + rawDTS := partTrack.BaseTime + + for _, sample := range partTrack.Samples { + pts, dts, err := timeSync.convertAndSync(ctx, timeScale, rawDTS, sample.PTSOffset) + if err != nil { + return err + } + + rawDTS += uint64(sample.Duration) + + // silently discard packets prior to the first packet of the leading track + if pts < 0 { + continue + } + + err = postProcess(pts, dts, sample.Payload) + if err != nil { + return err + } + } + + p.onPartTrackProcessed(ctx) + return nil + } + + trackProc := newClientTrackProcessor() + p.rp.add(trackProc) + + prePreProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error { + return trackProc.push(ctx, func() error { + return preProcess(ctx, partTrack) + }) + } + + p.prePreProcessFuncs[p.init.Tracks[i].ID] = prePreProcess } + + return nil } diff --git a/client_processor_fmp4_track.go b/client_processor_fmp4_track.go deleted file mode 100644 index badbaae..0000000 --- a/client_processor_fmp4_track.go +++ /dev/null @@ -1,72 +0,0 @@ -package gohlslib - -import ( - "context" - "time" - - "github.com/bluenviron/gohlslib/pkg/fmp4" -) - -type clientProcessorFMP4Track struct { - timeScale uint32 - ts *clientTimeSyncFMP4 - onPartTrackProcessed func(context.Context) - onEntry func(time.Duration, []byte) error - - // in - queue chan 
*fmp4.PartTrack -} - -func newClientProcessorFMP4Track( - timeScale uint32, - ts *clientTimeSyncFMP4, - onPartTrackProcessed func(context.Context), - onEntry func(time.Duration, []byte) error, -) *clientProcessorFMP4Track { - return &clientProcessorFMP4Track{ - timeScale: timeScale, - ts: ts, - onPartTrackProcessed: onPartTrackProcessed, - onEntry: onEntry, - queue: make(chan *fmp4.PartTrack, clientFMP4MaxPartTracksPerSegment), - } -} - -func (t *clientProcessorFMP4Track) run(ctx context.Context) error { - for { - select { - case entry := <-t.queue: - err := t.processPartTrack(ctx, entry) - if err != nil { - return err - } - - t.onPartTrackProcessed(ctx) - - case <-ctx.Done(): - return nil - } - } -} - -func (t *clientProcessorFMP4Track) processPartTrack(ctx context.Context, pt *fmp4.PartTrack) error { - rawDTS := pt.BaseTime - - for _, sample := range pt.Samples { - pts, err := t.ts.convertAndSync(ctx, t.timeScale, rawDTS, sample.PTSOffset) - if err != nil { - return err - } - - if pts >= 0 { // silently discard packets prior to the first packet of the leading track - err = t.onEntry(pts, sample.Payload) - if err != nil { - return err - } - } - - rawDTS += uint64(sample.Duration) - } - - return nil -} diff --git a/client_processor_mpegts.go b/client_processor_mpegts.go index 51a0314..fbfd3bd 100644 --- a/client_processor_mpegts.go +++ b/client_processor_mpegts.go @@ -3,29 +3,31 @@ package gohlslib import ( "bytes" "context" + "errors" "fmt" + "io" "strings" "time" "github.com/asticode/go-astits" - "github.com/bluenviron/mediacommon/pkg/codecs/h264" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/bluenviron/gohlslib/pkg/codecs" ) -func mpegtsPickLeadingTrack(mpegtsTracks []*mpegts.Track) uint16 { +var errSkipSilently = errors.New("skip silently") + +func mpegtsPickLeadingTrack(mpegtsTracks []*mpegts.Track) *mpegts.Track { // pick first video track - for _, mt := range mpegtsTracks { - if 
_, ok := mt.Codec.(*mpegts.CodecH264); ok { - return mt.ES.ElementaryPID + for _, track := range mpegtsTracks { + if _, ok := track.Codec.(*mpegts.CodecH264); ok { + return track } } // otherwise, pick first track - return mpegtsTracks[0].ES.ElementaryPID + return mpegtsTracks[0] } func trackMPEGTSToHLS(mt *mpegts.Track) *Track { @@ -46,6 +48,14 @@ func trackMPEGTSToHLS(mt *mpegts.Track) *Track { return nil } +type switchableReader struct { + r io.Reader +} + +func (r *switchableReader) Read(p []byte) (int, error) { + return r.r.Read(p) +} + type clientProcessorMPEGTS struct { isLeading bool segmentQueue *clientSegmentQueue @@ -54,12 +64,13 @@ type clientProcessorMPEGTS struct { onStreamTracks func(context.Context, []*Track) bool onSetLeadingTimeSync func(clientTimeSync) onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool) - onData map[*Track]func(time.Duration, interface{}) + onData map[*Track]interface{} - tracks []*Track - mpegtsTracks []*mpegts.Track - leadingTrackPID uint16 - trackProcs map[uint16]*clientProcessorMPEGTSTrack + switchableReader *switchableReader + reader *mpegts.Reader + tracks []*Track + trackProcs map[*Track]*clientTrackProcessor + timeSync *clientTimeSyncMPEGTS } func newClientProcessorMPEGTS( @@ -70,7 +81,7 @@ func newClientProcessorMPEGTS( onStreamTracks func(context.Context, []*Track) bool, onSetLeadingTimeSync func(clientTimeSync), onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool), - onData map[*Track]func(time.Duration, interface{}), + onData map[*Track]interface{}, ) *clientProcessorMPEGTS { return &clientProcessorMPEGTS{ isLeading: isLeading, @@ -99,16 +110,16 @@ func (p *clientProcessorMPEGTS) run(ctx context.Context) error { } func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error { - if p.mpegtsTracks == nil { - dem := astits.NewDemuxer(context.Background(), bytes.NewReader(byts)) + if p.switchableReader == nil { + p.switchableReader = 
&switchableReader{bytes.NewReader(byts)} var err error - p.mpegtsTracks, err = mpegts.FindTracks(dem) + p.reader, err = mpegts.NewReader(p.switchableReader) if err != nil { return err } - for _, track := range p.mpegtsTracks { + for _, track := range p.reader.Tracks() { switch track.Codec.(type) { case *mpegts.CodecH264, *mpegts.CodecMPEG4Audio: default: @@ -116,135 +127,138 @@ func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) } } - p.leadingTrackPID = mpegtsPickLeadingTrack(p.mpegtsTracks) + leadingTrack := mpegtsPickLeadingTrack(p.reader.Tracks()) + p.tracks = make([]*Track, len(p.reader.Tracks())) - p.tracks = make([]*Track, len(p.mpegtsTracks)) - for i, mt := range p.mpegtsTracks { - p.tracks[i] = trackMPEGTSToHLS(mt) + for i, mpegtsTrack := range p.reader.Tracks() { + p.tracks[i] = trackMPEGTSToHLS(mpegtsTrack) } ok := p.onStreamTracks(ctx, p.tracks) if !ok { return fmt.Errorf("terminated") } - } - - dem := astits.NewDemuxer(context.Background(), bytes.NewReader(byts)) - for { - data, err := dem.NextData() - if err != nil { - if err == astits.ErrNoMorePackets { - return nil - } - if strings.HasPrefix(err.Error(), "astits: parsing PES data failed") { - continue - } - return err - } + for i, mpegtsTrack := range p.reader.Tracks() { + track := p.tracks[i] + isLeadingTrack := (leadingTrack == mpegtsTrack) + var trackProc *clientTrackProcessor + onData := p.onData[track] - if data.PES == nil { - continue - } + preProcess := func(ctx context.Context, rawPTS int64, + rawDTS int64, postProcess func(time.Duration, time.Duration), + ) error { + pts, dts, err := p.timeSync.convertAndSync(ctx, rawPTS, rawDTS) + if err != nil { + return err + } - if data.PES.Header.OptionalHeader == nil || - data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorNoPTSOrDTS || - data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorIsForbidden { - return fmt.Errorf("PTS is missing") - } + // silently discard packets prior to 
the first packet of the leading track + if pts < 0 { + return nil + } - if p.trackProcs == nil { - var ts *clientTimeSyncMPEGTS + postProcess(pts, dts) + return nil + } - if p.isLeading { - if data.PID != p.leadingTrackPID { - continue + prePreProcess := func(pts int64, dts int64, postProcess func(time.Duration, time.Duration)) error { + err := p.initializeTrackProcs(ctx, isLeadingTrack, dts) + if err != nil { + if err == errSkipSilently { + return nil + } + return err } - var dts int64 - if data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent { - dts = data.PES.Header.OptionalHeader.DTS.Base - } else { - dts = data.PES.Header.OptionalHeader.PTS.Base + if trackProc == nil { + trackProc = p.trackProcs[track] } - ts = newClientTimeSyncMPEGTS(dts) - p.onSetLeadingTimeSync(ts) - } else { - rawTS, ok := p.onGetLeadingTimeSync(ctx) - if !ok { - return fmt.Errorf("terminated") - } + return trackProc.push(ctx, func() error { + return preProcess(ctx, pts, dts, postProcess) + }) + } - ts, ok = rawTS.(*clientTimeSyncMPEGTS) - if !ok { - return fmt.Errorf("stream playlists are mixed MPEGTS/FMP4") + switch track.Codec.(type) { + case *codecs.H264: + var onDataCasted onDataH26xFunc = func(pts time.Duration, dts time.Duration, au [][]byte) {} + if onData != nil { + onDataCasted = onData.(onDataH26xFunc) } - } - p.initializeTrackProcs(ts) - } + p.reader.OnDataH26x(mpegtsTrack, func(pts int64, dts int64, au [][]byte) error { + return prePreProcess( + pts, dts, + func(pts time.Duration, dts time.Duration) { + onDataCasted(pts, dts, au) + }) + }) + + case *codecs.MPEG4Audio: + var onDataCasted onDataMPEG4AudioFunc = func(pts time.Duration, dts time.Duration, aus [][]byte) {} + if onData != nil { + onDataCasted = onData.(onDataMPEG4AudioFunc) + } - proc, ok := p.trackProcs[data.PID] - if !ok { - continue + p.reader.OnDataMPEG4Audio(mpegtsTrack, func(pts int64, dts int64, aus [][]byte) error { + return prePreProcess( + pts, dts, + func(pts time.Duration, 
dts time.Duration) { + onDataCasted(pts, dts, aus) + }) + }) + } } + } else { + p.switchableReader.r = bytes.NewReader(byts) + } - select { - case proc.queue <- data.PES: - case <-ctx.Done(): + for { + err := p.reader.Read() + if err != nil { + if err == astits.ErrNoMorePackets { + return nil + } + if strings.HasPrefix(err.Error(), "astits: parsing PES data failed") { + continue + } + return err } } } -func (p *clientProcessorMPEGTS) initializeTrackProcs(ts *clientTimeSyncMPEGTS) { - p.trackProcs = make(map[uint16]*clientProcessorMPEGTSTrack) +func (p *clientProcessorMPEGTS) initializeTrackProcs(ctx context.Context, isLeadingTrack bool, dts int64) error { + if p.trackProcs != nil { + return nil + } - for i, track := range p.tracks { - var cb func(time.Duration, []byte) error + if p.isLeading { + if !isLeadingTrack { + return errSkipSilently + } - cb2, ok := p.onData[track] + p.timeSync = newClientTimeSyncMPEGTS(dts) + p.onSetLeadingTimeSync(p.timeSync) + } else { + rawTS, ok := p.onGetLeadingTimeSync(ctx) if !ok { - cb2 = func(time.Duration, interface{}) { - } + return fmt.Errorf("terminated") } - switch track.Codec.(type) { - case *codecs.H264: - cb = func(pts time.Duration, payload []byte) error { - au, err := h264.AnnexBUnmarshal(payload) - if err != nil { - p.log(LogLevelWarn, "unable to decode Annex-B: %s", err) - return nil - } - - cb2(pts, au) - return nil - } - - case *codecs.MPEG4Audio: - cb = func(pts time.Duration, payload []byte) error { - var adtsPkts mpeg4audio.ADTSPackets - err := adtsPkts.Unmarshal(payload) - if err != nil { - return fmt.Errorf("unable to decode ADTS: %s", err) - } - - for i, pkt := range adtsPkts { - cb2( - pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*time.Second/time.Duration(pkt.SampleRate), - pkt.AU) - } - - return nil - } + p.timeSync, ok = rawTS.(*clientTimeSyncMPEGTS) + if !ok { + return fmt.Errorf("stream playlists are mixed MPEGTS/FMP4") } + } + + p.trackProcs = make(map[*Track]*clientTrackProcessor) - proc := 
newClientProcessorMPEGTSTrack( - ts, - cb, - ) + for _, track := range p.tracks { + proc := newClientTrackProcessor() p.rp.add(proc) - p.trackProcs[p.mpegtsTracks[i].ES.ElementaryPID] = proc + p.trackProcs[track] = proc } + + return nil } diff --git a/client_processor_mpegts_track.go b/client_processor_mpegts_track.go deleted file mode 100644 index 7886139..0000000 --- a/client_processor_mpegts_track.go +++ /dev/null @@ -1,63 +0,0 @@ -package gohlslib - -import ( - "context" - "time" - - "github.com/asticode/go-astits" -) - -type clientProcessorMPEGTSTrack struct { - ts *clientTimeSyncMPEGTS - onEntry func(time.Duration, []byte) error - - queue chan *astits.PESData -} - -func newClientProcessorMPEGTSTrack( - ts *clientTimeSyncMPEGTS, - onEntry func(time.Duration, []byte) error, -) *clientProcessorMPEGTSTrack { - return &clientProcessorMPEGTSTrack{ - ts: ts, - onEntry: onEntry, - queue: make(chan *astits.PESData, clientMPEGTSEntryQueueSize), - } -} - -func (t *clientProcessorMPEGTSTrack) run(ctx context.Context) error { - for { - select { - case pes := <-t.queue: - err := t.processEntry(ctx, pes) - if err != nil { - return err - } - - case <-ctx.Done(): - return nil - } - } -} - -func (t *clientProcessorMPEGTSTrack) processEntry(ctx context.Context, pes *astits.PESData) error { - rawPTS := pes.Header.OptionalHeader.PTS.Base - var rawDTS int64 - if pes.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent { - rawDTS = pes.Header.OptionalHeader.DTS.Base - } else { - rawDTS = rawPTS - } - - pts, err := t.ts.convertAndSync(ctx, rawDTS, rawPTS) - if err != nil { - return err - } - - // silently discard packets prior to the first packet of the leading track - if pts < 0 { - return nil - } - - return t.onEntry(pts, pes.Data) -} diff --git a/client_test.go b/client_test.go index 5582583..baa62af 100644 --- a/client_test.go +++ b/client_test.go @@ -282,20 +282,21 @@ func TestClientMPEGTS(t *testing.T) { }, } - onH264 := func(pts time.Duration, unit 
interface{}) { + onH264 := func(pts time.Duration, dts time.Duration, au [][]byte) { require.Equal(t, 2*time.Second, pts) + require.Equal(t, time.Duration(0), dts) require.Equal(t, [][]byte{ {7, 1, 2, 3}, {8}, {5}, - }, unit) + }, au) close(packetRecv) } c.OnTracks(func(tracks []*Track) error { require.Equal(t, 1, len(tracks)) require.Equal(t, &codecs.H264{}, tracks[0].Codec) - c.OnData(tracks[0], onH264) + c.OnDataH26x(tracks[0], onH264) return nil }) @@ -344,13 +345,14 @@ segment.mp4 packetRecv := make(chan struct{}) - onH264 := func(pts time.Duration, unit interface{}) { + onH264 := func(pts time.Duration, dts time.Duration, au [][]byte) { require.Equal(t, 2*time.Second, pts) + require.Equal(t, time.Duration(0), dts) require.Equal(t, [][]byte{ {7, 1, 2, 3}, {8}, {5}, - }, unit) + }, au) close(packetRecv) } @@ -362,7 +364,7 @@ segment.mp4 require.Equal(t, 1, len(tracks)) _, ok := tracks[0].Codec.(*codecs.H264) require.Equal(t, true, ok) - c.OnData(tracks[0], onH264) + c.OnDataH26x(tracks[0], onH264) return nil }) diff --git a/client_timesync_fmp4.go b/client_timesync_fmp4.go index 576e7ac..f2de1f9 100644 --- a/client_timesync_fmp4.go +++ b/client_timesync_fmp4.go @@ -34,7 +34,7 @@ func newClientTimeSyncFMP4(timeScale uint32, baseTime uint64) *clientTimeSyncFMP func (ts *clientTimeSyncFMP4) convertAndSync(ctx context.Context, timeScale uint32, rawDTS uint64, ptsOffset int32, -) (time.Duration, error) { +) (time.Duration, time.Duration, error) { pts := durationMp4ToGo(rawDTS+uint64(ptsOffset), timeScale) dts := durationMp4ToGo(rawDTS, timeScale) @@ -45,15 +45,15 @@ func (ts *clientTimeSyncFMP4) convertAndSync(ctx context.Context, timeScale uint if dts > elapsed { diff := dts - elapsed if diff > clientMaxDTSRTCDiff { - return 0, fmt.Errorf("difference between DTS and RTC is too big") + return 0, 0, fmt.Errorf("difference between DTS and RTC is too big") } select { case <-time.After(diff): case <-ctx.Done(): - return 0, fmt.Errorf("terminated") + return 0, 0, 
fmt.Errorf("terminated") } } - return pts, nil + return pts, dts, nil } diff --git a/client_timesync_mpegts.go b/client_timesync_mpegts.go index 3adf558..24df59d 100644 --- a/client_timesync_mpegts.go +++ b/client_timesync_mpegts.go @@ -22,25 +22,27 @@ func newClientTimeSyncMPEGTS(startDTS int64) *clientTimeSyncMPEGTS { } } -func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, rawDTS int64, rawPTS int64) (time.Duration, error) { +func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, + rawPTS int64, rawDTS int64, +) (time.Duration, time.Duration, error) { ts.mutex.Lock() - dts := ts.td.Decode(rawDTS) pts := ts.td.Decode(rawPTS) + dts := ts.td.Decode(rawDTS) ts.mutex.Unlock() elapsed := time.Since(ts.startRTC) if dts > elapsed { diff := dts - elapsed if diff > clientMaxDTSRTCDiff { - return 0, fmt.Errorf("difference between DTS and RTC is too big") + return 0, 0, fmt.Errorf("difference between DTS and RTC is too big") } select { case <-time.After(diff): case <-ctx.Done(): - return 0, fmt.Errorf("terminated") + return 0, 0, fmt.Errorf("terminated") } } - return pts, nil + return pts, dts, nil } diff --git a/client_track_processor.go b/client_track_processor.go new file mode 100644 index 0000000..113409d --- /dev/null +++ b/client_track_processor.go @@ -0,0 +1,41 @@ +package gohlslib + +import ( + "context" + "fmt" +) + +type clientTrackProcessor struct { + queue chan func() error +} + +func newClientTrackProcessor() *clientTrackProcessor { + return &clientTrackProcessor{ + queue: make(chan func() error, clientMPEGTSEntryQueueSize), + } +} + +func (t *clientTrackProcessor) run(ctx context.Context) error { + for { + select { + case cb := <-t.queue: + err := cb() + if err != nil { + return err + } + + case <-ctx.Done(): + return nil + } + } +} + +func (t *clientTrackProcessor) push(ctx context.Context, cb func() error) error { + select { + case t.queue <- cb: + return nil + + case <-ctx.Done(): + return fmt.Errorf("terminated") + } +} diff 
--git a/examples/client/main.go b/examples/client/main.go index f2c8c20..eff6997 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -5,6 +5,7 @@ import ( "time" "github.com/bluenviron/gohlslib" + "github.com/bluenviron/gohlslib/pkg/codecs" ) // This example shows how to read a HLS stream. @@ -18,13 +19,28 @@ func main() { // setup a hook that is called when tracks are parsed c.OnTracks(func(tracks []*gohlslib.Track) error { for _, track := range tracks { + ttrack := track + log.Printf("detected track with codec %T\n", track.Codec) // setup a hook that is called when data is received - ttrack := track - c.OnData(track, func(pts time.Duration, unit interface{}) { - log.Printf("received data from track %T, pts = %v", ttrack, pts) - }) + switch track.Codec.(type) { + case *codecs.H264, *codecs.H265: + c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) { + log.Printf("received data from track %T, pts = %v", ttrack, pts) + }) + + case *codecs.MPEG4Audio: + c.OnDataMPEG4Audio(track, func(pts time.Duration, dts time.Duration, aus [][]byte) { + log.Printf("received data from track %T, pts = %v", ttrack, pts) + }) + + case *codecs.Opus: + c.OnDataOpus(track, func(pts time.Duration, dts time.Duration, packets [][]byte) { + log.Printf("received data from track %T, pts = %v", ttrack, pts) + }) + } + } return nil }) diff --git a/go.mod b/go.mod index 6665188..294b5cc 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/abema/go-mp4 v0.11.0 github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 github.com/asticode/go-astits v1.11.0 - github.com/bluenviron/mediacommon v0.6.0 + github.com/bluenviron/mediacommon v0.7.0 github.com/gin-gonic/gin v1.9.1 github.com/stretchr/testify v1.8.4 ) diff --git a/go.sum b/go.sum index dd1fef1..ba6ff57 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflx github.com/asticode/go-astikit v0.30.0/go.mod 
h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng= github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= -github.com/bluenviron/mediacommon v0.6.0 h1:suWFWHL9WL+sfBQPmleCd5jCY0iEtuKgvPRUaBGoq+g= -github.com/bluenviron/mediacommon v0.6.0/go.mod h1:wuLJdxcITiSPgY1MvQqrX+qPlKmNfeV9wNvXth5M98I= +github.com/bluenviron/mediacommon v0.7.0 h1:dJWLLL9oDbAqfK8KuNfnDUQwNbeMAtGeRjZc9Vo95js= +github.com/bluenviron/mediacommon v0.7.0/go.mod h1:wuLJdxcITiSPgY1MvQqrX+qPlKmNfeV9wNvXth5M98I= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/muxer.go b/muxer.go index 853a564..401b49b 100644 --- a/muxer.go +++ b/muxer.go @@ -114,7 +114,7 @@ func (m *Muxer) Start() error { if m.AudioTrack != nil { if _, ok := m.AudioTrack.Codec.(*codecs.MPEG4Audio); !ok { return fmt.Errorf( - "the MPEG-TS variant of HLS only supports MPEG4-audio. Use the fMP4 or Low-Latency variants instead") + "the MPEG-TS variant of HLS only supports MPEG-4 Audio. Use the fMP4 or Low-Latency variants instead") } } } @@ -281,9 +281,14 @@ func (m *Muxer) WriteH26x(ntp time.Time, pts time.Duration, au [][]byte) error { return m.segmenter.writeH26x(ntp, pts, au, randomAccessPresent, forceSwitch) } -// WriteAudio writes an audio access unit. -func (m *Muxer) WriteAudio(ntp time.Time, pts time.Duration, au []byte) error { - return m.segmenter.writeAudio(ntp, pts, au) +// WriteMPEG4Audio writes MPEG-4 Audio access units. +func (m *Muxer) WriteMPEG4Audio(ntp time.Time, pts time.Duration, aus [][]byte) error { + return m.segmenter.writeMPEG4Audio(ntp, pts, aus) +} + +// WriteOpus writes Opus packets. 
+func (m *Muxer) WriteOpus(ntp time.Time, pts time.Duration, packets [][]byte) error { + return m.segmenter.writeOpus(ntp, pts, packets) } // Handle handles a HTTP request. diff --git a/muxer_segment_mpegts.go b/muxer_segment_mpegts.go index 3181ba8..e9f440a 100644 --- a/muxer_segment_mpegts.go +++ b/muxer_segment_mpegts.go @@ -9,13 +9,19 @@ import ( "github.com/bluenviron/gohlslib/pkg/storage" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) +func durationGoToMPEGTS(v time.Duration) int64 { + return int64(v.Seconds() * 90000) +} + type muxerSegmentMPEGTS struct { - segmentMaxSize uint64 - hasVideoTrack bool - writer *mpegts.Writer + segmentMaxSize uint64 + writerVideoTrack *mpegts.Track + writerAudioTrack *mpegts.Track + writer *mpegts.Writer storage storage.File storagePart storage.Part @@ -32,16 +38,19 @@ func newMuxerSegmentMPEGTS( id uint64, startNTP time.Time, segmentMaxSize uint64, - hasVideoTrack bool, + writerVideoTrack *mpegts.Track, + writerAudioTrack *mpegts.Track, + switchableWriter *switchableWriter, writer *mpegts.Writer, factory storage.Factory, ) (*muxerSegmentMPEGTS, error) { t := &muxerSegmentMPEGTS{ - segmentMaxSize: segmentMaxSize, - hasVideoTrack: hasVideoTrack, - writer: writer, - startNTP: startNTP, - name: "seg" + strconv.FormatUint(id, 10), + segmentMaxSize: segmentMaxSize, + writerVideoTrack: writerVideoTrack, + writerAudioTrack: writerAudioTrack, + writer: writer, + startNTP: startNTP, + name: "seg" + strconv.FormatUint(id, 10), } var err error @@ -53,7 +62,7 @@ func newMuxerSegmentMPEGTS( t.storagePart = t.storage.NewPart() t.bw = bufio.NewWriter(t.storagePart.Writer()) - writer.SetByteWriter(t.bw) + switchableWriter.w = t.bw return t, nil } @@ -86,14 +95,13 @@ func (t *muxerSegmentMPEGTS) finalize(nextDTS time.Duration) { } func (t *muxerSegmentMPEGTS) writeH264( - pcr time.Duration, dts time.Duration, pts time.Duration, idrPresent bool, - nalus [][]byte, + au [][]byte, ) 
error { size := uint64(0) - for _, nalu := range nalus { + for _, nalu := range au { size += uint64(len(nalu)) } if (t.size + size) > t.segmentMaxSize { @@ -101,7 +109,10 @@ func (t *muxerSegmentMPEGTS) writeH264( } t.size += size - err := t.writer.WriteH264(pcr, dts, pts, idrPresent, nalus) + // prepend an AUD. This is required by video.js and iOS + au = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, au...) + + err := t.writer.WriteH26x(t.writerVideoTrack, durationGoToMPEGTS(dts), durationGoToMPEGTS(pts), idrPresent, au) if err != nil { return err } @@ -114,23 +125,26 @@ func (t *muxerSegmentMPEGTS) writeH264( return nil } -func (t *muxerSegmentMPEGTS) writeAAC( - pcr time.Duration, +func (t *muxerSegmentMPEGTS) writeMPEG4Audio( pts time.Duration, - au []byte, + aus [][]byte, ) error { - size := uint64(len(au)) + size := uint64(0) + for _, au := range aus { + size += uint64(len(au)) + } + if (t.size + size) > t.segmentMaxSize { return fmt.Errorf("reached maximum segment size") } t.size += size - err := t.writer.WriteAAC(pcr, pts, au) + err := t.writer.WriteMPEG4Audio(t.writerAudioTrack, durationGoToMPEGTS(pts), aus) if err != nil { return err } - if !t.hasVideoTrack { + if t.writerVideoTrack == nil { t.audioAUCount++ if t.startDTS == nil { diff --git a/muxer_segmenter.go b/muxer_segmenter.go index 9f159b9..8b60901 100644 --- a/muxer_segmenter.go +++ b/muxer_segmenter.go @@ -7,5 +7,6 @@ import ( type muxerSegmenter interface { close() writeH26x(time.Time, time.Duration, [][]byte, bool, bool) error - writeAudio(time.Time, time.Duration, []byte) error + writeMPEG4Audio(time.Time, time.Duration, [][]byte) error + writeOpus(time.Time, time.Duration, [][]byte) error } diff --git a/muxer_segmenter_fmp4.go b/muxer_segmenter_fmp4.go index 7538b49..dcaabc3 100644 --- a/muxer_segmenter_fmp4.go +++ b/muxer_segmenter_fmp4.go @@ -6,6 +6,8 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" + 
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/codecs/opus" "github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/gohlslib/pkg/fmp4" @@ -318,6 +320,39 @@ func (m *muxerSegmenterFMP4) writeH26x( return nil } +func (m *muxerSegmenterFMP4) writeMPEG4Audio(ntp time.Time, pts time.Duration, aus [][]byte) error { + sampleRate := time.Duration(m.audioTrack.Codec.(*codecs.MPEG4Audio).Config.SampleRate) + + for i, au := range aus { + auNTP := ntp.Add(time.Duration(i) * mpeg4audio.SamplesPerAccessUnit * + time.Second / sampleRate) + auPTS := pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/sampleRate + + err := m.writeAudio(auNTP, auPTS, au) + if err != nil { + return err + } + } + + return nil +} + +func (m *muxerSegmenterFMP4) writeOpus(ntp time.Time, pts time.Duration, packets [][]byte) error { + for _, packet := range packets { + err := m.writeAudio(ntp, pts, packet) + if err != nil { + return err + } + + duration := opus.PacketDuration(packet) + ntp = ntp.Add(duration) + pts += duration + } + + return nil +} + func (m *muxerSegmenterFMP4) writeAudio(ntp time.Time, dts time.Duration, au []byte) error { if m.videoTrack != nil { // wait for the video track diff --git a/muxer_segmenter_mpegts.go b/muxer_segmenter_mpegts.go index c24ef69..23896fe 100644 --- a/muxer_segmenter_mpegts.go +++ b/muxer_segmenter_mpegts.go @@ -2,6 +2,7 @@ package gohlslib import ( "fmt" + "io" "time" "github.com/bluenviron/mediacommon/pkg/codecs/h264" @@ -16,18 +17,16 @@ const ( ) func trackHLSToMPEGTS(t *Track) *mpegts.Track { - if t == nil { - return nil - } - switch tcodec := t.Codec.(type) { case *codecs.H264: return &mpegts.Track{ + PID: 256, Codec: &mpegts.CodecH264{}, } case *codecs.MPEG4Audio: return &mpegts.Track{ + PID: 257, Codec: &mpegts.CodecMPEG4Audio{ Config: tcodec.Config, }, @@ -37,6 +36,14 @@ func trackHLSToMPEGTS(t *Track) *mpegts.Track { return nil } +type switchableWriter struct { + w 
io.Writer +} + +func (w *switchableWriter) Write(p []byte) (int, error) { + return w.w.Write(p) +} + type muxerSegmenterMPEGTS struct { segmentDuration time.Duration segmentMaxSize uint64 @@ -45,11 +52,13 @@ type muxerSegmenterMPEGTS struct { factory storage.Factory onSegmentReady func(muxerSegment) + writerVideoTrack *mpegts.Track + writerAudioTrack *mpegts.Track + switchableWriter *switchableWriter writer *mpegts.Writer nextSegmentID uint64 currentSegment *muxerSegmentMPEGTS videoDTSExtractor *h264.DTSExtractor - startPCR time.Time startDTS time.Duration } @@ -70,9 +79,21 @@ func newMuxerSegmenterMPEGTS( onSegmentReady: onSegmentReady, } - m.writer = mpegts.NewWriter( - trackHLSToMPEGTS(videoTrack), - trackHLSToMPEGTS(audioTrack)) + var tracks []*mpegts.Track + + if videoTrack != nil { + m.writerVideoTrack = trackHLSToMPEGTS(videoTrack) + tracks = append(tracks, m.writerVideoTrack) + } + + if audioTrack != nil { + m.writerAudioTrack = trackHLSToMPEGTS(audioTrack) + tracks = append(tracks, m.writerAudioTrack) + } + + m.switchableWriter = &switchableWriter{} + + m.writer = mpegts.NewWriter(m.switchableWriter, tracks) return m } @@ -113,7 +134,6 @@ func (m *muxerSegmenterMPEGTS) writeH26x( return fmt.Errorf("unable to extract DTS: %v", err) } - m.startPCR = ntp m.startDTS = dts dts = 0 pts -= m.startDTS @@ -123,7 +143,9 @@ func (m *muxerSegmenterMPEGTS) writeH26x( m.genSegmentID(), ntp, m.segmentMaxSize, - m.videoTrack != nil, + m.writerVideoTrack, + m.writerAudioTrack, + m.switchableWriter, m.writer, m.factory) if err != nil { @@ -151,7 +173,9 @@ func (m *muxerSegmenterMPEGTS) writeH26x( m.genSegmentID(), ntp, m.segmentMaxSize, - m.videoTrack != nil, + m.writerVideoTrack, + m.writerAudioTrack, + m.switchableWriter, m.writer, m.factory, ) @@ -162,7 +186,6 @@ func (m *muxerSegmenterMPEGTS) writeH26x( } err := m.currentSegment.writeH264( - ntp.Sub(m.startPCR), dts, pts, randomAccessPresent, @@ -174,10 +197,17 @@ func (m *muxerSegmenterMPEGTS) writeH26x( return nil } 
-func (m *muxerSegmenterMPEGTS) writeAudio(ntp time.Time, pts time.Duration, au []byte) error { +func (m *muxerSegmenterMPEGTS) writeMPEG4Audio(ntp time.Time, pts time.Duration, aus [][]byte) error { + return m.writeAudio(ntp, pts, aus) +} + +func (m *muxerSegmenterMPEGTS) writeOpus(_ time.Time, _ time.Duration, _ [][]byte) error { + return fmt.Errorf("unimplemented") +} + +func (m *muxerSegmenterMPEGTS) writeAudio(ntp time.Time, pts time.Duration, aus [][]byte) error { if m.videoTrack == nil { if m.currentSegment == nil { - m.startPCR = ntp m.startDTS = pts pts = 0 @@ -187,7 +217,9 @@ func (m *muxerSegmenterMPEGTS) writeAudio(ntp time.Time, pts time.Duration, au [ m.genSegmentID(), ntp, m.segmentMaxSize, - m.videoTrack != nil, + m.writerVideoTrack, + m.writerAudioTrack, + m.switchableWriter, m.writer, m.factory, ) @@ -208,7 +240,9 @@ func (m *muxerSegmenterMPEGTS) writeAudio(ntp time.Time, pts time.Duration, au [ m.genSegmentID(), ntp, m.segmentMaxSize, - m.videoTrack != nil, + m.writerVideoTrack, + m.writerAudioTrack, + m.switchableWriter, m.writer, m.factory, ) @@ -226,7 +260,7 @@ func (m *muxerSegmenterMPEGTS) writeAudio(ntp time.Time, pts time.Duration, au [ pts -= m.startDTS } - err := m.currentSegment.writeAAC(ntp.Sub(m.startPCR), pts, au) + err := m.currentSegment.writeMPEG4Audio(pts, aus) if err != nil { return err } diff --git a/muxer_test.go b/muxer_test.go index 849b9ac..e7e72d2 100644 --- a/muxer_test.go +++ b/muxer_test.go @@ -137,15 +137,15 @@ func TestMuxerVideoAudio(t *testing.T) { require.NoError(t, err) d = 3 * time.Second - err = m.WriteAudio(testTime.Add(d-1*time.Second), d, []byte{ + err = m.WriteMPEG4Audio(testTime.Add(d-1*time.Second), d, [][]byte{{ 0x01, 0x02, 0x03, 0x04, - }) + }}) require.NoError(t, err) d = 3500 * time.Millisecond - err = m.WriteAudio(testTime.Add(d-1*time.Second), d, []byte{ + err = m.WriteMPEG4Audio(testTime.Add(d-1*time.Second), d, [][]byte{{ 0x01, 0x02, 0x03, 0x04, - }) + }}) require.NoError(t, err) // access unit 
without IDR @@ -156,9 +156,9 @@ func TestMuxerVideoAudio(t *testing.T) { require.NoError(t, err) d = 4500 * time.Millisecond - err = m.WriteAudio(testTime.Add(d-1*time.Second), d, []byte{ + err = m.WriteMPEG4Audio(testTime.Add(d-1*time.Second), d, [][]byte{{ 0x01, 0x02, 0x03, 0x04, - }) + }}) require.NoError(t, err) // access unit with IDR @@ -449,22 +449,22 @@ func TestMuxerAudioOnly(t *testing.T) { for i := 0; i < 100; i++ { d := 1 * time.Second - err = m.WriteAudio(testTime.Add(d-1*time.Second), d, []byte{ + err = m.WriteMPEG4Audio(testTime.Add(d-1*time.Second), d, [][]byte{{ 0x01, 0x02, 0x03, 0x04, - }) + }}) require.NoError(t, err) } d := 2 * time.Second - err = m.WriteAudio(testTime.Add(d-1*time.Second), d, []byte{ + err = m.WriteMPEG4Audio(testTime.Add(d-1*time.Second), d, [][]byte{{ 0x01, 0x02, 0x03, 0x04, - }) + }}) require.NoError(t, err) d = 3 * time.Second - err = m.WriteAudio(testTime.Add(d-1*time.Second), d, []byte{ + err = m.WriteMPEG4Audio(testTime.Add(d-1*time.Second), d, [][]byte{{ 0x01, 0x02, 0x03, 0x04, - }) + }}) require.NoError(t, err) byts, err := readPath(m, "index.m3u8", "", "", "") diff --git a/pkg/codecs/mpeg4audio.go b/pkg/codecs/mpeg4audio.go index 7622a78..978437c 100644 --- a/pkg/codecs/mpeg4audio.go +++ b/pkg/codecs/mpeg4audio.go @@ -4,7 +4,7 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" ) -// MPEG4Audio is a MPEG4-Audio codec. +// MPEG4Audio is a MPEG-4 Audio codec. 
type MPEG4Audio struct { mpeg4audio.Config } diff --git a/pkg/fmp4/init.go b/pkg/fmp4/init.go index d315279..6711640 100644 --- a/pkg/fmp4/init.go +++ b/pkg/fmp4/init.go @@ -201,13 +201,13 @@ func (i *Init) Unmarshal(byts []byte) error { return nil }() if encodedConf == nil { - return nil, fmt.Errorf("unable to find MPEG4-audio configuration") + return nil, fmt.Errorf("unable to find MPEG-4 Audio configuration") } var c mpeg4audio.Config err = c.Unmarshal(encodedConf) if err != nil { - return nil, fmt.Errorf("invalid MPEG4-audio configuration: %s", err) + return nil, fmt.Errorf("invalid MPEG-4 Audio configuration: %s", err) } curTrack.Codec = &codecs.MPEG4Audio{