Skip to content

Commit

Permalink
feat(state): add labels to state debugging + metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkness4 committed Jul 27, 2024
1 parent b4ef363 commit 7078629
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 23 deletions.
14 changes: 11 additions & 3 deletions cmd/watch/watch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ func handleConfig(ctx context.Context, version string, config *Config) {
defer wg.Done()
log := log.With().Str("channelID", channelID).Logger()
for {
state.DefaultState.SetChannelState(channelID, state.DownloadStateIdle, nil)
state.DefaultState.SetChannelState(
channelID,
state.DownloadStateIdle,
state.WithLabels(params.Labels),
)
if err := notifier.NotifyIdle(ctx, channelID, params.Labels); err != nil {
log.Err(err).Msg("notify failed")
}
Expand All @@ -278,7 +282,7 @@ func handleConfig(ctx context.Context, version string, config *Config) {
state.DefaultState.SetChannelState(
channelID,
state.DownloadStateCanceled,
nil,
state.WithLabels(params.Labels),
)
if err := notifier.NotifyCanceled(
context.Background(),
Expand All @@ -301,7 +305,11 @@ func handleConfig(ctx context.Context, version string, config *Config) {
log.Err(err).Msg("notify failed")
}
} else {
state.DefaultState.SetChannelState(channelID, state.DownloadStateFinished, nil)
state.DefaultState.SetChannelState(
channelID,
state.DownloadStateFinished,
state.WithLabels(params.Labels),
)
if err := notifier.NotifyFinished(ctx, channelID, params.Labels, meta); err != nil {
log.Err(err).Msg("notify failed")
}
Expand Down
16 changes: 11 additions & 5 deletions fc2/fc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
)
}
span.AddEvent("preparing files")
state.DefaultState.SetChannelState(f.channelID, state.DownloadStatePreparingFiles, nil)
state.DefaultState.SetChannelState(
f.channelID,
state.DownloadStatePreparingFiles,
state.WithLabels(f.params.Labels),
)
if err := notifier.NotifyPreparingFiles(ctx, f.channelID, f.params.Labels, meta); err != nil {
log.Err(err).Msg("notify failed")
}
Expand Down Expand Up @@ -225,9 +229,10 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
state.DefaultState.SetChannelState(
f.channelID,
state.DownloadStateDownloading,
map[string]interface{}{
state.WithLabels(f.params.Labels),
state.WithExtra(map[string]interface{}{
"metadata": meta,
},
}),
)
if err := notifier.NotifyDownloading(
ctx,
Expand Down Expand Up @@ -268,9 +273,10 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
state.DefaultState.SetChannelState(
f.channelID,
state.DownloadStatePostProcessing,
map[string]interface{}{
state.WithLabels(f.params.Labels),
state.WithExtra(map[string]interface{}{
"metadata": meta,
},
}),
)
if err := notifier.NotifyPostProcessing(
ctx,
Expand Down
38 changes: 35 additions & 3 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type State struct {
type ChannelState struct {
DownloadState DownloadState `json:"state"`
Extra map[string]interface{} `json:"extra,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Errors []DownloadError `json:"errors_log"`
}

Expand Down Expand Up @@ -123,8 +124,38 @@ func (s *State) GetChannelState(name string) DownloadState {
return DownloadStateUnspecified
}

type setChannelStateOptions struct {
labels map[string]string
extra map[string]interface{}
}

// SetChannelStateOptions represents options for SetChannelState.
type SetChannelStateOptions func(*setChannelStateOptions)

// WithLabels sets labels for a channel.
func WithLabels(labels map[string]string) SetChannelStateOptions {
return func(o *setChannelStateOptions) {
o.labels = labels
}
}

// WithExtra sets extra data for a channel.
func WithExtra(extra map[string]interface{}) SetChannelStateOptions {
return func(o *setChannelStateOptions) {
o.extra = extra
}
}

// SetChannelState sets the state for a channel.
func (s *State) SetChannelState(name string, state DownloadState, extra map[string]interface{}) {
func (s *State) SetChannelState(
name string,
state DownloadState,
opts ...SetChannelStateOptions,
) {
o := &setChannelStateOptions{}
for _, opt := range opts {
opt(o)
}
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.Channels[name]; !ok {
Expand All @@ -133,8 +164,9 @@ func (s *State) SetChannelState(name string, state DownloadState, extra map[stri
}
}
s.Channels[name].DownloadState = state
s.Channels[name].Extra = extra
setStateMetrics(context.Background(), name, state)
s.Channels[name].Extra = o.extra
s.Channels[name].Labels = o.labels
setStateMetrics(context.Background(), name, state, o.labels)
}

// SetChannelError sets an error for a channel.
Expand Down
32 changes: 23 additions & 9 deletions state/state_metrics_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,32 @@ import (
)

// setStateMetrics demuxes the state to the metrics.
func setStateMetrics(ctx context.Context, channelID string, state DownloadState) {
m := metrics.Watcher.State
m.Record(ctx, 1, metric.WithAttributes(
func setStateMetrics(
ctx context.Context,
channelID string,
state DownloadState,
labels map[string]string,
) {
attrs := []attribute.KeyValue{
attribute.String("channel_id", channelID),
attribute.String("state", state.String()),
))
}
for k, v := range labels {
attrs = append(attrs, attribute.String(k, v))
}
m := metrics.Watcher.State
m.Record(
ctx,
1,
metric.WithAttributes(append(attrs, attribute.String("state", state.String()))...),
)
// Remove the rest of the states from the metrics.
for i := DownloadStateUnspecified; i <= DownloadStateCanceled; i++ {
if i != state {
m.Record(ctx, 0, metric.WithAttributes(
attribute.String("channel_id", channelID),
attribute.String("state", i.String()),
))
m.Record(
ctx,
0,
metric.WithAttributes(append(attrs, attribute.String("state", i.String()))...),
)
}
}
}
10 changes: 7 additions & 3 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ func TestSetChannelState(t *testing.T) {
}

// Test
s.SetChannelState("test", state.DownloadStateDownloading, map[string]interface{}{
"metadata": "meta",
})
s.SetChannelState(
"test",
state.DownloadStateDownloading,
state.WithExtra(map[string]interface{}{
"metadata": "meta",
}),
)

// Assert
require.Equal(t, state.DownloadStateDownloading, s.GetChannelState("test"))
Expand Down

0 comments on commit 7078629

Please sign in to comment.