From 7078629ab9b25c6d526bf566e7e42f96f99ad1bf Mon Sep 17 00:00:00 2001 From: Nguyen Marc Date: Sat, 27 Jul 2024 18:27:59 +0200 Subject: [PATCH] feat(state): add labels to state debugging + metrics --- cmd/watch/watch_command.go | 14 ++++++++++--- fc2/fc2.go | 16 ++++++++++----- state/state.go | 38 ++++++++++++++++++++++++++++++++--- state/state_metrics_helper.go | 32 ++++++++++++++++++++--------- state/state_test.go | 10 ++++++--- 5 files changed, 87 insertions(+), 23 deletions(-) diff --git a/cmd/watch/watch_command.go b/cmd/watch/watch_command.go index 65e4052..da1e114 100644 --- a/cmd/watch/watch_command.go +++ b/cmd/watch/watch_command.go @@ -264,7 +264,11 @@ func handleConfig(ctx context.Context, version string, config *Config) { defer wg.Done() log := log.With().Str("channelID", channelID).Logger() for { - state.DefaultState.SetChannelState(channelID, state.DownloadStateIdle, nil) + state.DefaultState.SetChannelState( + channelID, + state.DownloadStateIdle, + state.WithLabels(params.Labels), + ) if err := notifier.NotifyIdle(ctx, channelID, params.Labels); err != nil { log.Err(err).Msg("notify failed") } @@ -278,7 +282,7 @@ func handleConfig(ctx context.Context, version string, config *Config) { state.DefaultState.SetChannelState( channelID, state.DownloadStateCanceled, - nil, + state.WithLabels(params.Labels), ) if err := notifier.NotifyCanceled( context.Background(), @@ -301,7 +305,11 @@ func handleConfig(ctx context.Context, version string, config *Config) { log.Err(err).Msg("notify failed") } } else { - state.DefaultState.SetChannelState(channelID, state.DownloadStateFinished, nil) + state.DefaultState.SetChannelState( + channelID, + state.DownloadStateFinished, + state.WithLabels(params.Labels), + ) if err := notifier.NotifyFinished(ctx, channelID, params.Labels, meta); err != nil { log.Err(err).Msg("notify failed") } diff --git a/fc2/fc2.go b/fc2/fc2.go index 89398f5..efdc08e 100644 --- a/fc2/fc2.go +++ b/fc2/fc2.go @@ -109,7 +109,11 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { ) } span.AddEvent("preparing files") - state.DefaultState.SetChannelState(f.channelID, state.DownloadStatePreparingFiles, nil) + state.DefaultState.SetChannelState( + f.channelID, + state.DownloadStatePreparingFiles, + state.WithLabels(f.params.Labels), + ) if err := notifier.NotifyPreparingFiles(ctx, f.channelID, f.params.Labels, meta); err != nil { log.Err(err).Msg("notify failed") } @@ -225,9 +229,10 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { state.DefaultState.SetChannelState( f.channelID, state.DownloadStateDownloading, - map[string]interface{}{ + state.WithLabels(f.params.Labels), + state.WithExtra(map[string]interface{}{ "metadata": meta, - }, + }), ) if err := notifier.NotifyDownloading( ctx, @@ -268,9 +273,10 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) { state.DefaultState.SetChannelState( f.channelID, state.DownloadStatePostProcessing, - map[string]interface{}{ + state.WithLabels(f.params.Labels), + state.WithExtra(map[string]interface{}{ "metadata": meta, - }, + }), ) if err := notifier.NotifyPostProcessing( ctx, diff --git a/state/state.go b/state/state.go index 608462e..3a0da00 100644 --- a/state/state.go +++ b/state/state.go @@ -19,6 +19,7 @@ type State struct { type ChannelState struct { DownloadState DownloadState `json:"state"` Extra map[string]interface{} `json:"extra,omitempty"` + Labels map[string]string `json:"labels,omitempty"` Errors []DownloadError `json:"errors_log"` } @@ -123,8 +124,38 @@ func (s *State) GetChannelState(name string) DownloadState { return DownloadStateUnspecified } +type setChannelStateOptions struct { + labels map[string]string + extra map[string]interface{} +} + +// SetChannelStateOptions represents options for SetChannelState. +type SetChannelStateOptions func(*setChannelStateOptions) + +// WithLabels sets labels for a channel. +func WithLabels(labels map[string]string) SetChannelStateOptions { + return func(o *setChannelStateOptions) { + o.labels = labels + } +} + +// WithExtra sets extra data for a channel. +func WithExtra(extra map[string]interface{}) SetChannelStateOptions { + return func(o *setChannelStateOptions) { + o.extra = extra + } +} + // SetChannelState sets the state for a channel. -func (s *State) SetChannelState(name string, state DownloadState, extra map[string]interface{}) { +func (s *State) SetChannelState( + name string, + state DownloadState, + opts ...SetChannelStateOptions, +) { + o := &setChannelStateOptions{} + for _, opt := range opts { + opt(o) + } s.mu.Lock() defer s.mu.Unlock() if _, ok := s.Channels[name]; !ok { @@ -133,8 +164,9 @@ func (s *State) SetChannelState(name string, state DownloadState, extra map[stri } } s.Channels[name].DownloadState = state - s.Channels[name].Extra = extra - setStateMetrics(context.Background(), name, state) + s.Channels[name].Extra = o.extra + s.Channels[name].Labels = o.labels + setStateMetrics(context.Background(), name, state, o.labels) } // SetChannelError sets an error for a channel. diff --git a/state/state_metrics_helper.go b/state/state_metrics_helper.go index 10f280a..4a48394 100644 --- a/state/state_metrics_helper.go +++ b/state/state_metrics_helper.go @@ -9,18 +9,32 @@ import ( ) // setStateMetrics demuxes the state to the metrics. -func setStateMetrics(ctx context.Context, channelID string, state DownloadState) { - m := metrics.Watcher.State - m.Record(ctx, 1, metric.WithAttributes( +func setStateMetrics( + ctx context.Context, + channelID string, + state DownloadState, + labels map[string]string, +) { + attrs := []attribute.KeyValue{ attribute.String("channel_id", channelID), - attribute.String("state", state.String()), - )) + } + for k, v := range labels { + attrs = append(attrs, attribute.String(k, v)) + } + m := metrics.Watcher.State + m.Record( + ctx, + 1, + metric.WithAttributes(append(attrs, attribute.String("state", state.String()))...), + ) + // Remove the rest of the states from the metrics. for i := DownloadStateUnspecified; i <= DownloadStateCanceled; i++ { if i != state { - m.Record(ctx, 0, metric.WithAttributes( - attribute.String("channel_id", channelID), - attribute.String("state", i.String()), - )) + m.Record( + ctx, + 0, + metric.WithAttributes(append(attrs, attribute.String("state", i.String()))...), + ) } } } diff --git a/state/state_test.go b/state/state_test.go index 3385741..5d18ec4 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -15,9 +15,13 @@ func TestSetChannelState(t *testing.T) { } // Test - s.SetChannelState("test", state.DownloadStateDownloading, map[string]interface{}{ - "metadata": "meta", - }) + s.SetChannelState( + "test", + state.DownloadStateDownloading, + state.WithExtra(map[string]interface{}{ + "metadata": "meta", + }), + ) // Assert require.Equal(t, state.DownloadStateDownloading, s.GetChannelState("test"))