Skip to content

Commit

Permalink
fix(trace): pass context to concat
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkness4 committed Jul 13, 2024
1 parent bca3c70 commit a2c0c79
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 21 deletions.
5 changes: 3 additions & 2 deletions cmd/concat/concat_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var Command = &cli.Command{
},
},
Action: func(cCtx *cli.Context) error {
ctx := cCtx.Context
files := cCtx.Args().Slice()
if len(files) == 0 {
log.Error().Msg("arg[0] is empty")
Expand All @@ -59,7 +60,7 @@ var Command = &cli.Command{
Str("output", fnameMuxed).
Strs("input", files).
Msg("concat and remuxing streams...")
if err := concat.Do(fnameMuxed, files); err != nil {
if err := concat.Do(ctx, fnameMuxed, files); err != nil {
log.Error().
Str("output", fnameMuxed).
Strs("input", files).
Expand All @@ -68,7 +69,7 @@ var Command = &cli.Command{
}
if extractAudio {
log.Error().Str("output", fnameAudio).Strs("input", files).Msg("extrating audio...")
if err := concat.Do(fnameAudio, files, concat.WithAudioOnly()); err != nil {
if err := concat.Do(ctx, fnameAudio, files, concat.WithAudioOnly()); err != nil {
log.Error().
Str("output", fnameAudio).
Strs("input", files).
Expand Down
5 changes: 3 additions & 2 deletions cmd/remux/remux_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var Command = &cli.Command{
},
},
Action: func(cCtx *cli.Context) error {
ctx := cCtx.Context
file := cCtx.Args().Get(0)
if file == "" {
log.Error().Msg("arg[0] is empty")
Expand All @@ -54,7 +55,7 @@ var Command = &cli.Command{
fnameAudio := prepareFile(file, "m4a")

log.Info().Str("output", fnameMuxed).Str("input", file).Msg("remuxing stream...")
if err := remux.Do(fnameMuxed, file); err != nil {
if err := remux.Do(ctx, fnameMuxed, file); err != nil {
log.Error().
Str("output", fnameMuxed).
Str("input", file).
Expand All @@ -63,7 +64,7 @@ var Command = &cli.Command{
}
if extractAudio {
log.Error().Str("output", fnameAudio).Str("input", file).Msg("extrating audio...")
if err := remux.Do(fnameAudio, file, remux.WithAudioOnly()); err != nil {
if err := remux.Do(ctx, fnameAudio, file, remux.WithAudioOnly()); err != nil {
log.Error().
Str("output", fnameAudio).
Str("input", file).
Expand Down
8 changes: 4 additions & 4 deletions fc2/fc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
f.log.Info().Str("output", fnameMuxed).Str("input", fnameStream).Msg(
"remuxing stream...",
)
remuxErr = remux.Do(fnameMuxed, fnameStream)
remuxErr = remux.Do(ctx, fnameMuxed, fnameStream)
if remuxErr != nil {
f.log.Error().Err(remuxErr).Msg("ffmpeg remux finished with error")
}
Expand All @@ -284,7 +284,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
f.log.Info().Str("output", fnameAudio).Str("input", fnameStream).Msg(
"extrating audio...",
)
extractAudioErr = remux.Do(fnameAudio, fnameStream, remux.WithAudioOnly())
extractAudioErr = remux.Do(ctx, fnameAudio, fnameStream, remux.WithAudioOnly())
if extractAudioErr != nil {
f.log.Error().Err(extractAudioErr).Msg("ffmpeg audio extract finished with error")
}
Expand All @@ -301,7 +301,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
if f.params.Remux {
concatOpts = append(concatOpts, concat.IgnoreSingle())
}
if concatErr := concat.WithPrefix(f.params.RemuxFormat, nameConcatenatedPrefix, concatOpts...); concatErr != nil {
if concatErr := concat.WithPrefix(ctx, f.params.RemuxFormat, nameConcatenatedPrefix, concatOpts...); concatErr != nil {
f.log.Error().Err(concatErr).Msg("ffmpeg concat finished with error")
}

Expand All @@ -313,7 +313,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
"concatenating audio stream...",
)
concatOpts = append(concatOpts, concat.WithAudioOnly())
if concatErr := concat.WithPrefix("m4a", nameAudioConcatenatedPrefix, concatOpts...); concatErr != nil {
if concatErr := concat.WithPrefix(ctx, "m4a", nameAudioConcatenatedPrefix, concatOpts...); concatErr != nil {
f.log.Error().Err(concatErr).Msg("ffmpeg concat finished with error")
}
}
Expand Down
8 changes: 4 additions & 4 deletions video/concat/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func applyOptions(opts []Option) *Options {
}

// Do concat multiple video streams.
func Do(output string, inputs []string, opts ...Option) error {
ctx, span := otel.Tracer(tracerName).Start(context.Background(), "concat.Do")
func Do(ctx context.Context, output string, inputs []string, opts ...Option) error {
ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.Do")
defer span.End()

o := applyOptions(opts)
Expand Down Expand Up @@ -221,7 +221,7 @@ func filterFiles(
// WithPrefix Concat multiple videos with a prefix.
//
// Prefix can be a path.
func WithPrefix(remuxFormat string, prefix string, opts ...Option) error {
func WithPrefix(ctx context.Context, remuxFormat string, prefix string, opts ...Option) error {
o := applyOptions(opts)
path := filepath.Dir(prefix)
base := filepath.Base(prefix)
Expand All @@ -239,7 +239,7 @@ func WithPrefix(remuxFormat string, prefix string, opts ...Option) error {
return err
}

return Do(prefix+".combined."+remuxFormat, selected, opts...)
return Do(ctx, prefix+".combined."+remuxFormat, selected, opts...)
}

func areFormatMixed(files []string) bool {
Expand Down
11 changes: 9 additions & 2 deletions video/concat/concat_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package concat_test

import (
"context"
_ "net/http/pprof"
"testing"

Expand All @@ -11,11 +12,17 @@ import (
)

func TestDo(t *testing.T) {
err := concat.Do("output.mp4", []string{"input.ts", "input.mp4"})
err := concat.Do(context.Background(), "output.mp4", []string{"input.ts", "input.mp4"})
require.NoError(t, err)
}

func TestWithPrefix(t *testing.T) {
err := concat.WithPrefix("m4a", "input", concat.IgnoreExtension(), concat.WithAudioOnly())
err := concat.WithPrefix(
context.Background(),
"m4a",
"input",
concat.IgnoreExtension(),
concat.WithAudioOnly(),
)
require.NoError(t, err)
}
3 changes: 2 additions & 1 deletion video/concat/concat_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package concat

import (
"context"
"testing"

"github.com/Darkness4/fc2-live-dl-go/video/probe"
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestFilterFiles(t *testing.T) {
}

func TestDo(t *testing.T) {
err := Do("output.mp4", []string{"input.mp4"})
err := Do(context.Background(), "output.mp4", []string{"input.mp4"})
require.NoError(t, err)

err = probe.Do([]string{"output.mp4"}, probe.WithQuiet())
Expand Down
6 changes: 5 additions & 1 deletion video/concat/remux_intermediate_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/Darkness4/fc2-live-dl-go/utils"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
)

// remuxMixedTS remuxes mixed TS/AAC files into intermediate format.
Expand All @@ -19,6 +20,9 @@ func remuxMixedTS(
files []string,
opts ...Option,
) (intermediates []string, useFIFO bool, err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.remuxMixedTS")
defer span.End()

intermediates = make([]string, 0, len(files))

useFIFO = false
Expand Down Expand Up @@ -59,7 +63,7 @@ func remuxMixedTS(
doneCh <- struct{}{}
}()
// Will IO block due to the FIFO
if err := Do(intermediateName, []string{files[i]}, opts...); err != nil {
if err := Do(ctx, intermediateName, []string{files[i]}, opts...); err != nil {
log.Error().
Err(err).
Str("file", files[i]).
Expand Down
8 changes: 6 additions & 2 deletions video/concat/remux_intermediate_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import (

"github.com/Darkness4/fc2-live-dl-go/utils"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
)

// remuxMixedTS remuxes mixed TS/AAC files into intermediate format.
func remuxMixedTS(
_ context.Context, // ctx is not used as operations are IO bound and has finality.
ctx context.Context, // ctx is not used as operations are IO bound and has finality.
files []string,
opts ...Option,
) (intermediates []string, useFIFO bool, err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.Do")
defer span.End()

intermediates = make([]string, 0, len(files))

var wg sync.WaitGroup
Expand All @@ -34,7 +38,7 @@ func remuxMixedTS(
go func() {
defer wg.Done()
// Will IO block due to the FIFO
if err := Do(intermediateName, []string{file}, opts...); err != nil {
if err := Do(ctx, intermediateName, []string{file}, opts...); err != nil {
log.Error().
Err(err).
Str("file", file).
Expand Down
10 changes: 7 additions & 3 deletions video/remux/remux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Package remux provides functions for remuxing videos.
package remux

import "github.com/Darkness4/fc2-live-dl-go/video/concat"
import (
"context"

"github.com/Darkness4/fc2-live-dl-go/video/concat"
)

// Option is the option for remux.
type Option concat.Option
Expand All @@ -12,11 +16,11 @@ func WithAudioOnly() Option {
}

// Do remuxes the input file to the output file.
func Do(output string, input string, opts ...Option) error {
func Do(ctx context.Context, output string, input string, opts ...Option) error {
o := make([]concat.Option, 0, len(opts))
for _, opt := range opts {
o = append(o, concat.Option(opt))
}

return concat.Do(output, []string{input}, o...)
return concat.Do(ctx, output, []string{input}, o...)
}

0 comments on commit a2c0c79

Please sign in to comment.