Skip to content

Commit

Permalink
update mediacommon; replace Client.OnData with OnDataH26x, OnDataMPEG…
Browse files Browse the repository at this point in the history
…4Audio, OnDataOpus
  • Loading branch information
aler9 committed Jul 27, 2023
1 parent 89e42cf commit 300053e
Show file tree
Hide file tree
Showing 23 changed files with 512 additions and 405 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ General features:
* [Examples](#examples)
* [API Documentation](#api-documentation)
* [Standards](#standards)
* [Links](#links)
* [Related projects](#related-projects)

## Examples

Expand All @@ -72,8 +72,8 @@ https://pkg.go.dev/github.com/bluenviron/gohlslib#pkg-index
* [Codec standards](https://github.com/bluenviron/mediacommon#standards)
* [Golang project layout](https://github.com/golang-standards/project-layout)

## Links

Related projects
## Related projects

* [MediaMTX](https://github.com/bluenviron/mediamtx)
* [gortsplib](https://github.com/bluenviron/gortsplib)
* [mediacommon](https://github.com/bluenviron/mediacommon)
26 changes: 21 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ const (
clientMaxDTSRTCDiff = 10 * time.Second
)

type onDataH26xFunc func(pts time.Duration, dts time.Duration, au [][]byte)

type onDataMPEG4AudioFunc func(pts time.Duration, dts time.Duration, aus [][]byte)

type onDataOpusFunc func(pts time.Duration, dts time.Duration, packets [][]byte)

func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
if err != nil {
Expand Down Expand Up @@ -50,7 +56,7 @@ type Client struct {
ctx context.Context
ctxCancel func()
onTracks func([]*Track) error
onData map[*Track]func(time.Duration, interface{})
onData map[*Track]interface{}
playlistURL *url.URL

// out
Expand All @@ -74,7 +80,7 @@ func (c *Client) Start() error {

c.ctx, c.ctxCancel = context.WithCancel(context.Background())

c.onData = make(map[*Track]func(time.Duration, interface{}))
c.onData = make(map[*Track]interface{})
c.outErr = make(chan error, 1)

go c.run()
Expand All @@ -97,9 +103,19 @@ func (c *Client) OnTracks(cb func([]*Track) error) {
c.onTracks = cb
}

// OnData sets a callback that is called when data arrives.
func (c *Client) OnData(forma *Track, cb func(time.Duration, interface{})) {
c.onData[forma] = cb
// OnDataH26x sets a callback that is called when data from an H26x track is received.
func (c *Client) OnDataH26x(forma *Track, onData onDataH26xFunc) {
c.onData[forma] = onData
}

// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received.
func (c *Client) OnDataMPEG4Audio(forma *Track, onData onDataMPEG4AudioFunc) {
c.onData[forma] = onData

Check warning on line 113 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L112-L113

Added lines #L112 - L113 were not covered by tests
}

// OnDataOpus sets a callback that is called when data from an Opus track is received.
func (c *Client) OnDataOpus(forma *Track, onData onDataOpusFunc) {
c.onData[forma] = onData

Check warning on line 118 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

func (c *Client) run() {
Expand Down
5 changes: 2 additions & 3 deletions client_downloader_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/bluenviron/gohlslib/pkg/playlist"
)
Expand Down Expand Up @@ -104,7 +103,7 @@ type clientDownloaderPrimary struct {
httpClient *http.Client
log LogFunc
onTracks func([]*Track) error
onData map[*Track]func(time.Duration, interface{})
onData map[*Track]interface{}
rp *clientRoutinePool

leadingTimeSync clientTimeSync
Expand All @@ -123,7 +122,7 @@ func newClientDownloaderPrimary(
log LogFunc,
rp *clientRoutinePool,
onTracks func([]*Track) error,
onData map[*Track]func(time.Duration, interface{}),
onData map[*Track]interface{},
) *clientDownloaderPrimary {
return &clientDownloaderPrimary{
primaryPlaylistURL: primaryPlaylistURL,
Expand Down
5 changes: 2 additions & 3 deletions client_downloader_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"net/url"
"strconv"
"time"

"github.com/bluenviron/gohlslib/pkg/playlist"
)
Expand Down Expand Up @@ -40,7 +39,7 @@ type clientDownloaderStream struct {
onStreamTracks func(context.Context, []*Track) bool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]func(time.Duration, interface{})
onData map[*Track]interface{}

curSegmentID *int
}
Expand All @@ -55,7 +54,7 @@ func newClientDownloaderStream(
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onData map[*Track]func(time.Duration, interface{}),
onData map[*Track]interface{},
) *clientDownloaderStream {
return &clientDownloaderStream{
isLeading: isLeading,
Expand Down
184 changes: 121 additions & 63 deletions client_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ type clientProcessorFMP4 struct {
rp *clientRoutinePool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]func(time.Duration, interface{})
onData map[*Track]interface{}

tracks []*Track
init fmp4.Init
leadingTrackID int
trackProcs map[int]*clientProcessorFMP4Track
tracks []*Track
init fmp4.Init
leadingTrackID int
prePreProcessFuncs map[int]func(context.Context, *fmp4.PartTrack) error

// in
subpartProcessed chan struct{}
Expand All @@ -52,7 +52,7 @@ func newClientProcessorFMP4(
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onData map[*Track]func(time.Duration, interface{}),
onData map[*Track]interface{},
) (*clientProcessorFMP4, error) {
p := &clientProcessorFMP4{
isLeading: isLeading,
Expand Down Expand Up @@ -111,41 +111,16 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e
processingCount := 0

for _, part := range parts {
for _, track := range part.Tracks {
if p.trackProcs == nil {
var ts *clientTimeSyncFMP4

if p.isLeading {
if track.ID != p.leadingTrackID {
continue
}

timeScale := func() uint32 {
for _, track := range p.init.Tracks {
if track.ID == p.leadingTrackID {
return track.TimeScale
}
}
return 0
}()
ts = newClientTimeSyncFMP4(timeScale, track.BaseTime)
p.onSetLeadingTimeSync(ts)
} else {
rawTS, ok := p.onGetLeadingTimeSync(ctx)
if !ok {
return fmt.Errorf("terminated")
}

ts, ok = rawTS.(*clientTimeSyncFMP4)
if !ok {
return fmt.Errorf("stream playlists are mixed MPEGTS/FMP4")
}
for _, partTrack := range part.Tracks {
err := p.initializeTrackProcs(ctx, partTrack)
if err != nil {
if err == errSkipSilently {
continue

Check warning on line 118 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

p.initializeTrackProcs(ts)
return err

Check warning on line 120 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L120

Added line #L120 was not covered by tests
}

proc, ok := p.trackProcs[track.ID]
prePreProcess, ok := p.prePreProcessFuncs[partTrack.ID]
if !ok {
continue
}
Expand All @@ -154,11 +129,11 @@ func (p *clientProcessorFMP4) processSegment(ctx context.Context, byts []byte) e
return fmt.Errorf("too many part tracks at once")
}

select {
case proc.queue <- track:
case <-ctx.Done():
return fmt.Errorf("terminated")
err = prePreProcess(ctx, partTrack)
if err != nil {
return err

Check warning on line 134 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L134

Added line #L134 was not covered by tests
}

processingCount++
}
}
Expand All @@ -181,44 +156,127 @@ func (p *clientProcessorFMP4) onPartTrackProcessed(ctx context.Context) {
}
}

func (p *clientProcessorFMP4) initializeTrackProcs(ts *clientTimeSyncFMP4) {
p.trackProcs = make(map[int]*clientProcessorFMP4Track)
func (p *clientProcessorFMP4) initializeTrackProcs(ctx context.Context, track *fmp4.PartTrack) error {
if p.prePreProcessFuncs != nil {
return nil
}

Check warning on line 162 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L161-L162

Added lines #L161 - L162 were not covered by tests

for i, track := range p.tracks {
var cb func(time.Duration, []byte) error
var timeSync *clientTimeSyncFMP4
isLeadingTrack := (track.ID == p.leadingTrackID)

cb2, ok := p.onData[track]
if !ok {
cb2 = func(time.Duration, interface{}) {
if p.isLeading {
if !isLeadingTrack {
return errSkipSilently
}

Check warning on line 170 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L169-L170

Added lines #L169 - L170 were not covered by tests

timeScale := func() uint32 {
for _, track := range p.init.Tracks {
if isLeadingTrack {
return track.TimeScale
}
}
return 0

Check warning on line 178 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L178

Added line #L178 was not covered by tests
}()
timeSync = newClientTimeSyncFMP4(timeScale, track.BaseTime)
p.onSetLeadingTimeSync(timeSync)
} else {
rawTS, ok := p.onGetLeadingTimeSync(ctx)
if !ok {
return fmt.Errorf("terminated")
}

Check warning on line 186 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L182-L186

Added lines #L182 - L186 were not covered by tests

timeSync, ok = rawTS.(*clientTimeSyncFMP4)
if !ok {
return fmt.Errorf("stream playlists are mixed MPEG-TS/fMP4")

Check warning on line 190 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L188-L190

Added lines #L188 - L190 were not covered by tests
}
}

p.prePreProcessFuncs = make(map[int]func(context.Context, *fmp4.PartTrack) error)

for i, track := range p.tracks {
onData := p.onData[track]

var postProcess func(pts time.Duration, dts time.Duration, payload []byte) error

switch track.Codec.(type) {
case *codecs.H264, *codecs.H265:
cb = func(pts time.Duration, payload []byte) error {
nalus, err := h264.AVCCUnmarshal(payload)
var onDataCasted onDataH26xFunc = func(pts time.Duration, dts time.Duration, au [][]byte) {}
if onData != nil {
onDataCasted = onData.(onDataH26xFunc)
}

postProcess = func(pts time.Duration, dts time.Duration, payload []byte) error {
au, err := h264.AVCCUnmarshal(payload)
if err != nil {
return err
}

cb2(pts, nalus)
onDataCasted(pts, dts, au)
return nil
}

case *codecs.MPEG4Audio:
var onDataCasted onDataMPEG4AudioFunc = func(pts time.Duration, dts time.Duration, aus [][]byte) {}
if onData != nil {
onDataCasted = onData.(onDataMPEG4AudioFunc)
}

Check warning on line 222 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L218-L222

Added lines #L218 - L222 were not covered by tests

postProcess = func(pts time.Duration, dts time.Duration, payload []byte) error {
onDataCasted(pts, dts, [][]byte{payload})

Check warning on line 225 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L224-L225

Added lines #L224 - L225 were not covered by tests
return nil
}

case *codecs.MPEG4Audio, *codecs.Opus:
cb = func(pts time.Duration, payload []byte) error {
cb2(pts, payload)
case *codecs.Opus:
var onDataCasted onDataOpusFunc = func(pts time.Duration, dts time.Duration, packets [][]byte) {}
if onData != nil {
onDataCasted = onData.(onDataOpusFunc)
}

Check warning on line 233 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L229-L233

Added lines #L229 - L233 were not covered by tests

postProcess = func(pts time.Duration, dts time.Duration, payload []byte) error {
onDataCasted(pts, dts, [][]byte{payload})

Check warning on line 236 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L235-L236

Added lines #L235 - L236 were not covered by tests
return nil
}
}

proc := newClientProcessorFMP4Track(
p.init.Tracks[i].TimeScale,
ts,
p.onPartTrackProcessed,
cb,
)
p.rp.add(proc)
p.trackProcs[p.init.Tracks[i].ID] = proc
timeScale := p.init.Tracks[i].TimeScale

preProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error {
rawDTS := partTrack.BaseTime

for _, sample := range partTrack.Samples {
pts, dts, err := timeSync.convertAndSync(ctx, timeScale, rawDTS, sample.PTSOffset)
if err != nil {
return err
}

Check warning on line 250 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L249-L250

Added lines #L249 - L250 were not covered by tests

rawDTS += uint64(sample.Duration)

// silently discard packets prior to the first packet of the leading track
if pts < 0 {
continue

Check warning on line 256 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L256

Added line #L256 was not covered by tests
}

err = postProcess(pts, dts, sample.Payload)
if err != nil {
return err
}

Check warning on line 262 in client_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_processor_fmp4.go#L261-L262

Added lines #L261 - L262 were not covered by tests
}

p.onPartTrackProcessed(ctx)
return nil
}

trackProc := newClientTrackProcessor()
p.rp.add(trackProc)

prePreProcess := func(ctx context.Context, partTrack *fmp4.PartTrack) error {
return trackProc.push(ctx, func() error {
return preProcess(ctx, partTrack)
})
}

p.prePreProcessFuncs[p.init.Tracks[i].ID] = prePreProcess
}

return nil
}
Loading

0 comments on commit 300053e

Please sign in to comment.