diff --git a/CHANGELOG.md b/CHANGELOG.md index c2a13bbd..593d9eae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Changed - Split file and fs provider (#49). +- Split Watch and OnChange for watching configuration changes (#52). ### Removed diff --git a/benchmark_test.go b/benchmark_test.go index ae859f34..6c6ff8b5 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -60,5 +60,10 @@ func BenchmarkWatch(b *testing.B) { } b.StopTimer() - assert.Equal(b, "changed", konf.Get[string]("config")) + var cfg string + config.OnChange(func(unmarshaler konf.Unmarshaler) { + assert.NoError(b, config.Unmarshal("config", &cfg)) + }) + watcher.change(map[string]any{"config": "changed"}) + assert.Equal(b, "changed", cfg) } diff --git a/config.go b/config.go index 46bdc57e..51393caf 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,11 @@ type Config struct { values *provider providers []*provider + onChanges map[string][]func(Unmarshaler) +} + +type Unmarshaler interface { + Unmarshal(path string, target any) error } // New returns a Config with the given Option(s). @@ -29,6 +34,7 @@ func New(opts ...Option) (Config, error) { config := option.Config config.values = &provider{values: make(map[string]any)} config.providers = make([]*provider, 0, len(option.loaders)) + config.onChanges = make(map[string][]func(Unmarshaler)) for _, loader := range option.loaders { if loader == nil { @@ -65,8 +71,8 @@ func New(opts ...Option) (Config, error) { // It blocks until ctx is done, or the service returns an error. // // It only can be called once. Call after first has no effects. -func (c Config) Watch(ctx context.Context) error { //nolint:funlen - changeChan := make(chan struct{}) +func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocognit + changeChan := make(chan []func(Unmarshaler)) defer close(changeChan) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -77,28 +83,37 @@ func (c Config) Watch(ctx context.Context) error { //nolint:funlen waitGroup sync.WaitGroup hasWatcher bool ) - for _, provider := range c.providers { - if provider.watcher != nil { - provider := provider + for _, p := range c.providers { + if p.watcher != nil { + watcher := p - provider.watchOnce.Do(func() { + watcher.watchOnce.Do(func() { hasWatcher = true waitGroup.Add(1) go func() { defer waitGroup.Done() - if err := provider.watcher.Watch( - ctx, - func(values map[string]any) { - provider.values = values - slog.Info( - "Configuration has been changed.", - "watcher", provider.watcher, - ) - changeChan <- struct{}{} - }, - ); err != nil { + onChange := func(values map[string]any) { + slog.Info( + "Configuration has been changed.", + "watcher", watcher.watcher, + ) + + // Find the onChanges should be triggered. + oldValues := &provider{values: watcher.values} + newValues := &provider{values: values} + var callbacks []func(Unmarshaler) + for path, onChanges := range c.onChanges { + if oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil { + callbacks = append(callbacks, onChanges...) + } + } + + watcher.values = values + changeChan <- callbacks + } + if err := watcher.watcher.Watch(ctx, onChange); err != nil { errOnce.Do(func() { firstErr = fmt.Errorf("[konf] watch configuration change: %w", err) cancel() @@ -119,13 +134,17 @@ func (c Config) Watch(ctx context.Context) error { //nolint:funlen for { select { - case <-changeChan: + case onChanges := <-changeChan: values := make(map[string]any) for _, w := range c.providers { maps.Merge(values, w.values) } c.values.values = values + for _, onChange := range onChanges { + onChange(c) + } + case <-ctx.Done(): return } @@ -142,6 +161,40 @@ type provider struct { values map[string]any } +func (p *provider) sub(path string, delimiter string) any { + if path == "" { + return p.values + } + + var next any = p.values + for _, key := range strings.Split(strings.ToLower(path), delimiter) { + mp, ok := next.(map[string]any) + if !ok { + return nil + } + + val, exist := mp[key] + if !exist { + return nil + } + next = val + } + + return next +} + +// OnChange executes the given onChange function while the value of any given path +// (or any value is no paths) have been changed. +func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) { + if len(paths) == 0 { + paths = []string{""} + } + + for _, path := range paths { + c.onChanges[path] = append(c.onChanges[path], onchange) + } +} + // Unmarshal loads configuration under the given path into the given object // pointed to by target. It supports [mapstructure] tags on struct fields. // @@ -163,31 +216,9 @@ func (c Config) Unmarshal(path string, target any) error { return fmt.Errorf("[konf] new decoder: %w", err) } - if err := decoder.Decode(c.sub(path)); err != nil { + if err := decoder.Decode(c.values.sub(path, c.delimiter)); err != nil { return fmt.Errorf("[konf] decode: %w", err) } return nil } - -func (c Config) sub(path string) any { - if path == "" { - return c.values.values - } - - var next any = c.values.values - for _, key := range strings.Split(strings.ToLower(path), c.delimiter) { - mp, ok := next.(map[string]any) - if !ok { - return nil - } - - val, exist := mp[key] - if !exist { - return nil - } - next = val - } - - return next -} diff --git a/config_test.go b/config_test.go index eaa45886..a6551de3 100644 --- a/config_test.go +++ b/config_test.go @@ -167,8 +167,10 @@ func TestConfig_Watch(t *testing.T) { assert.NoError(t, config.Watch(ctx)) }() + config.OnChange(func(unmarshaler konf.Unmarshaler) { + assert.NoError(t, config.Unmarshal("config", &cfg)) + }) watcher.change(map[string]any{"config": "changed"}) - assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "changed", cfg) } diff --git a/global.go b/global.go index babb50cb..54a286b7 100644 --- a/global.go +++ b/global.go @@ -57,6 +57,15 @@ func Watch(ctx context.Context) error { return global.Watch(ctx) } +// OnChange executes the given onChange function while the value of any given path +// (or any value is no paths) have been changed. +func OnChange(onChange func(), paths ...string) { + mux.RLock() + defer mux.RUnlock() + + global.OnChange(func(Unmarshaler) { onChange() }, paths...) +} + // SetGlobal makes c the global Config. After this call, // the konf package's functions (e.g. konf.Get) will read from the global config. // diff --git a/global_test.go b/global_test.go index fd93cdeb..31694d50 100644 --- a/global_test.go +++ b/global_test.go @@ -64,6 +64,10 @@ func TestWatch(t *testing.T) { assert.NoError(t, konf.Watch(ctx)) }() + var cfg string + config.OnChange(func(unmarshaler konf.Unmarshaler) { + assert.NoError(t, config.Unmarshal("config", &cfg)) + }) watcher.change(map[string]any{"config": "changed"}) - assert.Equal(t, "changed", konf.Get[string]("config")) + assert.Equal(t, "changed", cfg) }