From 3df19dd677c588399143ccb5fff86b8d518283b8 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 6 Aug 2023 17:44:48 +0200 Subject: [PATCH] muxer: prepend prefix to segments This is needed to prevent usage of cached segments from previous muxing sessions --- muxer.go | 31 +++- muxer_part.go | 11 +- muxer_segment.go | 8 ++ muxer_segment_fmp4.go | 23 +-- muxer_segment_mpegts.go | 6 +- muxer_segmenter_fmp4.go | 69 +++++---- muxer_segmenter_mpegts.go | 7 + muxer_server.go | 296 +++++++++++++++++++------------------- muxer_test.go | 207 +++++++++++++++----------- 9 files changed, 374 insertions(+), 284 deletions(-) diff --git a/muxer.go b/muxer.go index 756f459..a885854 100644 --- a/muxer.go +++ b/muxer.go @@ -2,6 +2,8 @@ package gohlslib import ( "bytes" + "crypto/rand" + "encoding/hex" "fmt" "net/http" "time" @@ -14,6 +16,18 @@ import ( "github.com/bluenviron/gohlslib/pkg/storage" ) +// a prefix is needed to prevent usage of cached segments +// from previous muxing sessions. +func generatePrefix() (string, error) { + var buf [6]byte + _, err := rand.Read(buf[:]) + if err != nil { + return "", err + } + + return hex.EncodeToString(buf[:]), nil +} + // MuxerVariant is a muxer variant. type MuxerVariant int @@ -68,6 +82,7 @@ type Muxer struct { // private // + prefix string storageFactory storage.Factory server *muxerServer segmenter muxerSegmenter @@ -120,18 +135,24 @@ func (m *Muxer) Start() error { } } + var err error + m.prefix, err = generatePrefix() + if err != nil { + return err + } + if m.Directory != "" { m.storageFactory = storage.NewFactoryDisk(m.Directory) } else { m.storageFactory = storage.NewFactoryRAM() } - var err error m.server, err = newMuxerServer( m.Variant, m.SegmentCount, m.VideoTrack, m.AudioTrack, + m.prefix, m.storageFactory, ) if err != nil { @@ -144,8 +165,9 @@ func (m *Muxer) Start() error { m.SegmentMaxSize, m.VideoTrack, m.AudioTrack, + m.prefix, m.storageFactory, - m.server.onSegmentFinalized, + m.server.publishSegment, ) } else { m.segmenter = newMuxerSegmenterFMP4( @@ -155,9 +177,10 @@ func (m *Muxer) Start() error { m.SegmentMaxSize, m.VideoTrack, m.AudioTrack, + m.prefix, m.storageFactory, - m.server.onSegmentFinalized, - m.server.onPartFinalized, + m.server.publishSegment, + m.server.publishPart, ) } diff --git a/muxer_part.go b/muxer_part.go index b27c2e8..724e2fb 100644 --- a/muxer_part.go +++ b/muxer_part.go @@ -9,8 +9,8 @@ import ( "github.com/bluenviron/mediacommon/pkg/formats/fmp4" ) -func fmp4PartName(id uint64) string { - return "part" + strconv.FormatUint(id, 10) +func partName(prefix string, id uint64) string { + return prefix + "_part" + strconv.FormatUint(id, 10) + ".mp4" } type muxerPart struct { @@ -21,6 +21,7 @@ type muxerPart struct { id uint64 storage storage.Part + name string isIndependent bool videoSamples []*fmp4.PartSample audioSamples []*fmp4.PartSample @@ -36,6 +37,7 @@ func newMuxerPart( videoTrack *Track, audioTrack *Track, audioTrackTimeScale uint32, + prefix string, id uint64, storage storage.Part, ) *muxerPart { @@ -46,6 +48,7 @@ func newMuxerPart( audioTrackTimeScale: audioTrackTimeScale, id: id, storage: storage, + name: partName(prefix, id), } if videoTrack == nil { @@ -55,8 +58,8 @@ func newMuxerPart( return p } -func (p *muxerPart) name() string { - return fmp4PartName(p.id) +func (p *muxerPart) getName() string { + return p.name } func (p *muxerPart) reader() (io.ReadCloser, error) { diff --git a/muxer_segment.go b/muxer_segment.go index 7db623a..c0a4ddb 100644 --- a/muxer_segment.go +++ b/muxer_segment.go @@ -3,9 +3,17 @@ package gohlslib import ( "fmt" "io" + "strconv" "time" ) +func segmentName(prefix string, id uint64, mp4 bool) string { + if mp4 { + return prefix + "_seg" + strconv.FormatUint(id, 10) + ".mp4" + } + return prefix + "_seg" + strconv.FormatUint(id, 10) + ".ts" +} + type muxerSegment interface { close() getName() string diff --git a/muxer_segment_fmp4.go b/muxer_segment_fmp4.go index 7803aa2..ac21880 100644 --- a/muxer_segment_fmp4.go +++ b/muxer_segment_fmp4.go @@ -3,7 +3,6 @@ package gohlslib import ( "fmt" "io" - "strconv" "time" "github.com/bluenviron/gohlslib/pkg/storage" @@ -18,8 +17,9 @@ type muxerSegmentFMP4 struct { videoTrack *Track audioTrack *Track audioTrackTimeScale uint32 + prefix string genPartID func() uint64 - onPartFinalized func(*muxerPart) + publishPart func(*muxerPart) name string storage storage.File @@ -38,9 +38,10 @@ func newMuxerSegmentFMP4( videoTrack *Track, audioTrack *Track, audioTrackTimeScale uint32, + prefix string, factory storage.Factory, genPartID func() uint64, - onPartFinalized func(*muxerPart), + publishPart func(*muxerPart), ) (*muxerSegmentFMP4, error) { s := &muxerSegmentFMP4{ lowLatency: lowLatency, @@ -51,13 +52,14 @@ func newMuxerSegmentFMP4( videoTrack: videoTrack, audioTrack: audioTrack, audioTrackTimeScale: audioTrackTimeScale, + prefix: prefix, genPartID: genPartID, - onPartFinalized: onPartFinalized, - name: "seg" + strconv.FormatUint(id, 10), + publishPart: publishPart, + name: segmentName(prefix, id, true), } var err error - s.storage, err = factory.NewFile(s.name + ".mp4") + s.storage, err = factory.NewFile(s.name) if err != nil { return nil, err } @@ -67,6 +69,7 @@ func newMuxerSegmentFMP4( s.videoTrack, s.audioTrack, s.audioTrackTimeScale, + prefix, s.genPartID(), s.storage.NewPart(), ) @@ -101,7 +104,7 @@ func (s *muxerSegmentFMP4) finalize(nextDTS time.Duration) error { return err } - s.onPartFinalized(s.currentPart) + s.publishPart(s.currentPart) s.parts = append(s.parts, s.currentPart) } s.currentPart = nil @@ -135,13 +138,14 @@ func (s *muxerSegmentFMP4) writeVideo( } s.parts = append(s.parts, s.currentPart) - s.onPartFinalized(s.currentPart) + s.publishPart(s.currentPart) s.currentPart = newMuxerPart( nextDTS, s.videoTrack, s.audioTrack, s.audioTrackTimeScale, + s.prefix, s.genPartID(), s.storage.NewPart(), ) @@ -172,13 +176,14 @@ func (s *muxerSegmentFMP4) writeAudio( } s.parts = append(s.parts, s.currentPart) - s.onPartFinalized(s.currentPart) + s.publishPart(s.currentPart) s.currentPart = newMuxerPart( nextAudioSampleDTS, s.videoTrack, s.audioTrack, s.audioTrackTimeScale, + s.prefix, s.genPartID(), s.storage.NewPart(), ) diff --git a/muxer_segment_mpegts.go b/muxer_segment_mpegts.go index 14aa539..660e13b 100644 --- a/muxer_segment_mpegts.go +++ b/muxer_segment_mpegts.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "io" - "strconv" "time" "github.com/bluenviron/gohlslib/pkg/storage" @@ -42,6 +41,7 @@ func newMuxerSegmentMPEGTS( writerAudioTrack *mpegts.Track, switchableWriter *switchableWriter, writer *mpegts.Writer, + prefix string, factory storage.Factory, ) (*muxerSegmentMPEGTS, error) { t := &muxerSegmentMPEGTS{ @@ -50,11 +50,11 @@ func newMuxerSegmentMPEGTS( writerAudioTrack: writerAudioTrack, writer: writer, startNTP: startNTP, - name: "seg" + strconv.FormatUint(id, 10), + name: segmentName(prefix, id, false), } var err error - t.storage, err = factory.NewFile(t.name + ".ts") + t.storage, err = factory.NewFile(t.name) if err != nil { return nil, err } diff --git a/muxer_segmenter_fmp4.go b/muxer_segmenter_fmp4.go index 2387a2c..7f11a05 100644 --- a/muxer_segmenter_fmp4.go +++ b/muxer_segmenter_fmp4.go @@ -90,15 +90,16 @@ type augmentedAudioSample struct { } type muxerSegmenterFMP4 struct { - lowLatency bool - segmentDuration time.Duration - partDuration time.Duration - segmentMaxSize uint64 - videoTrack *Track - audioTrack *Track - factory storage.Factory - onSegmentFinalized func(muxerSegment) - onPartFinalized func(*muxerPart) + lowLatency bool + segmentDuration time.Duration + partDuration time.Duration + segmentMaxSize uint64 + videoTrack *Track + audioTrack *Track + prefix string + factory storage.Factory + publishSegment func(muxerSegment) + publishPart func(*muxerPart) audioTrackTimeScale uint32 startDTS time.Duration @@ -121,21 +122,23 @@ func newMuxerSegmenterFMP4( segmentMaxSize uint64, videoTrack *Track, audioTrack *Track, + prefix string, factory storage.Factory, - onSegmentFinalized func(muxerSegment), - onPartFinalized func(*muxerPart), + publishSegment func(muxerSegment), + publishPart func(*muxerPart), ) *muxerSegmenterFMP4 { m := &muxerSegmenterFMP4{ - lowLatency: lowLatency, - segmentDuration: segmentDuration, - partDuration: partDuration, - segmentMaxSize: segmentMaxSize, - videoTrack: videoTrack, - audioTrack: audioTrack, - factory: factory, - onSegmentFinalized: onSegmentFinalized, - onPartFinalized: onPartFinalized, - sampleDurations: make(map[time.Duration]struct{}), + lowLatency: lowLatency, + segmentDuration: segmentDuration, + partDuration: partDuration, + segmentMaxSize: segmentMaxSize, + videoTrack: videoTrack, + audioTrack: audioTrack, + prefix: prefix, + factory: factory, + publishSegment: publishSegment, + publishPart: publishPart, + sampleDurations: make(map[time.Duration]struct{}), } if audioTrack != nil { @@ -243,9 +246,10 @@ func (m *muxerSegmenterFMP4) writeAV1( m.videoTrack, m.audioTrack, m.audioTrackTimeScale, + m.prefix, m.factory, m.genPartID, - m.onPartFinalized, + m.publishPart, ) if err != nil { return err @@ -267,7 +271,7 @@ func (m *muxerSegmenterFMP4) writeAV1( if err != nil { return err } - m.onSegmentFinalized(m.currentSegment) + m.publishSegment(m.currentSegment) m.firstSegmentFinalized = true @@ -280,9 +284,10 @@ func (m *muxerSegmenterFMP4) writeAV1( m.videoTrack, m.audioTrack, m.audioTrackTimeScale, + m.prefix, m.factory, m.genPartID, - m.onPartFinalized, + m.publishPart, ) if err != nil { return err @@ -372,9 +377,10 @@ func (m *muxerSegmenterFMP4) writeH26x( m.videoTrack, m.audioTrack, m.audioTrackTimeScale, + m.prefix, m.factory, m.genPartID, - m.onPartFinalized, + m.publishPart, ) if err != nil { return err @@ -396,7 +402,7 @@ func (m *muxerSegmenterFMP4) writeH26x( if err != nil { return err } - m.onSegmentFinalized(m.currentSegment) + m.publishSegment(m.currentSegment) m.firstSegmentFinalized = true @@ -409,9 +415,10 @@ func (m *muxerSegmenterFMP4) writeH26x( m.videoTrack, m.audioTrack, m.audioTrackTimeScale, + m.prefix, m.factory, m.genPartID, - m.onPartFinalized, + m.publishPart, ) if err != nil { return err @@ -500,9 +507,10 @@ func (m *muxerSegmenterFMP4) writeAudio(ntp time.Time, dts time.Duration, au []b m.videoTrack, m.audioTrack, m.audioTrackTimeScale, + m.prefix, m.factory, m.genPartID, - m.onPartFinalized, + m.publishPart, ) if err != nil { return err @@ -527,7 +535,7 @@ func (m *muxerSegmenterFMP4) writeAudio(ntp time.Time, dts time.Duration, au []b if err != nil { return err } - m.onSegmentFinalized(m.currentSegment) + m.publishSegment(m.currentSegment) m.firstSegmentFinalized = true @@ -540,9 +548,10 @@ func (m *muxerSegmenterFMP4) writeAudio(ntp time.Time, dts time.Duration, au []b m.videoTrack, m.audioTrack, m.audioTrackTimeScale, + m.prefix, m.factory, m.genPartID, - m.onPartFinalized, + m.publishPart, ) if err != nil { return err diff --git a/muxer_segmenter_mpegts.go b/muxer_segmenter_mpegts.go index f887bd1..95984b6 100644 --- a/muxer_segmenter_mpegts.go +++ b/muxer_segmenter_mpegts.go @@ -29,6 +29,7 @@ type muxerSegmenterMPEGTS struct { segmentMaxSize uint64 videoTrack *Track audioTrack *Track + prefix string factory storage.Factory onSegmentReady func(muxerSegment) @@ -47,6 +48,7 @@ func newMuxerSegmenterMPEGTS( segmentMaxSize uint64, videoTrack *Track, audioTrack *Track, + prefix string, factory storage.Factory, onSegmentReady func(muxerSegment), ) *muxerSegmenterMPEGTS { @@ -55,6 +57,7 @@ func newMuxerSegmenterMPEGTS( segmentMaxSize: segmentMaxSize, videoTrack: videoTrack, audioTrack: audioTrack, + prefix: prefix, factory: factory, onSegmentReady: onSegmentReady, } @@ -141,6 +144,7 @@ func (m *muxerSegmenterMPEGTS) writeH26x( m.writerAudioTrack, m.switchableWriter, m.writer, + m.prefix, m.factory) if err != nil { return err @@ -171,6 +175,7 @@ func (m *muxerSegmenterMPEGTS) writeH26x( m.writerAudioTrack, m.switchableWriter, m.writer, + m.prefix, m.factory, ) if err != nil { @@ -211,6 +216,7 @@ func (m *muxerSegmenterMPEGTS) writeMPEG4Audio(ntp time.Time, pts time.Duration, m.writerAudioTrack, m.switchableWriter, m.writer, + m.prefix, m.factory, ) if err != nil { @@ -234,6 +240,7 @@ func (m *muxerSegmenterMPEGTS) writeMPEG4Audio(ntp time.Time, pts time.Duration, m.writerAudioTrack, m.switchableWriter, m.writer, + m.prefix, m.factory, ) if err != nil { diff --git a/muxer_server.go b/muxer_server.go index a91cdc1..5b74f44 100644 --- a/muxer_server.go +++ b/muxer_server.go @@ -102,11 +102,37 @@ func parseMSNPart(msn string, part string) (uint64, uint64, error) { return (msnint), (partint), nil } +func bandwidth(segments []muxerSegment) (int, int) { + if len(segments) == 0 { + return 0, 0 + } + + var maxBandwidth uint64 + var sizes uint64 + var durations time.Duration + + for _, seg := range segments { + if _, ok := seg.(*muxerGap); !ok { + bandwidth := 8 * seg.getSize() * uint64(time.Second) / uint64(seg.getDuration()) + if bandwidth > maxBandwidth { + maxBandwidth = bandwidth + } + sizes += seg.getSize() + durations += seg.getDuration() + } + } + + averageBandwidth := 8 * sizes * uint64(time.Second) / uint64(durations) + + return int(maxBandwidth), int(averageBandwidth) +} + type muxerServer struct { variant MuxerVariant segmentCount int videoTrack *Track audioTrack *Track + prefix string storageFactory storage.Factory mutex sync.Mutex @@ -127,6 +153,7 @@ func newMuxerServer( segmentCount int, videoTrack *Track, audioTrack *Track, + prefix string, storageFactory storage.Factory, ) (*muxerServer, error) { s := &muxerServer{ @@ -134,6 +161,7 @@ func newMuxerServer( segmentCount: segmentCount, videoTrack: videoTrack, audioTrack: audioTrack, + prefix: prefix, storageFactory: storageFactory, segmentsByName: make(map[string]muxerSegment), partsByName: make(map[string]*muxerPart), @@ -227,7 +255,7 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) { switch { case name == "index.m3u8": - s.handleMultistreamPlaylist(w) + s.handleMultivariantPlaylist(w) case name == "stream.m3u8": q := r.URL.Query() @@ -236,7 +264,7 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) { skip := queryVal(q, "_HLS_skip") s.handleMediaPlaylist(msn, part, skip, w) - case s.variant != MuxerVariantMPEGTS && name == "init.mp4": + case s.variant != MuxerVariantMPEGTS && name == s.prefix+"_init.mp4": s.handleInitFile(w) case (s.variant != MuxerVariantMPEGTS && strings.HasSuffix(name, ".mp4")) || @@ -245,8 +273,8 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) { } } -func (s *muxerServer) handleMultistreamPlaylist(w http.ResponseWriter) { - byts := func() []byte { +func (s *muxerServer) handleMultivariantPlaylist(w http.ResponseWriter) { + byts, err := func() ([]byte, error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -254,101 +282,20 @@ func (s *muxerServer) handleMultistreamPlaylist(w http.ResponseWriter) { s.cond.Wait() } - bandwidth, averageBandwidth := s.bandwidth() - var resolution string - var frameRate *float64 - - if s.videoTrack != nil { - switch codec := s.videoTrack.Codec.(type) { - case *codecs.AV1: - var sh av1.SequenceHeader - err := sh.Unmarshal(codec.SequenceHeader) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return nil - } - - resolution = strconv.FormatInt(int64(sh.Width()), 10) + "x" + strconv.FormatInt(int64(sh.Height()), 10) - - // TODO: FPS - - case *codecs.H265: - var sps h265.SPS - err := sps.Unmarshal(codec.SPS) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return nil - } - - resolution = strconv.FormatInt(int64(sps.Width()), 10) + "x" + strconv.FormatInt(int64(sps.Height()), 10) - - f := sps.FPS() - if f != 0 { - frameRate = &f - } - - case *codecs.H264: - var sps h264.SPS - err := sps.Unmarshal(codec.SPS) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return nil - } - - resolution = strconv.FormatInt(int64(sps.Width()), 10) + "x" + strconv.FormatInt(int64(sps.Height()), 10) - - f := sps.FPS() - if f != 0 { - frameRate = &f - } - } - } - - pl := &playlist.Multivariant{ - Version: func() int { - if s.variant == MuxerVariantMPEGTS { - return 3 - } - return 9 - }(), - IndependentSegments: true, - Variants: []*playlist.MultivariantVariant{{ - Bandwidth: bandwidth, - AverageBandwidth: &averageBandwidth, - Codecs: func() []string { - var codecs []string - if s.videoTrack != nil { - codecs = append(codecs, codecparams.Marshal(s.videoTrack.Codec)) - } - if s.audioTrack != nil { - codecs = append(codecs, codecparams.Marshal(s.audioTrack.Codec)) - } - return codecs - }(), - Resolution: resolution, - FrameRate: frameRate, - URI: "stream.m3u8", - }}, - } - - byts, err := pl.Marshal() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return nil - } - - return byts + return s.generateMultivariantPlaylist() }() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } - if byts != nil { - // allow caching but use a small period in order to - // allow a stream to change tracks or bitrate - w.Header().Set("Cache-Control", "max-age=30") + // allow caching but use a small period in order to + // allow a stream to change tracks or bitrate + w.Header().Set("Cache-Control", "max-age=30") - w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) - w.WriteHeader(http.StatusOK) - w.Write(byts) - } + w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`) + w.WriteHeader(http.StatusOK) + w.Write(byts) } func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string, w http.ResponseWriter) { @@ -387,7 +334,7 @@ func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string, return nil } - return s.generatePlaylist(isDeltaUpdate) + return s.generateMediaPlaylist(isDeltaUpdate) }() if byts != nil { @@ -419,7 +366,7 @@ func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string, return nil } - return s.generatePlaylist(isDeltaUpdate) + return s.generateMediaPlaylist(isDeltaUpdate) }() if byts != nil { @@ -430,14 +377,92 @@ func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string, } } -func (s *muxerServer) generatePlaylist(isDeltaUpdate bool) []byte { +func (s *muxerServer) generateMultivariantPlaylist() ([]byte, error) { + bandwidth, averageBandwidth := bandwidth(s.segments) + var resolution string + var frameRate *float64 + + if s.videoTrack != nil { + switch codec := s.videoTrack.Codec.(type) { + case *codecs.AV1: + var sh av1.SequenceHeader + err := sh.Unmarshal(codec.SequenceHeader) + if err != nil { + return nil, err + } + + resolution = strconv.FormatInt(int64(sh.Width()), 10) + "x" + strconv.FormatInt(int64(sh.Height()), 10) + + // TODO: FPS + + case *codecs.H265: + var sps h265.SPS + err := sps.Unmarshal(codec.SPS) + if err != nil { + return nil, err + } + + resolution = strconv.FormatInt(int64(sps.Width()), 10) + "x" + strconv.FormatInt(int64(sps.Height()), 10) + + f := sps.FPS() + if f != 0 { + frameRate = &f + } + + case *codecs.H264: + var sps h264.SPS + err := sps.Unmarshal(codec.SPS) + if err != nil { + return nil, err + } + + resolution = strconv.FormatInt(int64(sps.Width()), 10) + "x" + strconv.FormatInt(int64(sps.Height()), 10) + + f := sps.FPS() + if f != 0 { + frameRate = &f + } + } + } + + pl := &playlist.Multivariant{ + Version: func() int { + if s.variant == MuxerVariantMPEGTS { + return 3 + } + return 9 + }(), + IndependentSegments: true, + Variants: []*playlist.MultivariantVariant{{ + Bandwidth: bandwidth, + AverageBandwidth: &averageBandwidth, + Codecs: func() []string { + var codecs []string + if s.videoTrack != nil { + codecs = append(codecs, codecparams.Marshal(s.videoTrack.Codec)) + } + if s.audioTrack != nil { + codecs = append(codecs, codecparams.Marshal(s.audioTrack.Codec)) + } + return codecs + }(), + Resolution: resolution, + FrameRate: frameRate, + URI: "stream.m3u8", + }}, + } + + return pl.Marshal() +} + +func (s *muxerServer) generateMediaPlaylist(isDeltaUpdate bool) []byte { if s.variant == MuxerVariantMPEGTS { - return s.generatePlaylistMPEGTS() + return s.generateMediaPlaylistMPEGTS() } - return s.generatePlaylistFMP4(isDeltaUpdate) + return s.generateMediaPlaylistFMP4(isDeltaUpdate) } -func (s *muxerServer) generatePlaylistMPEGTS() []byte { +func (s *muxerServer) generateMediaPlaylistMPEGTS() []byte { pl := &playlist.Media{ Version: 3, AllowCache: func() *bool { @@ -453,7 +478,7 @@ func (s *muxerServer) generatePlaylistMPEGTS() []byte { pl.Segments = append(pl.Segments, &playlist.MediaSegment{ DateTime: &seg.startNTP, Duration: seg.getDuration(), - URI: seg.name + ".ts", + URI: seg.name, }) } } @@ -462,7 +487,7 @@ func (s *muxerServer) generatePlaylistMPEGTS() []byte { return byts } -func (s *muxerServer) generatePlaylistFMP4(isDeltaUpdate bool) []byte { +func (s *muxerServer) generateMediaPlaylistFMP4(isDeltaUpdate bool) []byte { targetDuration := targetDuration(s.segments) skipBoundary := time.Duration(targetDuration) * 6 * time.Second @@ -491,7 +516,7 @@ func (s *muxerServer) generatePlaylistFMP4(isDeltaUpdate bool) []byte { if !isDeltaUpdate { pl.Map = &playlist.MediaMap{ - URI: "init.mp4", + URI: s.prefix + "_init.mp4", } } else { var curDuration time.Duration @@ -519,7 +544,7 @@ func (s *muxerServer) generatePlaylistFMP4(isDeltaUpdate bool) []byte { case *muxerSegmentFMP4: plse := &playlist.MediaSegment{ Duration: seg.getDuration(), - URI: seg.name + ".mp4", + URI: seg.name, } if (len(s.segments) - i) <= 2 { @@ -530,7 +555,7 @@ func (s *muxerServer) generatePlaylistFMP4(isDeltaUpdate bool) []byte { for _, part := range seg.parts { plse.Parts = append(plse.Parts, &playlist.MediaPart{ Duration: part.finalDuration, - URI: part.name() + ".mp4", + URI: part.getName(), Independent: part.isIndependent, }) } @@ -551,7 +576,7 @@ func (s *muxerServer) generatePlaylistFMP4(isDeltaUpdate bool) []byte { for _, part := range s.nextSegmentParts { pl.Parts = append(pl.Parts, &playlist.MediaPart{ Duration: part.finalDuration, - URI: part.name() + ".mp4", + URI: part.getName(), Independent: part.isIndependent, }) } @@ -559,7 +584,7 @@ func (s *muxerServer) generatePlaylistFMP4(isDeltaUpdate bool) []byte { // preload hint must always be present // otherwise hls.js goes into a loop pl.PreloadHint = &playlist.MediaPreloadHint{ - URI: fmp4PartName(s.nextPartID) + ".mp4", + URI: partName(s.prefix, s.nextPartID), } } @@ -592,7 +617,7 @@ func (s *muxerServer) handleInitFile(w http.ResponseWriter) { defer r.Close() // allow caching but use a small period in order to - // allow a stream to change tracks + // allow a stream to change track parameters w.Header().Set("Cache-Control", "max-age=30") w.Header().Set("Content-Type", "video/mp4") @@ -602,11 +627,9 @@ func (s *muxerServer) handleInitFile(w http.ResponseWriter) { func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) { switch { - case strings.HasPrefix(fname, "seg"): - base := strings.TrimSuffix(strings.TrimSuffix(fname, ".mp4"), ".ts") - + case strings.HasPrefix(fname, s.prefix+"_"+"seg"): s.mutex.Lock() - segment, ok := s.segmentsByName[base] + segment, ok := s.segmentsByName[fname] s.mutex.Unlock() if !ok { @@ -635,22 +658,20 @@ func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) { w.WriteHeader(http.StatusOK) io.Copy(w, r) - case s.variant == MuxerVariantLowLatency && strings.HasPrefix(fname, "part"): - base := strings.TrimSuffix(fname, ".mp4") - + case s.variant == MuxerVariantLowLatency && strings.HasPrefix(fname, s.prefix+"_"+"part"): s.mutex.Lock() - part := s.partsByName[base] + part := s.partsByName[fname] // support for EXT-X-PRELOAD-HINT - if part == nil && base == fmp4PartName(s.nextPartID) { + if part == nil && fname == partName(s.prefix, s.nextPartID) { partID := s.nextPartID for !s.closed && s.nextPartID <= partID { s.cond.Wait() } - part = s.partsByName[base] + part = s.partsByName[fname] } s.mutex.Unlock() @@ -674,32 +695,7 @@ func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) { } } -func (s *muxerServer) bandwidth() (int, int) { - if len(s.segments) == 0 { - return 0, 0 - } - - var maxBandwidth uint64 - var sizes uint64 - var durations time.Duration - - for _, seg := range s.segments { - if _, ok := seg.(*muxerGap); !ok { - bandwidth := 8 * seg.getSize() * uint64(time.Second) / uint64(seg.getDuration()) - if bandwidth > maxBandwidth { - maxBandwidth = bandwidth - } - sizes += seg.getSize() - durations += seg.getDuration() - } - } - - averageBandwidth := 8 * sizes * uint64(time.Second) / uint64(durations) - - return int(maxBandwidth), int(averageBandwidth) -} - -func (s *muxerServer) onSegmentFinalized(segment muxerSegment) { +func (s *muxerServer) publishSegment(segment muxerSegment) { func() { s.mutex.Lock() defer s.mutex.Unlock() @@ -727,7 +723,7 @@ func (s *muxerServer) onSegmentFinalized(segment muxerSegment) { if toDeleteSeg, ok := toDelete.(*muxerSegmentFMP4); ok { for _, part := range toDeleteSeg.parts { - delete(s.partsByName, part.name()) + delete(s.partsByName, part.getName()) } } @@ -742,12 +738,12 @@ func (s *muxerServer) onSegmentFinalized(segment muxerSegment) { s.cond.Broadcast() } -func (s *muxerServer) onPartFinalized(part *muxerPart) { +func (s *muxerServer) publishPart(part *muxerPart) { func() { s.mutex.Lock() defer s.mutex.Unlock() - s.partsByName[part.name()] = part + s.partsByName[part.getName()] = part s.nextSegmentParts = append(s.nextSegmentParts, part) s.nextPartID = part.id + 1 }() @@ -785,7 +781,7 @@ func (s *muxerServer) generateInitFile() error { }) } - f, err := s.storageFactory.NewFile("init.mp4") + f, err := s.storageFactory.NewFile(s.prefix + "_init.mp4") if err != nil { return err } diff --git a/muxer_test.go b/muxer_test.go index cc29780..a6c5f0b 100644 --- a/muxer_test.go +++ b/muxer_test.go @@ -223,12 +223,12 @@ func TestMuxerVideoAudio(t *testing.T) { `#EXT-X-MEDIA-SEQUENCE:0\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:4.00000,\n` + - `(seg0\.ts)\n` + + `(.*?_seg0\.ts)\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg1\.ts)\n$`) + `(.*?_seg1\.ts)\n$`) + require.Regexp(t, re, string(byts)) ma := re.FindStringSubmatch(string(byts)) - require.NotEqual(t, 0, len(ma)) _, h, err := doRequest(m, ma[2], "", "", "") require.NoError(t, err) @@ -240,62 +240,64 @@ func TestMuxerVideoAudio(t *testing.T) { `#EXT-X-VERSION:9\n` + `#EXT-X-TARGETDURATION:4\n` + `#EXT-X-MEDIA-SEQUENCE:0\n` + - `#EXT-X-MAP:URI="init.mp4"\n` + + `#EXT-X-MAP:URI="(.*?_init.mp4)"\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:4.00000,\n` + - `(seg0\.mp4)\n` + + `(.*?_seg0\.mp4)\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg1\.mp4)\n$`) + `(.*?_seg1\.mp4)\n$`) + require.Regexp(t, re, string(byts)) ma := re.FindStringSubmatch(string(byts)) - require.NotEqual(t, 0, len(ma)) - _, h, err := doRequest(m, "init.mp4", "", "", "") + _, h, err := doRequest(m, ma[1], "", "", "") require.NoError(t, err) require.Equal(t, "video/mp4", h.Get("Content-Type")) require.Equal(t, "max-age=30", h.Get("Cache-Control")) - _, h, err = doRequest(m, ma[2], "", "", "") + _, h, err = doRequest(m, ma[3], "", "", "") require.NoError(t, err) require.Equal(t, "video/mp4", h.Get("Content-Type")) require.Equal(t, "max-age=3600", h.Get("Cache-Control")) case "lowLatency": - require.Equal(t, - "#EXTM3U\n"+ - "#EXT-X-VERSION:9\n"+ - "#EXT-X-TARGETDURATION:4\n"+ - "#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK=5.00000,CAN-SKIP-UNTIL=24.00000\n"+ - "#EXT-X-PART-INF:PART-TARGET=2.00000\n"+ - "#EXT-X-MEDIA-SEQUENCE:2\n"+ - "#EXT-X-MAP:URI=\"init.mp4\"\n"+ - "#EXT-X-GAP\n"+ - "#EXTINF:4.00000,\n"+ - "gap.mp4\n"+ - "#EXT-X-GAP\n"+ - "#EXTINF:4.00000,\n"+ - "gap.mp4\n"+ - "#EXT-X-GAP\n"+ - "#EXTINF:4.00000,\n"+ - "gap.mp4\n"+ - "#EXT-X-GAP\n"+ - "#EXTINF:4.00000,\n"+ - "gap.mp4\n"+ - "#EXT-X-GAP\n"+ - "#EXTINF:4.00000,\n"+ - "gap.mp4\n"+ - "#EXT-X-PROGRAM-DATE-TIME:2010-01-01T01:01:02Z\n"+ - "#EXT-X-PART:DURATION=2.00000,URI=\"part0.mp4\",INDEPENDENT=YES\n"+ - "#EXT-X-PART:DURATION=2.00000,URI=\"part1.mp4\"\n"+ - "#EXTINF:4.00000,\n"+ - "seg7.mp4\n"+ - "#EXT-X-PROGRAM-DATE-TIME:2010-01-01T01:01:06Z\n"+ - "#EXT-X-PART:DURATION=1.00000,URI=\"part3.mp4\",INDEPENDENT=YES\n"+ - "#EXTINF:1.00000,\n"+ - "seg8.mp4\n"+ - "#EXT-X-PRELOAD-HINT:TYPE=PART,URI=\"part4.mp4\"\n", string(byts)) - - _, h, err := doRequest(m, "part3.mp4", "", "", "") + re := regexp.MustCompile( + `^#EXTM3U\n` + + `#EXT-X-VERSION:9\n` + + `#EXT-X-TARGETDURATION:4\n` + + `#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK=5.00000,CAN-SKIP-UNTIL=24.00000\n` + + `#EXT-X-PART-INF:PART-TARGET=2.00000\n` + + `#EXT-X-MEDIA-SEQUENCE:2\n` + + `#EXT-X-MAP:URI="(.*?_init\.mp4)"\n` + + `#EXT-X-GAP\n` + + `#EXTINF:4.00000,\n` + + `gap.mp4\n` + + `#EXT-X-GAP\n` + + `#EXTINF:4.00000,\n` + + `gap.mp4\n` + + `#EXT-X-GAP\n` + + `#EXTINF:4.00000,\n` + + `gap.mp4\n` + + `#EXT-X-GAP\n` + + `#EXTINF:4.00000,\n` + + `gap.mp4\n` + + `#EXT-X-GAP\n` + + `#EXTINF:4.00000,\n` + + `gap.mp4\n` + + `#EXT-X-PROGRAM-DATE-TIME:2010-01-01T01:01:02Z\n` + + `#EXT-X-PART:DURATION=2.00000,URI="(.*?_part0\.mp4)",INDEPENDENT=YES\n` + + `#EXT-X-PART:DURATION=2.00000,URI="(.*?_part1\.mp4)"\n` + + `#EXTINF:4.00000,\n` + + `(.*?_seg7\.mp4)\n` + + `#EXT-X-PROGRAM-DATE-TIME:2010-01-01T01:01:06Z\n` + + `#EXT-X-PART:DURATION=1.00000,URI="(.*?_part3\.mp4)",INDEPENDENT=YES\n` + + `#EXTINF:1.00000,\n` + + `(.*?_seg8\.mp4)\n` + + `#EXT-X-PRELOAD-HINT:TYPE=PART,URI="(.*?_part4\.mp4)"\n$`) + require.Regexp(t, re, string(byts)) + ma := re.FindStringSubmatch(string(byts)) + + _, h, err := doRequest(m, ma[4], "", "", "") require.NoError(t, err) require.Equal(t, "video/mp4", h.Get("Content-Type")) require.Equal(t, "max-age=3600", h.Get("Cache-Control")) @@ -303,7 +305,7 @@ func TestMuxerVideoAudio(t *testing.T) { recv := make(chan struct{}) go func() { - _, _, err := doRequest(m, "part4.mp4", "", "", "") + _, _, err := doRequest(m, ma[5], "", "", "") require.NoError(t, err) close(recv) }() @@ -391,44 +393,43 @@ func TestMuxerVideoOnly(t *testing.T) { byts, _, err = doRequest(m, "stream.m3u8", "", "", "") require.NoError(t, err) - var ma []string + var re *regexp.Regexp if ca == "mpegts" { - re := regexp.MustCompile(`^#EXTM3U\n` + + re = regexp.MustCompile(`^#EXTM3U\n` + `#EXT-X-VERSION:3\n` + `#EXT-X-ALLOW-CACHE:NO\n` + `#EXT-X-TARGETDURATION:4\n` + `#EXT-X-MEDIA-SEQUENCE:0\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:4.00000,\n` + - `(seg0\.ts)\n` + + `(.*?_seg0\.ts)\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg1\.ts)\n$`) - ma = re.FindStringSubmatch(string(byts)) + `(.*?_seg1\.ts)\n$`) } else { - re := regexp.MustCompile(`^#EXTM3U\n` + + re = regexp.MustCompile(`^#EXTM3U\n` + `#EXT-X-VERSION:9\n` + `#EXT-X-TARGETDURATION:4\n` + `#EXT-X-MEDIA-SEQUENCE:0\n` + - `#EXT-X-MAP:URI="init.mp4"\n` + + `#EXT-X-MAP:URI="(.*?_init.mp4)"\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:4.00000,\n` + - `(seg0\.mp4)\n` + + `(.*?_seg0\.mp4)\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg1\.mp4)\n$`) - ma = re.FindStringSubmatch(string(byts)) + `(.*?_seg1\.mp4)\n$`) } - require.NotEqual(t, 0, len(ma)) + require.Regexp(t, re, string(byts)) + ma := re.FindStringSubmatch(string(byts)) if ca == "mpegts" { _, _, err := doRequest(m, ma[2], "", "", "") require.NoError(t, err) } else { - _, _, err := doRequest(m, "init.mp4", "", "", "") + _, _, err := doRequest(m, ma[1], "", "", "") require.NoError(t, err) - _, _, err = doRequest(m, ma[2], "", "", "") + _, _, err = doRequest(m, ma[3], "", "", "") require.NoError(t, err) } }) @@ -501,41 +502,40 @@ func TestMuxerAudioOnly(t *testing.T) { byts, _, err = doRequest(m, "stream.m3u8", "", "", "") require.NoError(t, err) - var ma []string + var re *regexp.Regexp if ca == "mpegts" { - re := regexp.MustCompile(`^#EXTM3U\n` + + re = regexp.MustCompile(`^#EXTM3U\n` + `#EXT-X-VERSION:3\n` + `#EXT-X-ALLOW-CACHE:NO\n` + `#EXT-X-TARGETDURATION:1\n` + `#EXT-X-MEDIA-SEQUENCE:0\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg0\.ts)\n$`) - ma = re.FindStringSubmatch(string(byts)) + `(.*?_seg0\.ts)\n$`) } else { - re := regexp.MustCompile(`^#EXTM3U\n` + + re = regexp.MustCompile(`^#EXTM3U\n` + `#EXT-X-VERSION:9\n` + `#EXT-X-TARGETDURATION:1\n` + `#EXT-X-MEDIA-SEQUENCE:0\n` + - `#EXT-X-MAP:URI="init.mp4"\n` + + `#EXT-X-MAP:URI="(.*?_init.mp4)"\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg0\.mp4)\n` + + `(.*?_seg0\.mp4)\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + - `(seg1\.mp4)\n$`) - ma = re.FindStringSubmatch(string(byts)) + `(.*?_seg1\.mp4)\n$`) } - require.NotEqual(t, 0, len(ma)) + require.Regexp(t, re, string(byts)) + ma := re.FindStringSubmatch(string(byts)) if ca == "mpegts" { _, _, err := doRequest(m, ma[2], "", "", "") require.NoError(t, err) } else { - _, _, err := doRequest(m, "init.mp4", "", "", "") + _, _, err := doRequest(m, ma[1], "", "", "") require.NoError(t, err) - _, _, err = doRequest(m, ma[2], "", "", "") + _, _, err = doRequest(m, ma[3], "", "", "") require.NoError(t, err) } }) @@ -622,9 +622,9 @@ func TestMuxerDoubleRead(t *testing.T) { `#EXT-X-MEDIA-SEQUENCE:0\n` + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:2.00000,\n` + - `(seg0\.ts)\n$`) + `(.*?_seg0\.ts)\n$`) + require.Regexp(t, re, string(byts)) ma := re.FindStringSubmatch(string(byts)) - require.NotEqual(t, 0, len(ma)) byts1, _, err := doRequest(m, ma[2], "", "", "") require.NoError(t, err) @@ -702,25 +702,64 @@ func TestMuxerSaveToDisk(t *testing.T) { }) require.NoError(t, err) - if ca != "mpegts" { - _, err = os.ReadFile(filepath.Join(dir, "init.mp4")) - require.NoError(t, err) - } + err = m.WriteH26x(testTime, 3*time.Second, [][]byte{ + {5}, // IDR + {2}, + }) + require.NoError(t, err) + + byts, _, err := doRequest(m, "stream.m3u8", "", "", "") + require.NoError(t, err) - var ext string + var re *regexp.Regexp if ca == "mpegts" { - ext = "ts" + re = regexp.MustCompile(`^#EXTM3U\n` + + `#EXT-X-VERSION:3\n` + + `#EXT-X-ALLOW-CACHE:NO\n` + + `#EXT-X-TARGETDURATION:2\n` + + `#EXT-X-MEDIA-SEQUENCE:0\n` + + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + + `#EXTINF:2.00000,\n` + + `(.*?_seg0\.ts)\n` + + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + + `#EXTINF:1.00000,\n` + + `(.*?_seg1\.ts)\n$`) } else { - ext = "mp4" + re = regexp.MustCompile(`^#EXTM3U\n` + + `#EXT-X-VERSION:9\n` + + `#EXT-X-TARGETDURATION:2\n` + + `#EXT-X-MEDIA-SEQUENCE:0\n` + + `#EXT-X-MAP:URI="(.*?_init.mp4)"\n` + + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + + `#EXTINF:2.00000,\n` + + `(.*?_seg0\.mp4)\n` + + `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + + `#EXTINF:1.00000,\n` + + `(.*?_seg1\.mp4)\n$`) } + require.Regexp(t, re, string(byts)) + ma := re.FindStringSubmatch(string(byts)) - _, err = os.ReadFile(filepath.Join(dir, "seg0."+ext)) - require.NoError(t, err) + if ca == "mpegts" { + _, err = os.ReadFile(filepath.Join(dir, ma[2])) + require.NoError(t, err) + + m.Close() + + _, err = os.ReadFile(filepath.Join(dir, ma[2])) + require.Error(t, err) + } else { + _, err = os.ReadFile(filepath.Join(dir, ma[1])) + require.NoError(t, err) + + _, err = os.ReadFile(filepath.Join(dir, ma[3])) + require.NoError(t, err) - m.Close() + m.Close() - _, err = os.ReadFile(filepath.Join(dir, "seg0."+ext)) - require.Error(t, err) + _, err = os.ReadFile(filepath.Join(dir, ma[3])) + require.Error(t, err) + } }) } }