From 54771b2d78c9727e848dfa8526928f7134a4216d Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Fri, 23 Aug 2024 18:52:13 +0200 Subject: [PATCH] resuse the message queue for livestream downloading --- server/internal/livestream/livestream.go | 60 +++++-------------- server/internal/livestream/livestream_test.go | 4 +- server/internal/livestream/monitor.go | 17 +++--- server/internal/message_queue.go | 6 +- server/internal/process.go | 20 ++++--- server/server.go | 2 +- 6 files changed, 45 insertions(+), 64 deletions(-) diff --git a/server/internal/livestream/livestream.go b/server/internal/livestream/livestream.go index 48c311f..98ec8aa 100644 --- a/server/internal/livestream/livestream.go +++ b/server/internal/livestream/livestream.go @@ -2,10 +2,8 @@ package livestream import ( "bufio" - "context" "errors" "io" - "log/slog" "os" "os/exec" "strconv" @@ -13,6 +11,7 @@ import ( "time" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" + "github.com/marcopeocchi/yt-dlp-web-ui/server/internal" ) const ( @@ -28,34 +27,35 @@ type LiveStream struct { url string proc *os.Process // used to manually kill the yt-dlp process status int // whether is monitoring or completed - log chan []byte // keeps tracks of the process logs while monitoring, not when started done chan *LiveStream // where to signal the completition waitTimeChan chan time.Duration // time to livestream start waitTime time.Duration liveDate time.Time + + mq *internal.MessageQueue + db *internal.MemoryDB } -func New(url string, log chan []byte, done chan *LiveStream) *LiveStream { +func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream { return &LiveStream{ url: url, done: done, status: waiting, waitTime: time.Second * 0, - log: log, waitTimeChan: make(chan time.Duration), + mq: mq, + db: db, } } // Start the livestream monitoring process, once completion signals on the done channel func (l *LiveStream) Start() error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cmd := exec.Command( config.Instance().DownloaderPath, l.url, - "--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs + "--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs "--no-colors", // no ansi color fuzz + "--simulate", "--newline", "--paths", config.Instance().DownloadPath, ) @@ -67,13 +67,6 @@ func (l *LiveStream) Start() error { } defer stdout.Close() - stderr, err := cmd.StderrPipe() - if err != nil { - l.status = errored - return err - } - defer stderr.Close() - if err := cmd.Start(); err != nil { l.status = errored return err @@ -84,37 +77,29 @@ func (l *LiveStream) Start() error { // Start monitoring when the livestream is goin to be live. // If already live do nothing. - doneWaiting := make(chan struct{}) - go l.monitorStartTime(stdout, doneWaiting) - - //TODO: FFmpeg cannot be logged since is a subprocess of yt-dlp. - // It also may have implication on how the process is killed. - go func() { - <-doneWaiting - l.logFFMpeg(ctx, io.MultiReader(stdout, stderr)) - }() + go l.monitorStartTime(stdout) - // Wait to the yt-dlp+ffmpeg process to finish. + // Wait to the simulated download process to finish. cmd.Wait() // Set the job as completed and notify the parent the completion. l.status = completed l.done <- l - // cleanup - close(doneWaiting) + // Send the started livestream to the message queue! :D + p := &internal.Process{Url: l.url, Livestream: true} + l.db.Set(p) + l.mq.Publish(p) return nil } -func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) { +func (l *LiveStream) monitorStartTime(r io.Reader) { // yt-dlp shows the time in the stdout scanner := bufio.NewScanner(r) defer func() { l.status = inProgress - doneWait <- struct{}{} - close(l.waitTimeChan) }() @@ -224,16 +209,3 @@ func parseTimeSpan(timeStr string) (time.Time, error) { return start, nil } - -func (l *LiveStream) logFFMpeg(ctx context.Context, r io.Reader) { - scanner := bufio.NewScanner(r) - - select { - case <-ctx.Done(): - return - default: - for scanner.Scan() { - slog.Info("livestream output", slog.String("url", l.url), slog.String("stdout", scanner.Text())) - } - } -} diff --git a/server/internal/livestream/livestream_test.go b/server/internal/livestream/livestream_test.go index d374b68..a9f7666 100644 --- a/server/internal/livestream/livestream_test.go +++ b/server/internal/livestream/livestream_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" + "github.com/marcopeocchi/yt-dlp-web-ui/server/internal" ) func setupTest() { @@ -15,9 +16,8 @@ func TestLivestream(t *testing.T) { setupTest() done := make(chan *LiveStream) - log := make(chan []byte) - ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done) + ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{}) go ls.Start() time.AfterFunc(time.Second*20, func() { diff --git a/server/internal/livestream/monitor.go b/server/internal/livestream/monitor.go index 087fb1a..e995f1a 100644 --- a/server/internal/livestream/monitor.go +++ b/server/internal/livestream/monitor.go @@ -8,22 +8,26 @@ import ( "time" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" + "github.com/marcopeocchi/yt-dlp-web-ui/server/internal" ) type Monitor struct { + db *internal.MemoryDB // where the just started livestream will be published + mq *internal.MessageQueue // where the just started livestream will be published streams map[string]*LiveStream // keeps track of the livestreams done chan *LiveStream // to signal individual processes completition - logs chan []byte // to signal individual processes completition } -func NewMonitor() *Monitor { +func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor { return &Monitor{ + mq: mq, + db: db, streams: make(map[string]*LiveStream), done: make(chan *LiveStream), } } -// Detect each livestream completition, if done remove it from the monitor. +// Detect each livestream completition, if done detach it from the monitor. func (m *Monitor) Schedule() { for l := range m.done { delete(m.streams, l.url) @@ -31,7 +35,7 @@ func (m *Monitor) Schedule() { } func (m *Monitor) Add(url string) { - ls := New(url, m.logs, m.done) + ls := New(url, m.done, m.mq, m.db) go ls.Start() m.streams[url] = ls @@ -111,8 +115,3 @@ func (m *Monitor) Restore() error { return nil } - -// Return a fan-in logs channel -func (m *Monitor) Logs() <-chan []byte { - return m.logs -} diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index bc4f06e..9267f0f 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -63,7 +63,11 @@ func (m *MessageQueue) downloadConsumer() { ) if p.Progress.Status != StatusCompleted { - p.Start() + if p.Livestream { + go p.Start() // livestreams have higher priorty and will ignore the queue + } else { + p.Start() + } } slog.Info("started process", diff --git a/server/internal/process.go b/server/internal/process.go index f268e66..fdd71f2 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -36,17 +36,19 @@ const ( StatusDownloading StatusCompleted StatusErrored + StatusLivestream ) // Process descriptor type Process struct { - Id string - Url string - Params []string - Info DownloadInfo - Progress DownloadProgress - Output DownloadOutput - proc *os.Process + Id string + Url string + Livestream bool + Params []string + Info DownloadInfo + Progress DownloadProgress + Output DownloadOutput + proc *os.Process } // Starts spawns/forks a new yt-dlp process and parse its stdout. @@ -166,6 +168,10 @@ func (p *Process) Start() { ETA: progress.Eta, } + if p.Livestream { + p.Progress.Status = StatusLivestream + } + slog.Info("progress", slog.String("id", p.getShortId()), slog.String("url", p.Url), diff --git a/server/server.go b/server/server.go index 9b8ebc6..7b38680 100644 --- a/server/server.go +++ b/server/server.go @@ -140,7 +140,7 @@ func RunBlocking(cfg *RunConfig) { } func newServer(c serverConfig) *http.Server { - lm := livestream.NewMonitor() + lm := livestream.NewMonitor(c.mq, c.mdb) go lm.Schedule() go lm.Restore()