From a2c0c7943b2e0570269da43ba1cecf91fd937510 Mon Sep 17 00:00:00 2001 From: Nguyen Marc Date: Sat, 13 Jul 2024 21:37:27 +0200 Subject: [PATCH] fix(trace): pass context to concat --- cmd/concat/concat_command.go | 5 +++-- cmd/remux/remux_command.go | 5 +++-- fc2/fc2.go | 8 ++++---- video/concat/concat.go | 8 ++++---- video/concat/concat_integration_test.go | 11 +++++++++-- video/concat/concat_test.go | 3 ++- video/concat/remux_intermediate_unix.go | 6 +++++- video/concat/remux_intermediate_windows.go | 8 ++++++-- video/remux/remux.go | 10 +++++++--- 9 files changed, 43 insertions(+), 21 deletions(-) diff --git a/cmd/concat/concat_command.go b/cmd/concat/concat_command.go index 9fccdb9..b857243 100644 --- a/cmd/concat/concat_command.go +++ b/cmd/concat/concat_command.go @@ -40,6 +40,7 @@ var Command = &cli.Command{ }, }, Action: func(cCtx *cli.Context) error { + ctx := cCtx.Context files := cCtx.Args().Slice() if len(files) == 0 { log.Error().Msg("arg[0] is empty") @@ -59,7 +60,7 @@ var Command = &cli.Command{ Str("output", fnameMuxed). Strs("input", files). Msg("concat and remuxing streams...") - if err := concat.Do(fnameMuxed, files); err != nil { + if err := concat.Do(ctx, fnameMuxed, files); err != nil { log.Error(). Str("output", fnameMuxed). Strs("input", files). @@ -68,7 +69,7 @@ var Command = &cli.Command{ } if extractAudio { log.Error().Str("output", fnameAudio).Strs("input", files).Msg("extrating audio...") - if err := concat.Do(fnameAudio, files, concat.WithAudioOnly()); err != nil { + if err := concat.Do(ctx, fnameAudio, files, concat.WithAudioOnly()); err != nil { log.Error(). Str("output", fnameAudio). Strs("input", files). diff --git a/cmd/remux/remux_command.go b/cmd/remux/remux_command.go index b243ced..79ceb82 100644 --- a/cmd/remux/remux_command.go +++ b/cmd/remux/remux_command.go @@ -40,6 +40,7 @@ var Command = &cli.Command{ }, }, Action: func(cCtx *cli.Context) error { + ctx := cCtx.Context file := cCtx.Args().Get(0) if file == "" { log.Error().Msg("arg[0] is empty") @@ -54,7 +55,7 @@ var Command = &cli.Command{ fnameAudio := prepareFile(file, "m4a") log.Info().Str("output", fnameMuxed).Str("input", file).Msg("remuxing stream...") - if err := remux.Do(fnameMuxed, file); err != nil { + if err := remux.Do(ctx, fnameMuxed, file); err != nil { log.Error(). Str("output", fnameMuxed). Str("input", file). @@ -63,7 +64,7 @@ var Command = &cli.Command{ } if extractAudio { log.Error().Str("output", fnameAudio).Str("input", file).Msg("extrating audio...") - if err := remux.Do(fnameAudio, file, remux.WithAudioOnly()); err != nil { + if err := remux.Do(ctx, fnameAudio, file, remux.WithAudioOnly()); err != nil { log.Error(). Str("output", fnameAudio). Str("input", file). diff --git a/fc2/fc2.go b/fc2/fc2.go index 8035ff6..a7d3fa4 100644 --- a/fc2/fc2.go +++ b/fc2/fc2.go @@ -273,7 +273,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { f.log.Info().Str("output", fnameMuxed).Str("input", fnameStream).Msg( "remuxing stream...", ) - remuxErr = remux.Do(fnameMuxed, fnameStream) + remuxErr = remux.Do(ctx, fnameMuxed, fnameStream) if remuxErr != nil { f.log.Error().Err(remuxErr).Msg("ffmpeg remux finished with error") } @@ -284,7 +284,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { f.log.Info().Str("output", fnameAudio).Str("input", fnameStream).Msg( "extrating audio...", ) - extractAudioErr = remux.Do(fnameAudio, fnameStream, remux.WithAudioOnly()) + extractAudioErr = remux.Do(ctx, fnameAudio, fnameStream, remux.WithAudioOnly()) if extractAudioErr != nil { f.log.Error().Err(extractAudioErr).Msg("ffmpeg audio extract finished with error") } @@ -301,7 +301,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { if f.params.Remux { concatOpts = append(concatOpts, concat.IgnoreSingle()) } - if concatErr := concat.WithPrefix(f.params.RemuxFormat, nameConcatenatedPrefix, concatOpts...); concatErr != nil { + if concatErr := concat.WithPrefix(ctx, f.params.RemuxFormat, nameConcatenatedPrefix, concatOpts...); concatErr != nil { f.log.Error().Err(concatErr).Msg("ffmpeg concat finished with error") } @@ -313,7 +313,7 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { "concatenating audio stream...", ) concatOpts = append(concatOpts, concat.WithAudioOnly()) - if concatErr := concat.WithPrefix("m4a", nameAudioConcatenatedPrefix, concatOpts...); concatErr != nil { + if concatErr := concat.WithPrefix(ctx, "m4a", nameAudioConcatenatedPrefix, concatOpts...); concatErr != nil { f.log.Error().Err(concatErr).Msg("ffmpeg concat finished with error") } } diff --git a/video/concat/concat.go b/video/concat/concat.go index 4031ae3..04b55e9 100644 --- a/video/concat/concat.go +++ b/video/concat/concat.go @@ -90,8 +90,8 @@ func applyOptions(opts []Option) *Options { } // Do concat multiple video streams. -func Do(output string, inputs []string, opts ...Option) error { - ctx, span := otel.Tracer(tracerName).Start(context.Background(), "concat.Do") +func Do(ctx context.Context, output string, inputs []string, opts ...Option) error { + ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.Do") defer span.End() o := applyOptions(opts) @@ -221,7 +221,7 @@ func filterFiles( // WithPrefix Concat multiple videos with a prefix. // // Prefix can be a path. -func WithPrefix(remuxFormat string, prefix string, opts ...Option) error { +func WithPrefix(ctx context.Context, remuxFormat string, prefix string, opts ...Option) error { o := applyOptions(opts) path := filepath.Dir(prefix) base := filepath.Base(prefix) @@ -239,7 +239,7 @@ func WithPrefix(remuxFormat string, prefix string, opts ...Option) error { return err } - return Do(prefix+".combined."+remuxFormat, selected, opts...) + return Do(ctx, prefix+".combined."+remuxFormat, selected, opts...) } func areFormatMixed(files []string) bool { diff --git a/video/concat/concat_integration_test.go b/video/concat/concat_integration_test.go index 3f79a1f..5ea1c59 100644 --- a/video/concat/concat_integration_test.go +++ b/video/concat/concat_integration_test.go @@ -3,6 +3,7 @@ package concat_test import ( + "context" _ "net/http/pprof" "testing" @@ -11,11 +12,17 @@ import ( ) func TestDo(t *testing.T) { - err := concat.Do("output.mp4", []string{"input.ts", "input.mp4"}) + err := concat.Do(context.Background(), "output.mp4", []string{"input.ts", "input.mp4"}) require.NoError(t, err) } func TestWithPrefix(t *testing.T) { - err := concat.WithPrefix("m4a", "input", concat.IgnoreExtension(), concat.WithAudioOnly()) + err := concat.WithPrefix( + context.Background(), + "m4a", + "input", + concat.IgnoreExtension(), + concat.WithAudioOnly(), + ) require.NoError(t, err) } diff --git a/video/concat/concat_test.go b/video/concat/concat_test.go index a54c057..526e642 100644 --- a/video/concat/concat_test.go +++ b/video/concat/concat_test.go @@ -1,6 +1,7 @@ package concat import ( + "context" "testing" "github.com/Darkness4/fc2-live-dl-go/video/probe" @@ -103,7 +104,7 @@ func TestFilterFiles(t *testing.T) { } func TestDo(t *testing.T) { - err := Do("output.mp4", []string{"input.mp4"}) + err := Do(context.Background(), "output.mp4", []string{"input.mp4"}) require.NoError(t, err) err = probe.Do([]string{"output.mp4"}, probe.WithQuiet()) diff --git a/video/concat/remux_intermediate_unix.go b/video/concat/remux_intermediate_unix.go index 7de0483..f66fdf5 100644 --- a/video/concat/remux_intermediate_unix.go +++ b/video/concat/remux_intermediate_unix.go @@ -11,6 +11,7 @@ import ( "github.com/Darkness4/fc2-live-dl-go/utils" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" ) // remuxMixedTS remuxes mixed TS/AAC files into intermediate format. @@ -19,6 +20,9 @@ func remuxMixedTS( files []string, opts ...Option, ) (intermediates []string, useFIFO bool, err error) { + ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.remuxMixedTS") + defer span.End() + intermediates = make([]string, 0, len(files)) useFIFO = false @@ -59,7 +63,7 @@ func remuxMixedTS( doneCh <- struct{}{} }() // Will IO block due to the FIFO - if err := Do(intermediateName, []string{files[i]}, opts...); err != nil { + if err := Do(ctx, intermediateName, []string{files[i]}, opts...); err != nil { log.Error(). Err(err). Str("file", files[i]). diff --git a/video/concat/remux_intermediate_windows.go b/video/concat/remux_intermediate_windows.go index a94644a..707809d 100644 --- a/video/concat/remux_intermediate_windows.go +++ b/video/concat/remux_intermediate_windows.go @@ -8,14 +8,18 @@ import ( "github.com/Darkness4/fc2-live-dl-go/utils" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" ) // remuxMixedTS remuxes mixed TS/AAC files into intermediate format. func remuxMixedTS( - _ context.Context, // ctx is not used as operations are IO bound and has finality. + ctx context.Context, // ctx is not used as operations are IO bound and has finality. files []string, opts ...Option, ) (intermediates []string, useFIFO bool, err error) { + ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.Do") + defer span.End() + intermediates = make([]string, 0, len(files)) var wg sync.WaitGroup @@ -34,7 +38,7 @@ func remuxMixedTS( go func() { defer wg.Done() // Will IO block due to the FIFO - if err := Do(intermediateName, []string{file}, opts...); err != nil { + if err := Do(ctx, intermediateName, []string{file}, opts...); err != nil { log.Error(). Err(err). Str("file", file). diff --git a/video/remux/remux.go b/video/remux/remux.go index 8caa3b7..3d2fe7b 100644 --- a/video/remux/remux.go +++ b/video/remux/remux.go @@ -1,7 +1,11 @@ // Package remux provides functions for remuxing videos. package remux -import "github.com/Darkness4/fc2-live-dl-go/video/concat" +import ( + "context" + + "github.com/Darkness4/fc2-live-dl-go/video/concat" +) // Option is the option for remux. type Option concat.Option @@ -12,11 +16,11 @@ func WithAudioOnly() Option { } // Do remuxes the input file to the output file. -func Do(output string, input string, opts ...Option) error { +func Do(ctx context.Context, output string, input string, opts ...Option) error { o := make([]concat.Option, 0, len(opts)) for _, opt := range opts { o = append(o, concat.Option(opt)) } - return concat.Do(output, []string{input}, o...) + return concat.Do(ctx, output, []string{input}, o...) }