Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

next version #63

Merged
merged 4 commits into from
Jul 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ const (
clientMaxDTSRTCDiff = 10 * time.Second
)

// ClientOnDownloadPrimaryPlaylistFunc is the prototype of Client.OnDownloadPrimaryPlaylist.
type ClientOnDownloadPrimaryPlaylistFunc func(url string)

// ClientOnDownloadStreamPlaylistFunc is the prototype of Client.OnDownloadStreamPlaylist.
type ClientOnDownloadStreamPlaylistFunc func(url string)

// ClientOnDownloadSegmentFunc is the prototype of Client.OnDownloadSegment.
type ClientOnDownloadSegmentFunc func(url string)

// ClientOnDecodeErrorFunc is the prototype of Client.OnDecodeError.
type ClientOnDecodeErrorFunc func(err error)

// ClientOnTracksFunc is the prototype of the function passed to OnTracks().
type ClientOnTracksFunc func([]*Track) error

Expand All @@ -44,16 +56,25 @@ func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
// Client is a HLS client.
type Client struct {
//
// Parameters (all optional except URI)
// parameters (all optional except URI)
//
// URI of the playlist.
URI string
// HTTP client.
// It defaults to http.DefaultClient.
HTTPClient *http.Client
// function that receives log messages.
// It defaults to log.Printf.
Log LogFunc

//
// callbacks (all optional)
//
// called before downloading a primary playlist.
OnDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc
// called before downloading a stream playlist.
OnDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
// called before downloading a segment.
OnDownloadSegment ClientOnDownloadSegmentFunc
// called when a non-fatal decode error occurs.
OnDecodeError ClientOnDecodeErrorFunc

//
// private
Expand All @@ -74,8 +95,17 @@ func (c *Client) Start() error {
if c.HTTPClient == nil {
c.HTTPClient = http.DefaultClient
}
if c.Log == nil {
c.Log = defaultLog
if c.OnDownloadPrimaryPlaylist == nil {
c.OnDownloadPrimaryPlaylist = func(_ string) {}
}
if c.OnDownloadStreamPlaylist == nil {
c.OnDownloadStreamPlaylist = func(_ string) {}
}
if c.OnDownloadSegment == nil {
c.OnDownloadSegment = func(_ string) {}
}
if c.OnDecodeError == nil {
c.OnDecodeError = func(_ error) {}
}

var err error
Expand Down Expand Up @@ -134,7 +164,10 @@ func (c *Client) runInner() error {
dl := newClientDownloaderPrimary(
c.playlistURL,
c.HTTPClient,
c.Log,
c.OnDownloadPrimaryPlaylist,
c.OnDownloadStreamPlaylist,
c.OnDownloadSegment,
c.OnDecodeError,
rp,
c.onTracks,
c.onData,
Expand Down
56 changes: 35 additions & 21 deletions client_downloader_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,15 @@
type clientTimeSync interface{}

type clientDownloaderPrimary struct {
primaryPlaylistURL *url.URL
httpClient *http.Client
log LogFunc
onTracks ClientOnTracksFunc
onData map[*Track]interface{}
rp *clientRoutinePool
primaryPlaylistURL *url.URL
httpClient *http.Client
onDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
onDownloadSegment ClientOnDownloadSegmentFunc
onDecodeError ClientOnDecodeErrorFunc
onTracks ClientOnTracksFunc
onData map[*Track]interface{}
rp *clientRoutinePool

leadingTimeSync clientTimeSync

Expand All @@ -119,26 +122,32 @@
func newClientDownloaderPrimary(
primaryPlaylistURL *url.URL,
httpClient *http.Client,
log LogFunc,
onDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc,
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc,
onDownloadSegment ClientOnDownloadSegmentFunc,
onDecodeError ClientOnDecodeErrorFunc,
rp *clientRoutinePool,
onTracks ClientOnTracksFunc,
onData map[*Track]interface{},
) *clientDownloaderPrimary {
return &clientDownloaderPrimary{
primaryPlaylistURL: primaryPlaylistURL,
httpClient: httpClient,
log: log,
onTracks: onTracks,
onData: onData,
rp: rp,
streamTracks: make(chan []*Track),
startStreaming: make(chan struct{}),
leadingTimeSyncReady: make(chan struct{}),
primaryPlaylistURL: primaryPlaylistURL,
httpClient: httpClient,
onDownloadPrimaryPlaylist: onDownloadPrimaryPlaylist,
onDownloadStreamPlaylist: onDownloadStreamPlaylist,
onDownloadSegment: onDownloadSegment,
onDecodeError: onDecodeError,
onTracks: onTracks,
onData: onData,
rp: rp,
streamTracks: make(chan []*Track),
startStreaming: make(chan struct{}),
leadingTimeSyncReady: make(chan struct{}),
}
}

func (d *clientDownloaderPrimary) run(ctx context.Context) error {
d.log(LogLevelDebug, "downloading primary playlist %s", d.primaryPlaylistURL)
d.onDownloadPrimaryPlaylist(d.primaryPlaylistURL.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.primaryPlaylistURL)
if err != nil {
Expand All @@ -149,13 +158,14 @@

switch plt := pl.(type) {
case *playlist.Media:
d.log(LogLevelDebug, "primary playlist is a stream playlist")
ds := newClientDownloaderStream(
true,
d.httpClient,
d.onDownloadStreamPlaylist,
d.onDownloadSegment,
d.onDecodeError,
d.primaryPlaylistURL,
plt,
d.log,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
Expand All @@ -179,9 +189,11 @@
ds := newClientDownloaderStream(
true,
d.httpClient,
d.onDownloadStreamPlaylist,
d.onDownloadSegment,
d.onDecodeError,

Check warning on line 194 in client_downloader_primary.go

View check run for this annotation

Codecov / codecov/patch

client_downloader_primary.go#L192-L194

Added lines #L192 - L194 were not covered by tests
u,
nil,
d.log,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
Expand All @@ -205,9 +217,11 @@
ds := newClientDownloaderStream(
false,
d.httpClient,
d.onDownloadStreamPlaylist,
d.onDownloadSegment,
d.onDecodeError,

Check warning on line 222 in client_downloader_primary.go

View check run for this annotation

Codecov / codecov/patch

client_downloader_primary.go#L220-L222

Added lines #L220 - L222 were not covered by tests
u,
nil,
d.log,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
Expand Down
58 changes: 31 additions & 27 deletions client_downloader_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,49 @@ func findSegmentWithID(seqNo int, segments []*playlist.MediaSegment, id int) (*p
}

type clientDownloaderStream struct {
isLeading bool
httpClient *http.Client
playlistURL *url.URL
initialPlaylist *playlist.Media
log LogFunc
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
isLeading bool
httpClient *http.Client
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
onDownloadSegment ClientOnDownloadSegmentFunc
onDecodeError ClientOnDecodeErrorFunc
playlistURL *url.URL
initialPlaylist *playlist.Media
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}

curSegmentID *int
}

func newClientDownloaderStream(
isLeading bool,
httpClient *http.Client,
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc,
onDownloadSegment ClientOnDownloadSegmentFunc,
onDecodeError ClientOnDecodeErrorFunc,
playlistURL *url.URL,
initialPlaylist *playlist.Media,
log LogFunc,
rp *clientRoutinePool,
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onData map[*Track]interface{},
) *clientDownloaderStream {
return &clientDownloaderStream{
isLeading: isLeading,
httpClient: httpClient,
playlistURL: playlistURL,
initialPlaylist: initialPlaylist,
log: log,
rp: rp,
onStreamTracks: onStreamTracks,
onSetLeadingTimeSync: onSetLeadingTimeSync,
onGetLeadingTimeSync: onGetLeadingTimeSync,
onData: onData,
isLeading: isLeading,
httpClient: httpClient,
onDownloadStreamPlaylist: onDownloadStreamPlaylist,
onDownloadSegment: onDownloadSegment,
onDecodeError: onDecodeError,
playlistURL: playlistURL,
initialPlaylist: initialPlaylist,
rp: rp,
onStreamTracks: onStreamTracks,
onSetLeadingTimeSync: onSetLeadingTimeSync,
onGetLeadingTimeSync: onGetLeadingTimeSync,
onData: onData,
}
}

Expand Down Expand Up @@ -98,7 +104,6 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
d.isLeading,
byts,
segmentQueue,
d.log,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
Expand All @@ -112,9 +117,9 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
d.rp.add(proc)
} else {
proc := newClientProcessorMPEGTS(
d.onDecodeError,
d.isLeading,
segmentQueue,
d.log,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
Expand Down Expand Up @@ -148,7 +153,7 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
}

func (d *clientDownloaderStream) downloadPlaylist(ctx context.Context) (*playlist.Media, error) {
d.log(LogLevelDebug, "downloading stream playlist %s", d.playlistURL.String())
d.onDownloadStreamPlaylist(d.playlistURL.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.playlistURL)
if err != nil {
Expand All @@ -174,7 +179,8 @@ func (d *clientDownloaderStream) downloadSegment(
return nil, err
}

d.log(LogLevelDebug, "downloading segment %s", u)
d.onDownloadSegment(u.String())

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -233,8 +239,6 @@ func (d *clientDownloaderStream) fillSegmentQueue(
return fmt.Errorf("next segment not found or not ready yet")
}

d.log(LogLevelDebug, "distance of next segment from end of playlist: %d", invPos)

if !pl.Endlist && invPos > clientLiveMaxDistanceFromEnd {
return fmt.Errorf("playback is too late")
}
Expand Down
3 changes: 0 additions & 3 deletions client_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func fmp4PickLeadingTrack(init *fmp4.Init) int {
type clientProcessorFMP4 struct {
isLeading bool
segmentQueue *clientSegmentQueue
log LogFunc
rp *clientRoutinePool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
Expand All @@ -47,7 +46,6 @@ func newClientProcessorFMP4(
isLeading bool,
initFile []byte,
segmentQueue *clientSegmentQueue,
log LogFunc,
rp *clientRoutinePool,
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
Expand All @@ -57,7 +55,6 @@ func newClientProcessorFMP4(
p := &clientProcessorFMP4{
isLeading: isLeading,
segmentQueue: segmentQueue,
log: log,
rp: rp,
onSetLeadingTimeSync: onSetLeadingTimeSync,
onGetLeadingTimeSync: onGetLeadingTimeSync,
Expand Down
14 changes: 7 additions & 7 deletions client_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/asticode/go-astits"
Expand Down Expand Up @@ -57,9 +56,9 @@
}

type clientProcessorMPEGTS struct {
onDecodeError ClientOnDecodeErrorFunc
isLeading bool
segmentQueue *clientSegmentQueue
log LogFunc
rp *clientRoutinePool
onStreamTracks func(context.Context, []*Track) bool
onSetLeadingTimeSync func(clientTimeSync)
Expand All @@ -74,19 +73,19 @@
}

func newClientProcessorMPEGTS(
onDecodeError ClientOnDecodeErrorFunc,
isLeading bool,
segmentQueue *clientSegmentQueue,
log LogFunc,
rp *clientRoutinePool,
onStreamTracks func(context.Context, []*Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onData map[*Track]interface{},
) *clientProcessorMPEGTS {
return &clientProcessorMPEGTS{
onDecodeError: onDecodeError,
isLeading: isLeading,
segmentQueue: segmentQueue,
log: log,
rp: rp,
onStreamTracks: onStreamTracks,
onSetLeadingTimeSync: onSetLeadingTimeSync,
Expand Down Expand Up @@ -119,6 +118,10 @@
return err
}

p.reader.OnDecodeError(func(err error) {
p.onDecodeError(err)
})

Check warning on line 123 in client_processor_mpegts.go

View check run for this annotation

Codecov / codecov/patch

client_processor_mpegts.go#L122-L123

Added lines #L122 - L123 were not covered by tests

for _, track := range p.reader.Tracks() {
switch track.Codec.(type) {
case *mpegts.CodecH264, *mpegts.CodecMPEG4Audio:
Expand Down Expand Up @@ -220,9 +223,6 @@
if err == astits.ErrNoMorePackets {
return nil
}
if strings.HasPrefix(err.Error(), "astits: parsing PES data failed") {
continue
}
return err
}
}
Expand Down
Loading
Loading