Skip to content

Commit

Permalink
improved livestream waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
marcopeocchi committed Aug 20, 2024
1 parent 92e3fd9 commit 3205711
Showing 1 changed file with 70 additions and 27 deletions.
97 changes: 70 additions & 27 deletions server/internal/livestream/livestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"errors"
"io"
"log/slog"
"os"
"os/exec"
"strconv"
Expand Down Expand Up @@ -53,6 +54,7 @@ func (l *LiveStream) Start() error {
l.url,
"--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs
"--no-colors", // no ansi color fuzz
"--newline",
"--paths", config.Instance().DownloadPath,
)
l.proc = cmd.Process
Expand All @@ -72,7 +74,13 @@ func (l *LiveStream) Start() error {

// Start monitoring when the livestream is goin to be live.
// If already live do nothing.
go l.monitorStartTime(stdout)
doneWaiting := make(chan struct{})
go l.monitorStartTime(stdout, doneWaiting)

go func() {
<-doneWaiting
l.logFFMpeg(stdout)
}()

// Wait to the yt-dlp+ffmpeg process to finish.
cmd.Wait()
Expand All @@ -81,14 +89,18 @@ func (l *LiveStream) Start() error {
l.status = completed
l.done <- l

// cleanup
close(doneWaiting)

return nil
}

func (l *LiveStream) monitorStartTime(r io.Reader) error {
func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
// yt-dlp shows the time in the stdout
scanner := bufio.NewScanner(r)

defer func() {
doneWait <- struct{}{}
close(l.waitTimeChan)
close(l.errors)
}()
Expand All @@ -97,7 +109,7 @@ func (l *LiveStream) monitorStartTime(r io.Reader) error {
// use a custom split funciton to set the line separator to \r instead of \r\n or \n
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
for i := 0; i < len(data); i++ {
if data[i] == '\r' {
if data[i] == '\r' || data[i] == '\n' {
return i + 1, data[:i], nil
}
}
Expand All @@ -108,37 +120,60 @@ func (l *LiveStream) monitorStartTime(r io.Reader) error {
return 0, data, bufio.ErrFinalToken
})

// start scanning the stdout
for scanner.Scan() {
// l.log <- scanner.Bytes()
waitTimeScanner := func() {
for scanner.Scan() {
// l.log <- scanner.Bytes()

parts := strings.Split(scanner.Text(), ": ")
if len(parts) < 2 {
continue
}
// if this substring is in the current line the download is starting,
// no need to monitor the time to live.
//TODO: silly
if !strings.Contains(scanner.Text(), "Remaining time until next attempt") {
l.status = inProgress
return
}

// if this substring is in the current line the download is starting,
// no need to monitor the time to live.
//TODO: silly
if !strings.Contains(scanner.Text(), "Remaining time until next attempt") {
l.status = inProgress
return nil
}
parts := strings.Split(scanner.Text(), ": ")
if len(parts) < 2 {
continue
}

startsIn := parts[1]
parsed, err := parseTimeSpan(startsIn)
if err != nil {
continue
}
startsIn := parts[1]
parsed, err := parseTimeSpan(startsIn)
if err != nil {
continue
}

l.liveDate = parsed
l.liveDate = parsed

//TODO: check if using channels is stupid or not
// l.waitTimeChan <- time.Until(start)
l.waitTime = time.Until(parsed)
//TODO: check if using channels is stupid or not
// l.waitTimeChan <- time.Until(start)
l.waitTime = time.Until(parsed)
}
}

return nil
const TRIES = 5

/*
if it's waiting a livestream the 5th line will indicate the time to live
its a dumb and not robust method.
example:
[youtube] Extracting URL: https://www.youtube.com/watch?v=IQVbGfVVjgY
[youtube] IQVbGfVVjgY: Downloading webpage
[youtube] IQVbGfVVjgY: Downloading ios player API JSON
[youtube] IQVbGfVVjgY: Downloading web creator player API JSON
WARNING: [youtube] This live event will begin in 27 minutes. <- STDERR, ignore
[wait] Waiting for 00:27:15 - Press Ctrl+C to try now <- 5th line
*/

for range TRIES {
scanner.Scan()
line := scanner.Text()

if strings.Contains(line, "Waiting for") {
waitTimeScanner()
}
}
}

func (l *LiveStream) WaitTime() <-chan time.Duration {
Expand Down Expand Up @@ -191,3 +226,11 @@ func parseTimeSpan(timeStr string) (time.Time, error) {

return start, nil
}

func (l *LiveStream) logFFMpeg(r io.Reader) {
scanner := bufio.NewScanner(r)

for scanner.Scan() {
slog.Info("livestream ffmpeg output", slog.String("url", l.url), slog.String("stdout", scanner.Text()))
}
}

0 comments on commit 3205711

Please sign in to comment.