Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use native timestamps instead of time.Duration #190

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@
type ClientOnTracksFunc func([]*Track) error

// ClientOnDataAV1Func is the prototype of the function passed to OnDataAV1().
type ClientOnDataAV1Func func(pts time.Duration, tu [][]byte)
type ClientOnDataAV1Func func(pts int64, tu [][]byte)

// ClientOnDataVP9Func is the prototype of the function passed to OnDataVP9().
type ClientOnDataVP9Func func(pts time.Duration, frame []byte)
type ClientOnDataVP9Func func(pts int64, frame []byte)

// ClientOnDataH26xFunc is the prototype of the function passed to OnDataH26x().
type ClientOnDataH26xFunc func(pts time.Duration, dts time.Duration, au [][]byte)
type ClientOnDataH26xFunc func(pts int64, dts int64, au [][]byte)

// ClientOnDataMPEG4AudioFunc is the prototype of the function passed to OnDataMPEG4Audio().
type ClientOnDataMPEG4AudioFunc func(pts time.Duration, aus [][]byte)
type ClientOnDataMPEG4AudioFunc func(pts int64, aus [][]byte)

// ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus().
type ClientOnDataOpusFunc func(pts time.Duration, packets [][]byte)
type ClientOnDataOpusFunc func(pts int64, packets [][]byte)

type clientOnStreamTracksFunc func(ctx context.Context, isLeading bool, tracks []*Track) ([]*clientTrack, bool)

type clientOnDataFunc func(pts time.Duration, dts time.Duration, data [][]byte)
type clientOnDataFunc func(pts int64, dts int64, data [][]byte)

func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
Expand Down Expand Up @@ -186,35 +186,35 @@

// OnDataAV1 sets a callback that is called when data from an AV1 track is received.
func (c *Client) OnDataAV1(track *Track, cb ClientOnDataAV1Func) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {

Check warning on line 189 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L189

Added line #L189 was not covered by tests
cb(pts, data)
}
}

// OnDataVP9 sets a callback that is called when data from a VP9 track is received.
func (c *Client) OnDataVP9(track *Track, cb ClientOnDataVP9Func) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {

Check warning on line 196 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L196

Added line #L196 was not covered by tests
cb(pts, data[0])
}
}

// OnDataH26x sets a callback that is called when data from an H26x track is received.
func (c *Client) OnDataH26x(track *Track, cb ClientOnDataH26xFunc) {
c.tracks[track].onData = func(pts time.Duration, dts time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, dts int64, data [][]byte) {
cb(pts, dts, data)
}
}

// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received.
func (c *Client) OnDataMPEG4Audio(track *Track, cb ClientOnDataMPEG4AudioFunc) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}

// OnDataOpus sets a callback that is called when data from an Opus track is received.
func (c *Client) OnDataOpus(track *Track, cb ClientOnDataOpusFunc) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {

Check warning on line 217 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L217

Added line #L217 was not covered by tests
cb(pts, data)
}
}
Expand Down Expand Up @@ -267,7 +267,7 @@
for _, track := range tracks {
c.tracks[track] = &clientTrack{
track: track,
onData: func(_, _ time.Duration, _ [][]byte) {},
onData: func(_, _ int64, _ [][]byte) {},
}
}

Expand Down
45 changes: 24 additions & 21 deletions client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@
p.leadingTrackID = fmp4PickLeadingTrack(&p.init)

tracks := make([]*Track, len(p.init.Tracks))

for i, track := range p.init.Tracks {
tracks[i] = &Track{
Codec: codecs.FromFMP4(track.Codec),
Codec: codecs.FromFMP4(track.Codec),
ClockRate: int(track.TimeScale),
}
}

Expand Down Expand Up @@ -121,26 +123,28 @@

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative time.Duration
var ntpRelative int64
var leadingClockRate int

if p.trackProcessors == nil || seg.dateTime != nil {
partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if partTrack == nil {
return fmt.Errorf("could not find data of leading track")
}
partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if partTrack == nil {
return fmt.Errorf("could not find data of leading track")
}

Check warning on line 132 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L131-L132

Added lines #L131 - L132 were not covered by tests

if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, partTrack)
if err != nil {
return err
}
if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, partTrack)
if err != nil {
return err

Check warning on line 137 in client_stream_processor_fmp4.go

View check run for this annotation

Codecov / codecov/patch

client_stream_processor_fmp4.go#L137

Added line #L137 was not covered by tests
}
}

if seg.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *seg.dateTime
ntpRelative = p.timeConv.convert(partTrack.BaseTime, p.timeConv.leadingTimeScale)
}
leadingTrackProc := p.trackProcessors[partTrack.ID]
leadingClockRate = leadingTrackProc.track.track.ClockRate

if seg.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *seg.dateTime
ntpRelative = p.timeConv.convert(int64(partTrack.BaseTime), leadingClockRate)
}

partTrackCount := 0
Expand All @@ -155,7 +159,7 @@
err := trackProc.push(ctx, &procEntryFMP4{
ntpAvailable: ntpAvailable,
ntpAbsolute: ntpAbsolute,
ntpRelative: ntpRelative,
ntpRelative: multiplyAndDivide(ntpRelative, int64(trackProc.track.track.ClockRate), int64(leadingClockRate)),
partTrack: partTrack,
})
if err != nil {
Expand Down Expand Up @@ -196,8 +200,8 @@
timeScale := findTimeScaleOfLeadingTrack(p.init.Tracks, p.leadingTrackID)

p.timeConv = &clientTimeConvFMP4{
leadingTimeScale: timeScale,
initialBaseTime: partTrack.BaseTime,
leadingTimeScale: int64(timeScale),
leadingBaseTime: int64(partTrack.BaseTime),
}
p.timeConv.initialize()

Expand All @@ -219,7 +223,6 @@
for i, track := range p.clientStreamTracks {
trackProc := &clientTrackProcessorFMP4{
track: track,
timeScale: p.init.Tracks[i].TimeScale,
timeConv: p.timeConv,
onPartTrackProcessed: p.onPartTrackProcessed,
}
Expand Down
10 changes: 7 additions & 3 deletions client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,11 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
leadingTrackID := mpegtsPickLeadingTrack(p.reader.Tracks())

tracks := make([]*Track, len(p.reader.Tracks()))

for i, mpegtsTrack := range p.reader.Tracks() {
tracks[i] = &Track{
Codec: codecs.FromMPEGTS(mpegtsTrack.Codec),
Codec: codecs.FromMPEGTS(mpegtsTrack.Codec),
ClockRate: 90000,
}
}

Expand All @@ -180,7 +182,7 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative time.Duration
var ntpRelative int64

for i, mpegtsTrack := range p.reader.Tracks() {
track := p.clientStreamTracks[i]
Expand Down Expand Up @@ -225,7 +227,9 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs

ntp := time.Time{}
if ntpAvailable {
ntp = ntpAbsolute.Add(dts - ntpRelative)
diff := dts - ntpRelative
diffDur := timestampToDuration(diff, 90000)
ntp = ntpAbsolute.Add(diffDur)
}

return trackProc.push(ctx, &procEntryMPEGTS{
Expand Down
67 changes: 44 additions & 23 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,20 @@ func TestClient(t *testing.T) {
pps = testPPS
}

var audioClockRate int
if format == "fmp4" {
audioClockRate = 44100
} else {
audioClockRate = 90000
}

require.Equal(t, []*Track{
{
Codec: &codecs.H264{
SPS: sps,
PPS: pps,
},
ClockRate: 90000,
},
{
Codec: &codecs.MPEG4Audio{
Expand All @@ -423,14 +431,15 @@ func TestClient(t *testing.T) {
ChannelCount: 2,
},
},
ClockRate: audioClockRate,
},
}, tracks)

c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) {
c.OnDataH26x(tracks[0], func(pts int64, dts int64, au [][]byte) {
switch videoCount {
case 0:
require.Equal(t, time.Duration(0), dts)
require.Equal(t, 2*time.Second, pts)
require.Equal(t, int64(0), dts)
require.Equal(t, int64(2*90000), pts)
require.Equal(t, [][]byte{
{7, 1, 2, 3},
{8},
Expand All @@ -441,16 +450,16 @@ func TestClient(t *testing.T) {
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp)

case 1:
require.Equal(t, 33333333*time.Nanosecond, dts)
require.Equal(t, 2*time.Second+33333333*time.Nanosecond, pts)
require.Equal(t, int64(3000), dts)
require.Equal(t, int64(2*90000+3000), pts)
require.Equal(t, [][]byte{{1, 4, 5, 6}}, au)
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 33333333, time.UTC), ntp)

case 2:
require.Equal(t, 66666666*time.Nanosecond, dts)
require.Equal(t, 66666666*time.Nanosecond, pts)
require.Equal(t, int64(6000), dts)
require.Equal(t, int64(6000), pts)
require.Equal(t, [][]byte{{4}}, au)
_, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, false, ok)
Expand All @@ -459,17 +468,17 @@ func TestClient(t *testing.T) {
videoCount++
})

c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) {
c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
switch audioCount {
case 0:
require.Equal(t, 0*time.Second, pts)
require.Equal(t, int64(0), pts)
require.Equal(t, [][]byte{{1, 2, 3, 4}}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp)

case 1:
require.Equal(t, 33333333*time.Nanosecond, pts)
require.Equal(t, int64(0.0333336*float64(tracks[1].ClockRate)), pts)
require.Equal(t, [][]byte{{5, 6, 7, 8}}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
Expand Down Expand Up @@ -516,6 +525,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
"#EXT-X-INDEPENDENT-SEGMENTS\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MAP:URI=\"init_video.mp4\"\n" +
"#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" +
"#EXTINF:2,\n" +
"segment_video.mp4\n" +
"#EXT-X-ENDLIST\n"))
Expand All @@ -529,6 +539,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
"#EXT-X-INDEPENDENT-SEGMENTS\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MAP:URI=\"init_audio.mp4\"\n" +
"#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" +
"#EXTINF:2,\n" +
"segment_audio.mp4\n" +
"#EXT-X-ENDLIST"))
Expand Down Expand Up @@ -589,7 +600,8 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
err := mp4ToWriter(&fmp4.Part{
Tracks: []*fmp4.PartTrack{
{
ID: 1,
ID: 1,
BaseTime: 3000,
Samples: []*fmp4.PartSample{{
Duration: 44100,
Payload: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -627,30 +639,38 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
SPS: testSPS,
PPS: testPPS,
},
ClockRate: 90000,
},
{
Codec: &codecs.MPEG4Audio{
Config: testConfig,
},
ClockRate: 44100,
},
}, tracks)

c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) {
require.Equal(t, 3*time.Second, pts)
require.Equal(t, time.Duration(0), dts)
c.OnDataH26x(tracks[0], func(pts int64, dts int64, au [][]byte) {
require.Equal(t, int64(3*90000), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
{7, 1, 2, 3},
{8},
{5},
}, au)
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp)
packetRecv <- struct{}{}
})

c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) {
require.Equal(t, 0*time.Second, pts)
c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
require.Equal(t, int64(3000), pts)
require.Equal(t, [][]byte{
{1, 2, 3, 4},
}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 34693877, time.UTC), ntp)
packetRecv <- struct{}{}
})

Expand Down Expand Up @@ -820,17 +840,18 @@ func TestClientFMP4LowLatency(t *testing.T) {
SPS: testSPS,
PPS: testPPS,
},
ClockRate: 90000,
},
}, tracks)

c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) {
c.OnDataH26x(tracks[0], func(pts int64, dts int64, au [][]byte) {
switch recvCount {
case 0:
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 4, 0, time.UTC), ntp)
require.Equal(t, 0*time.Second, pts)
require.Equal(t, time.Duration(0), dts)
require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
{7, 1, 2, 3},
{8},
Expand All @@ -841,16 +862,16 @@ func TestClientFMP4LowLatency(t *testing.T) {
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 4, 33333333, time.UTC), ntp)
require.Equal(t, 33333333*time.Nanosecond, pts)
require.Equal(t, 33333333*time.Nanosecond, dts)
require.Equal(t, int64(3000), pts)
require.Equal(t, int64(3000), dts)
require.Equal(t, [][]byte{{1, 4, 5, 6}}, au)

case 2:
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 4, 66666666, time.UTC), ntp)
require.Equal(t, 66666666*time.Nanosecond, pts)
require.Equal(t, 66666666*time.Nanosecond, dts)
require.Equal(t, int64(6000), pts)
require.Equal(t, int64(6000), dts)
require.Equal(t, [][]byte{{1, 7, 8, 9}}, au)

default:
Expand Down
Loading
Loading