diff --git a/client.go b/client.go index e169e192..1e3e53a9 100644 --- a/client.go +++ b/client.go @@ -317,6 +317,7 @@ type Client struct { writer asyncProcessor reader *clientReader timeDecoder *rtptime.GlobalDecoder + timeDecoder2 *rtptime.GlobalDecoder2 mustClose bool // in @@ -799,6 +800,7 @@ func (c *Client) startReadRoutines() { } c.timeDecoder = rtptime.NewGlobalDecoder() + c.timeDecoder2 = rtptime.NewGlobalDecoder2() for _, cm := range c.medias { cm.start() @@ -1879,12 +1881,22 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error // PacketPTS returns the PTS of an incoming RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. +// +// Deprecated: replaced by PacketPTS2. func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { cm := c.medias[medi] ct := cm.formats[pkt.PayloadType] return c.timeDecoder.Decode(ct.format, pkt) } +// PacketPTS returns the PTS of an incoming RTP packet. +// It is computed by decoding the packet timestamp and sychronizing it with other tracks. +func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) { + cm := c.medias[medi] + ct := cm.formats[pkt.PayloadType] + return c.timeDecoder2.Decode(ct.format, pkt) +} + // PacketNTP returns the NTP timestamp of an incoming RTP packet. // The NTP timestamp is computed from RTCP sender reports. func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { diff --git a/examples/client-play-format-av1/main.go b/examples/client-play-format-av1/main.go index 96db9065..f301dace 100644 --- a/examples/client-play-format-av1/main.go +++ b/examples/client-play-format-av1/main.go @@ -59,7 +59,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-g711/main.go b/examples/client-play-format-g711/main.go index a2ed8e19..d8e60cf1 100644 --- a/examples/client-play-format-g711/main.go +++ b/examples/client-play-format-g711/main.go @@ -58,7 +58,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-g722/main.go b/examples/client-play-format-g722/main.go index 93b87f91..fee5f5c3 100644 --- a/examples/client-play-format-g722/main.go +++ b/examples/client-play-format-g722/main.go @@ -58,7 +58,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-h264-mpeg4audio-save-to-disk/main.go b/examples/client-play-format-h264-mpeg4audio-save-to-disk/main.go index d0283708..4e863bce 100644 --- a/examples/client-play-format-h264-mpeg4audio-save-to-disk/main.go +++ b/examples/client-play-format-h264-mpeg4audio-save-to-disk/main.go @@ -84,7 +84,7 @@ func main() { // called when a H264/RTP packet arrives c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(h264Media, pkt) + pts, ok := c.PacketPTS2(h264Media, pkt) if !ok { log.Printf("waiting for timestamp") return @@ -112,7 +112,7 @@ func main() { // called when a MPEG-4 audio / RTP packet arrives c.OnPacketRTP(mpeg4AudioMedia, mpeg4AudioFormat, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(mpeg4AudioMedia, pkt) + pts, ok := c.PacketPTS2(mpeg4AudioMedia, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-h264-mpeg4audio-save-to-disk/mpegts_muxer.go b/examples/client-play-format-h264-mpeg4audio-save-to-disk/mpegts_muxer.go index b5b51699..4f14e8b1 100644 --- a/examples/client-play-format-h264-mpeg4audio-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-h264-mpeg4audio-save-to-disk/mpegts_muxer.go @@ -4,15 +4,16 @@ import ( "bufio" "os" "sync" - "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) } // mpegtsMuxer allows to save a H264 / MPEG-4 audio stream into a MPEG-TS file. @@ -26,7 +27,7 @@ type mpegtsMuxer struct { w *mpegts.Writer h264Track *mpegts.Track mpeg4AudioTrack *mpegts.Track - dtsExtractor *h264.DTSExtractor + dtsExtractor *h264.DTSExtractor2 mutex sync.Mutex } @@ -61,7 +62,7 @@ func (e *mpegtsMuxer) close() { } // writeH264 writes a H264 access unit into MPEG-TS. -func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { +func (e *mpegtsMuxer) writeH264(au [][]byte, pts int64) error { e.mutex.Lock() defer e.mutex.Unlock() @@ -105,30 +106,27 @@ func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { au = append([][]byte{e.h264Format.SPS, e.h264Format.PPS}, au...) } - var dts time.Duration - if e.dtsExtractor == nil { // skip samples silently until we find one with a IDR if !idrPresent { return nil } - e.dtsExtractor = h264.NewDTSExtractor() + e.dtsExtractor = h264.NewDTSExtractor2() } - var err error - dts, err = e.dtsExtractor.Extract(au, pts) + dts, err := e.dtsExtractor.Extract(au, pts) if err != nil { return err } // encode into MPEG-TS - return e.w.WriteH264(e.h264Track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, au) + return e.w.WriteH264(e.h264Track, pts, dts, idrPresent, au) } // writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. -func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts time.Duration) error { +func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error { e.mutex.Lock() defer e.mutex.Unlock() - return e.w.WriteMPEG4Audio(e.mpeg4AudioTrack, durationGoToMPEGTS(pts), aus) + return e.w.WriteMPEG4Audio(e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.mpeg4AudioFormat.ClockRate())), aus) } diff --git a/examples/client-play-format-h264-save-to-disk/main.go b/examples/client-play-format-h264-save-to-disk/main.go index e545b0da..d1352a05 100644 --- a/examples/client-play-format-h264-save-to-disk/main.go +++ b/examples/client-play-format-h264-save-to-disk/main.go @@ -71,7 +71,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go b/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go index 62d03fb7..475b8e80 100644 --- a/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go @@ -3,16 +3,11 @@ package main import ( "bufio" "os" - "time" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) -} - // mpegtsMuxer allows to save a H264 stream into a MPEG-TS file. type mpegtsMuxer struct { fileName string @@ -23,7 +18,7 @@ type mpegtsMuxer struct { b *bufio.Writer w *mpegts.Writer track *mpegts.Track - dtsExtractor *h264.DTSExtractor + dtsExtractor *h264.DTSExtractor2 } // initialize initializes a mpegtsMuxer. @@ -51,7 +46,7 @@ func (e *mpegtsMuxer) close() { } // writeH264 writes a H264 access unit into MPEG-TS. -func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { +func (e *mpegtsMuxer) writeH264(au [][]byte, pts int64) error { var filteredAU [][]byte nonIDRPresent := false @@ -92,22 +87,19 @@ func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { au = append([][]byte{e.sps, e.pps}, au...) } - var dts time.Duration - if e.dtsExtractor == nil { // skip samples silently until we find one with a IDR if !idrPresent { return nil } - e.dtsExtractor = h264.NewDTSExtractor() + e.dtsExtractor = h264.NewDTSExtractor2() } - var err error - dts, err = e.dtsExtractor.Extract(au, pts) + dts, err := e.dtsExtractor.Extract(au, pts) if err != nil { return err } // encode into MPEG-TS - return e.w.WriteH264(e.track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, au) + return e.w.WriteH264(e.track, pts, dts, idrPresent, au) } diff --git a/examples/client-play-format-h264/main.go b/examples/client-play-format-h264/main.go index 27682288..45697964 100644 --- a/examples/client-play-format-h264/main.go +++ b/examples/client-play-format-h264/main.go @@ -78,7 +78,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-h265-save-to-disk/main.go b/examples/client-play-format-h265-save-to-disk/main.go index e737ccb2..7390f3dc 100644 --- a/examples/client-play-format-h265-save-to-disk/main.go +++ b/examples/client-play-format-h265-save-to-disk/main.go @@ -72,7 +72,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go b/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go index fd94ae77..7c0bfc1c 100644 --- a/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go @@ -3,16 +3,11 @@ package main import ( "bufio" "os" - "time" "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) -} - // mpegtsMuxer allows to save a H265 stream into a MPEG-TS file. type mpegtsMuxer struct { fileName string @@ -24,7 +19,7 @@ type mpegtsMuxer struct { b *bufio.Writer w *mpegts.Writer track *mpegts.Track - dtsExtractor *h265.DTSExtractor + dtsExtractor *h265.DTSExtractor2 } // initialize initializes a mpegtsMuxer. @@ -52,7 +47,7 @@ func (e *mpegtsMuxer) close() { } // writeH265 writes a H265 access unit into MPEG-TS. -func (e *mpegtsMuxer) writeH265(au [][]byte, pts time.Duration) error { +func (e *mpegtsMuxer) writeH265(au [][]byte, pts int64) error { var filteredAU [][]byte isRandomAccess := false @@ -93,22 +88,19 @@ func (e *mpegtsMuxer) writeH265(au [][]byte, pts time.Duration) error { au = append([][]byte{e.vps, e.sps, e.pps}, au...) } - var dts time.Duration - if e.dtsExtractor == nil { // skip samples silently until we find one with a IDR if !isRandomAccess { return nil } - e.dtsExtractor = h265.NewDTSExtractor() + e.dtsExtractor = h265.NewDTSExtractor2() } - var err error - dts, err = e.dtsExtractor.Extract(au, pts) + dts, err := e.dtsExtractor.Extract(au, pts) if err != nil { return err } // encode into MPEG-TS - return e.w.WriteH265(e.track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), isRandomAccess, au) + return e.w.WriteH265(e.track, pts, dts, isRandomAccess, au) } diff --git a/examples/client-play-format-h265/main.go b/examples/client-play-format-h265/main.go index 920e3ab1..d3a01886 100644 --- a/examples/client-play-format-h265/main.go +++ b/examples/client-play-format-h265/main.go @@ -81,7 +81,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-lpcm/main.go b/examples/client-play-format-lpcm/main.go index f9879036..df46f1b0 100644 --- a/examples/client-play-format-lpcm/main.go +++ b/examples/client-play-format-lpcm/main.go @@ -58,7 +58,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-mjpeg/main.go b/examples/client-play-format-mjpeg/main.go index b5ab1712..1964d402 100644 --- a/examples/client-play-format-mjpeg/main.go +++ b/examples/client-play-format-mjpeg/main.go @@ -62,7 +62,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-mpeg4audio-save-to-disk/main.go b/examples/client-play-format-mpeg4audio-save-to-disk/main.go index 753e91c4..956b3b06 100644 --- a/examples/client-play-format-mpeg4audio-save-to-disk/main.go +++ b/examples/client-play-format-mpeg4audio-save-to-disk/main.go @@ -53,6 +53,7 @@ func main() { // setup MPEG-4 audio -> MPEG-TS muxer mpegtsMuxer := &mpegtsMuxer{ fileName: "mystream.ts", + format: forma, track: &mpegts.Track{ Codec: &mpegts.CodecMPEG4Audio{ Config: *forma.Config, @@ -74,7 +75,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go b/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go index 3d055bdd..67316b7d 100644 --- a/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go @@ -3,18 +3,21 @@ package main import ( "bufio" "os" - "time" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) } // mpegtsMuxer allows to save a MPEG-4 audio stream into a MPEG-TS file. type mpegtsMuxer struct { fileName string + format format.Format track *mpegts.Track f *os.File @@ -43,6 +46,6 @@ func (e *mpegtsMuxer) close() { } // writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. -func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts time.Duration) error { - return e.w.WriteMPEG4Audio(e.track, durationGoToMPEGTS(pts), aus) +func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error { + return e.w.WriteMPEG4Audio(e.track, multiplyAndDivide(pts, 90000, int64(e.format.ClockRate())), aus) } diff --git a/examples/client-play-format-mpeg4audio/main.go b/examples/client-play-format-mpeg4audio/main.go index 6631f200..0d059dbf 100644 --- a/examples/client-play-format-mpeg4audio/main.go +++ b/examples/client-play-format-mpeg4audio/main.go @@ -58,7 +58,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-opus-save-to-disk/main.go b/examples/client-play-format-opus-save-to-disk/main.go index 925fb61f..94c17a15 100644 --- a/examples/client-play-format-opus-save-to-disk/main.go +++ b/examples/client-play-format-opus-save-to-disk/main.go @@ -53,6 +53,7 @@ func main() { // setup Opus -> MPEG-TS muxer mpegtsMuxer := &mpegtsMuxer{ fileName: "mystream.ts", + format: forma, track: &mpegts.Track{ Codec: &mpegts.CodecOpus{ ChannelCount: forma.ChannelCount, @@ -74,7 +75,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-opus-save-to-disk/mpegts_muxer.go b/examples/client-play-format-opus-save-to-disk/mpegts_muxer.go index c709d72a..f32fa04f 100644 --- a/examples/client-play-format-opus-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-opus-save-to-disk/mpegts_muxer.go @@ -3,18 +3,21 @@ package main import ( "bufio" "os" - "time" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) } // mpegtsMuxer allows to save a MPEG-4 audio stream into a MPEG-TS file. type mpegtsMuxer struct { fileName string + format format.Format track *mpegts.Track f *os.File @@ -43,6 +46,6 @@ func (e *mpegtsMuxer) close() { } // writeOpus writes Opus packets into MPEG-TS. -func (e *mpegtsMuxer) writeOpus(pkt []byte, pts time.Duration) error { - return e.w.WriteOpus(e.track, durationGoToMPEGTS(pts), [][]byte{pkt}) +func (e *mpegtsMuxer) writeOpus(pkt []byte, pts int64) error { + return e.w.WriteOpus(e.track, multiplyAndDivide(pts, 90000, int64(e.format.ClockRate())), [][]byte{pkt}) } diff --git a/examples/client-play-format-opus/main.go b/examples/client-play-format-opus/main.go index de89067c..13e9296c 100644 --- a/examples/client-play-format-opus/main.go +++ b/examples/client-play-format-opus/main.go @@ -58,7 +58,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-vp8/main.go b/examples/client-play-format-vp8/main.go index 77ea3dcf..86d63134 100644 --- a/examples/client-play-format-vp8/main.go +++ b/examples/client-play-format-vp8/main.go @@ -59,7 +59,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-format-vp9/main.go b/examples/client-play-format-vp9/main.go index 75b79c14..eab9ad74 100644 --- a/examples/client-play-format-vp9/main.go +++ b/examples/client-play-format-vp9/main.go @@ -59,7 +59,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := c.PacketPTS(medi, pkt) + pts, ok := c.PacketPTS2(medi, pkt) if !ok { log.Printf("waiting for timestamp") return diff --git a/examples/client-play-timestamp/main.go b/examples/client-play-timestamp/main.go index e24e1155..9ab6b82f 100644 --- a/examples/client-play-timestamp/main.go +++ b/examples/client-play-timestamp/main.go @@ -46,7 +46,7 @@ func main() { // called when a RTP packet arrives c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { // get the PTS timestamp of the packet, i.e. timestamp relative to the start of the session - pts, ptsAvailable := c.PacketPTS(medi, pkt) + pts, ptsAvailable := c.PacketPTS2(medi, pkt) log.Printf("PTS: available=%v, value=%v\n", ptsAvailable, pts) // get the NTP timestamp of the packet, i.e. the absolute timestamp diff --git a/examples/client-record-format-h264-from-disk/main.go b/examples/client-record-format-h264-from-disk/main.go index 37fc261f..54c9d920 100644 --- a/examples/client-record-format-h264-from-disk/main.go +++ b/examples/client-record-format-h264-from-disk/main.go @@ -9,7 +9,6 @@ import ( "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) @@ -74,18 +73,15 @@ func main() { panic(err) } - // setup RTP timestamp generator - rtpTime := &rtptime.Encoder{ClockRate: forma.ClockRate()} - err = rtpTime.Initialize() - if err != nil { - panic(err) - } - + timeDecoder := mpegts.NewTimeDecoder2() var firstDTS *int64 var startTime time.Time // setup a callback that is called whenever a H264 access unit is read from the file r.OnDataH264(track, func(pts, dts int64, au [][]byte) error { + dts = timeDecoder.Decode(dts) + pts = timeDecoder.Decode(pts) + // sleep between access units if firstDTS != nil { timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(startTime) @@ -105,10 +101,11 @@ func main() { return err } - // set timestamp - rtpTime := rtpTime.Encode(time.Duration(pts) * time.Second / 90000) + // set packet timestamp + // we don't have to perform any conversion + // since H264 clock rate is the same in both MPEG-TS and RTSP for _, packet := range packets { - packet.Timestamp = rtpTime + packet.Timestamp = uint32(pts) } // write packets to the server diff --git a/examples/client-record-format-mjpeg-from-image/main.go b/examples/client-record-format-mjpeg-from-image/main.go index 2692724d..b7133a09 100644 --- a/examples/client-record-format-mjpeg-from-image/main.go +++ b/examples/client-record-format-mjpeg-from-image/main.go @@ -10,7 +10,6 @@ import ( "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" ) // This example shows how to @@ -20,6 +19,12 @@ import ( // 4. generate RTP packets from the JPEG image // 5. write packets to the server +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + func createRandomImage(i int) *image.RGBA { img := image.NewRGBA(image.Rect(0, 0, 640, 480)) @@ -66,12 +71,6 @@ func main() { panic(err) } - // setup RTP timestamp generator - rtpTime := &rtptime.Encoder{ClockRate: forma.ClockRate()} - err = rtpTime.Initialize() - if err != nil { - panic(err) - } start := time.Now() // setup a ticker to sleep between frames @@ -99,11 +98,11 @@ func main() { } // get current timestamp - ts := rtpTime.Encode(time.Since(start)) + pts := uint32(multiplyAndDivide(int64(time.Since(start)), int64(forma.ClockRate()), int64(time.Second))) // write packets to the server for _, pkt := range pkts { - pkt.Timestamp = ts + pkt.Timestamp = pts err = c.WritePacketRTP(desc.Medias[0], pkt) if err != nil { diff --git a/examples/server-h264-save-to-disk/main.go b/examples/server-h264-save-to-disk/main.go index d29e9da6..5573b8c3 100644 --- a/examples/server-h264-save-to-disk/main.go +++ b/examples/server-h264-save-to-disk/main.go @@ -122,7 +122,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas // called when receiving a RTP packet ctx.Session.OnPacketRTP(sh.media, sh.format, func(pkt *rtp.Packet) { // decode timestamp - pts, ok := ctx.Session.PacketPTS(sh.media, pkt) + pts, ok := ctx.Session.PacketPTS2(sh.media, pkt) if !ok { return } diff --git a/examples/server-h264-save-to-disk/mpegts_muxer.go b/examples/server-h264-save-to-disk/mpegts_muxer.go index dcfe268b..278ff2f9 100644 --- a/examples/server-h264-save-to-disk/mpegts_muxer.go +++ b/examples/server-h264-save-to-disk/mpegts_muxer.go @@ -3,16 +3,11 @@ package main import ( "bufio" "os" - "time" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) -} - // mpegtsMuxer allows to save a H264 stream into a MPEG-TS file. type mpegtsMuxer struct { fileName string @@ -23,7 +18,7 @@ type mpegtsMuxer struct { b *bufio.Writer w *mpegts.Writer track *mpegts.Track - dtsExtractor *h264.DTSExtractor + dtsExtractor *h264.DTSExtractor2 } // initialize initializes a mpegtsMuxer. @@ -51,7 +46,7 @@ func (e *mpegtsMuxer) close() { } // writeH264 writes a H264 access unit into MPEG-TS. -func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { +func (e *mpegtsMuxer) writeH264(au [][]byte, pts int64) error { var filteredAU [][]byte nonIDRPresent := false @@ -92,22 +87,19 @@ func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { au = append([][]byte{e.sps, e.pps}, au...) } - var dts time.Duration - if e.dtsExtractor == nil { // skip samples silently until we find one with a IDR if !idrPresent { return nil } - e.dtsExtractor = h264.NewDTSExtractor() + e.dtsExtractor = h264.NewDTSExtractor2() } - var err error - dts, err = e.dtsExtractor.Extract(au, pts) + dts, err := e.dtsExtractor.Extract(au, pts) if err != nil { return err } // encode into MPEG-TS - return e.w.WriteH265(e.track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, au) + return e.w.WriteH265(e.track, pts, dts, idrPresent, au) } diff --git a/go.mod b/go.mod index 3ed6b69e..b3f57284 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/bluenviron/gortsplib/v4 go 1.20 require ( - github.com/bluenviron/mediacommon v1.12.4 + github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc github.com/google/uuid v1.6.0 github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.7-0.20240429002300-bc5124c9d0d0 diff --git a/go.sum b/go.sum index e1108d06..9e57355c 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflx github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c= github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= -github.com/bluenviron/mediacommon v1.12.4 h1:7VrA/W/iDB7VELquXqRjgjzUSJT3llZYgXjFN9WkByo= -github.com/bluenviron/mediacommon v1.12.4/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec= +github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc h1:walYSlRh0oE5Vn+H8dHoZCAOX/XjPUhy9umlckpsn3k= +github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/rtptime/encoder.go b/pkg/rtptime/encoder.go index 6b89231d..cf5103f4 100644 --- a/pkg/rtptime/encoder.go +++ b/pkg/rtptime/encoder.go @@ -23,6 +23,8 @@ func randUint32() (uint32, error) { } // Encoder is a RTP timestamp encoder. +// +// Deprecated: not used anymore. type Encoder struct { // Clock rate. ClockRate int diff --git a/pkg/rtptime/global_decoder.go b/pkg/rtptime/global_decoder.go index ba449a37..620ad9a8 100644 --- a/pkg/rtptime/global_decoder.go +++ b/pkg/rtptime/global_decoder.go @@ -51,6 +51,8 @@ type GlobalDecoderTrack interface { } // GlobalDecoder is a RTP timestamp decoder. +// +// Deprecated: replaced by GlobalDecoder2. type GlobalDecoder struct { mutex sync.Mutex leadingTrack GlobalDecoderTrack @@ -60,6 +62,8 @@ type GlobalDecoder struct { } // NewGlobalDecoder allocates a GlobalDecoder. +// +// Deprecated: replaced by NewGlobalDecoder2. func NewGlobalDecoder() *GlobalDecoder { return &GlobalDecoder{ tracks: make(map[GlobalDecoderTrack]*globalDecoderTrackData), @@ -104,16 +108,14 @@ func (d *GlobalDecoder) Decode( return df.startPTS, true } + pts := df.decode(pkt.Timestamp) + // update startNTP / startPTS if d.leadingTrack == track && track.PTSEqualsDTS(pkt) { - pts := df.decode(pkt.Timestamp) - now := timeNow() d.startNTP = now d.startPTS = pts - - return pts, true } - return df.decode(pkt.Timestamp), true + return pts, true } diff --git a/pkg/rtptime/global_decoder2.go b/pkg/rtptime/global_decoder2.go new file mode 100644 index 00000000..68644e7a --- /dev/null +++ b/pkg/rtptime/global_decoder2.go @@ -0,0 +1,103 @@ +package rtptime + +import ( + "sync" + "time" + + "github.com/pion/rtp" +) + +// avoid an int64 overflow and preserve resolution by splitting division into two parts: +// first add the integer part, then the decimal part. +func multiplyAndDivide2(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +type globalDecoder2TrackData struct { + overall int64 + prev uint32 +} + +func (d *globalDecoder2TrackData) decode(ts uint32) int64 { + d.overall += int64(int32(ts - d.prev)) + d.prev = ts + return d.overall +} + +// GlobalDecoder2Track is a track (RTSP format or WebRTC track) of GlobalDecoder2. +type GlobalDecoder2Track interface { + ClockRate() int + PTSEqualsDTS(*rtp.Packet) bool +} + +// GlobalDecoder2 is a RTP timestamp decoder. +type GlobalDecoder2 struct { + mutex sync.Mutex + leadingTrack GlobalDecoderTrack + startNTP time.Time + startPTS int64 + startPTSClockRate int64 + tracks map[GlobalDecoder2Track]*globalDecoder2TrackData +} + +// NewGlobalDecoder2 allocates a GlobalDecoder. +func NewGlobalDecoder2() *GlobalDecoder2 { + return &GlobalDecoder2{ + tracks: make(map[GlobalDecoder2Track]*globalDecoder2TrackData), + } +} + +// Decode decodes a timestamp. +func (d *GlobalDecoder2) Decode( + track GlobalDecoder2Track, + pkt *rtp.Packet, +) (int64, bool) { + if track.ClockRate() == 0 { + return 0, false + } + + d.mutex.Lock() + defer d.mutex.Unlock() + + df, ok := d.tracks[track] + + // never seen before track + if !ok { + if !track.PTSEqualsDTS(pkt) { + return 0, false + } + + now := timeNow() + + if d.leadingTrack == nil { + d.leadingTrack = track + d.startNTP = now + d.startPTS = 0 + d.startPTSClockRate = int64(track.ClockRate()) + } + + // start from the PTS of the leading track + startPTS := multiplyAndDivide2(d.startPTS, int64(track.ClockRate()), d.startPTSClockRate) + startPTS += multiplyAndDivide2(int64(now.Sub(d.startNTP)), int64(track.ClockRate()), int64(time.Second)) + + d.tracks[track] = &globalDecoder2TrackData{ + overall: startPTS, + prev: pkt.Timestamp, + } + + return startPTS, true + } + + pts := df.decode(pkt.Timestamp) + + // update startNTP / startPTS + if d.leadingTrack == track && track.PTSEqualsDTS(pkt) { + now := timeNow() + d.startNTP = now + d.startPTS = pts + } + + return pts, true +} diff --git a/server_session.go b/server_session.go index f86458a7..fc213b41 100644 --- a/server_session.go +++ b/server_session.go @@ -248,6 +248,7 @@ type ServerSession struct { udpCheckStreamTimer *time.Timer writer asyncProcessor timeDecoder *rtptime.GlobalDecoder + timeDecoder2 *rtptime.GlobalDecoder2 // in chHandleRequest chan sessionRequestReq @@ -948,6 +949,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.udpLastPacketTime = &v ss.timeDecoder = rtptime.NewGlobalDecoder() + ss.timeDecoder2 = rtptime.NewGlobalDecoder2() for _, sm := range ss.setuppedMedias { sm.start() @@ -1034,6 +1036,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.udpLastPacketTime = &v ss.timeDecoder = rtptime.NewGlobalDecoder() + ss.timeDecoder2 = rtptime.NewGlobalDecoder2() for _, sm := range ss.setuppedMedias { sm.start() @@ -1088,6 +1091,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( } ss.timeDecoder = nil + ss.timeDecoder2 = nil switch ss.state { case ServerSessionStatePlay: @@ -1255,12 +1259,22 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe // PacketPTS returns the PTS of an incoming RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. +// +// Deprecated: replaced by PacketPTS2. func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { sm := ss.setuppedMedias[medi] sf := sm.formats[pkt.PayloadType] return ss.timeDecoder.Decode(sf.format, pkt) } +// PacketPTS2 returns the PTS of an incoming RTP packet. +// It is computed by decoding the packet timestamp and sychronizing it with other tracks. +func (ss *ServerSession) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) { + sm := ss.setuppedMedias[medi] + sf := sm.formats[pkt.PayloadType] + return ss.timeDecoder2.Decode(sf.format, pkt) +} + // PacketNTP returns the NTP timestamp of an incoming RTP packet. // The NTP timestamp is computed from RTCP sender reports. func (ss *ServerSession) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) {