Skip to content

Commit

Permalink
resuse the message queue for livestream downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
marcopiovanello committed Aug 23, 2024
1 parent fceb36c commit 54771b2
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 64 deletions.
60 changes: 16 additions & 44 deletions server/internal/livestream/livestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package livestream

import (
"bufio"
"context"
"errors"
"io"
"log/slog"
"os"
"os/exec"
"strconv"
"strings"
"time"

"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)

const (
Expand All @@ -28,34 +27,35 @@ type LiveStream struct {
url string
proc *os.Process // used to manually kill the yt-dlp process
status int // whether is monitoring or completed
log chan []byte // keeps tracks of the process logs while monitoring, not when started
done chan *LiveStream // where to signal the completition
waitTimeChan chan time.Duration // time to livestream start
waitTime time.Duration
liveDate time.Time

mq *internal.MessageQueue
db *internal.MemoryDB
}

func New(url string, log chan []byte, done chan *LiveStream) *LiveStream {
func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream {
return &LiveStream{
url: url,
done: done,
status: waiting,
waitTime: time.Second * 0,
log: log,
waitTimeChan: make(chan time.Duration),
mq: mq,
db: db,
}
}

// Start the livestream monitoring process, once completion signals on the done channel
func (l *LiveStream) Start() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cmd := exec.Command(
config.Instance().DownloaderPath,
l.url,
"--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs
"--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs
"--no-colors", // no ansi color fuzz
"--simulate",
"--newline",
"--paths", config.Instance().DownloadPath,
)
Expand All @@ -67,13 +67,6 @@ func (l *LiveStream) Start() error {
}
defer stdout.Close()

stderr, err := cmd.StderrPipe()
if err != nil {
l.status = errored
return err
}
defer stderr.Close()

if err := cmd.Start(); err != nil {
l.status = errored
return err
Expand All @@ -84,37 +77,29 @@ func (l *LiveStream) Start() error {

// Start monitoring when the livestream is goin to be live.
// If already live do nothing.
doneWaiting := make(chan struct{})
go l.monitorStartTime(stdout, doneWaiting)

//TODO: FFmpeg cannot be logged since is a subprocess of yt-dlp.
// It also may have implication on how the process is killed.
go func() {
<-doneWaiting
l.logFFMpeg(ctx, io.MultiReader(stdout, stderr))
}()
go l.monitorStartTime(stdout)

// Wait to the yt-dlp+ffmpeg process to finish.
// Wait to the simulated download process to finish.
cmd.Wait()

// Set the job as completed and notify the parent the completion.
l.status = completed
l.done <- l

// cleanup
close(doneWaiting)
// Send the started livestream to the message queue! :D
p := &internal.Process{Url: l.url, Livestream: true}
l.db.Set(p)
l.mq.Publish(p)

return nil
}

func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
func (l *LiveStream) monitorStartTime(r io.Reader) {
// yt-dlp shows the time in the stdout
scanner := bufio.NewScanner(r)

defer func() {
l.status = inProgress
doneWait <- struct{}{}

close(l.waitTimeChan)
}()

Expand Down Expand Up @@ -224,16 +209,3 @@ func parseTimeSpan(timeStr string) (time.Time, error) {

return start, nil
}

func (l *LiveStream) logFFMpeg(ctx context.Context, r io.Reader) {
scanner := bufio.NewScanner(r)

select {
case <-ctx.Done():
return
default:
for scanner.Scan() {
slog.Info("livestream output", slog.String("url", l.url), slog.String("stdout", scanner.Text()))
}
}
}
4 changes: 2 additions & 2 deletions server/internal/livestream/livestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)

func setupTest() {
Expand All @@ -15,9 +16,8 @@ func TestLivestream(t *testing.T) {
setupTest()

done := make(chan *LiveStream)
log := make(chan []byte)

ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done)
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
go ls.Start()

time.AfterFunc(time.Second*20, func() {
Expand Down
17 changes: 8 additions & 9 deletions server/internal/livestream/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,34 @@ import (
"time"

"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)

type Monitor struct {
db *internal.MemoryDB // where the just started livestream will be published
mq *internal.MessageQueue // where the just started livestream will be published
streams map[string]*LiveStream // keeps track of the livestreams
done chan *LiveStream // to signal individual processes completition
logs chan []byte // to signal individual processes completition
}

func NewMonitor() *Monitor {
func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
return &Monitor{
mq: mq,
db: db,
streams: make(map[string]*LiveStream),
done: make(chan *LiveStream),
}
}

// Detect each livestream completition, if done remove it from the monitor.
// Detect each livestream completition, if done detach it from the monitor.
func (m *Monitor) Schedule() {
for l := range m.done {
delete(m.streams, l.url)
}
}

func (m *Monitor) Add(url string) {
ls := New(url, m.logs, m.done)
ls := New(url, m.done, m.mq, m.db)

go ls.Start()
m.streams[url] = ls
Expand Down Expand Up @@ -111,8 +115,3 @@ func (m *Monitor) Restore() error {

return nil
}

// Return a fan-in logs channel
func (m *Monitor) Logs() <-chan []byte {
return m.logs
}
6 changes: 5 additions & 1 deletion server/internal/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func (m *MessageQueue) downloadConsumer() {
)

if p.Progress.Status != StatusCompleted {
p.Start()
if p.Livestream {
go p.Start() // livestreams have higher priorty and will ignore the queue
} else {
p.Start()
}
}

slog.Info("started process",
Expand Down
20 changes: 13 additions & 7 deletions server/internal/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ const (
StatusDownloading
StatusCompleted
StatusErrored
StatusLivestream
)

// Process descriptor
type Process struct {
Id string
Url string
Params []string
Info DownloadInfo
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
Id string
Url string
Livestream bool
Params []string
Info DownloadInfo
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
}

// Starts spawns/forks a new yt-dlp process and parse its stdout.
Expand Down Expand Up @@ -166,6 +168,10 @@ func (p *Process) Start() {
ETA: progress.Eta,
}

if p.Livestream {
p.Progress.Status = StatusLivestream
}

slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func RunBlocking(cfg *RunConfig) {
}

func newServer(c serverConfig) *http.Server {
lm := livestream.NewMonitor()
lm := livestream.NewMonitor(c.mq, c.mdb)
go lm.Schedule()
go lm.Restore()

Expand Down

0 comments on commit 54771b2

Please sign in to comment.