From 33bb5750e3925f00e5767a4701badedb61296f1d Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 12:45:47 -0800 Subject: [PATCH 1/8] rewrite watch --- README.md | 8 ++-- benchmark_test.go | 18 ++------ config.go | 110 ++++++++++++++++++++++------------------------ config_test.go | 41 ++++++++--------- global.go | 17 +++---- global_test.go | 19 ++------ option.go | 4 +- provider.go | 4 +- 8 files changed, 93 insertions(+), 128 deletions(-) diff --git a/README.md b/README.md index 4f2bc308..77c3a4d7 100644 --- a/README.md +++ b/README.md @@ -52,9 +52,11 @@ Application also can watch the changes of configuration like: func main() { // ... setup global Config ... - konf.Watch(func(){ - // Read configuration and reconfig application. - }) + go func() { + if err := konf.Watch(ctx); err != nil { + // Handle error here. + } + } // ... other setup code ... } diff --git a/benchmark_test.go b/benchmark_test.go index d2992c95..ae859f34 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -5,7 +5,6 @@ package konf_test import ( "context" - "sync" "testing" "github.com/ktong/konf" @@ -14,7 +13,7 @@ import ( func BenchmarkNew(b *testing.B) { var ( - config *konf.Config + config konf.Config err error ) for i := 0; i < b.N; i++ { @@ -48,27 +47,18 @@ func BenchmarkWatch(b *testing.B) { assert.NoError(b, err) konf.SetGlobal(config) - cfg := konf.Get[string]("config") - assert.Equal(b, "string", cfg) - var waitGroup sync.WaitGroup - waitGroup.Add(b.N) + assert.Equal(b, "string", konf.Get[string]("config")) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { - err := konf.Watch(ctx, func() { - defer waitGroup.Done() - - cfg = konf.Get[string]("config") - }) - assert.NoError(b, err) + assert.NoError(b, konf.Watch(ctx)) }() b.ResetTimer() for i := 0; i < b.N; i++ { watcher.change(map[string]any{"config": "changed"}) } - waitGroup.Wait() b.StopTimer() - assert.Equal(b, "changed", cfg) + assert.Equal(b, "changed", konf.Get[string]("config")) } diff --git a/config.go b/config.go index 554806fa..04d91772 100644 --- a/config.go +++ b/config.go @@ -5,7 +5,6 @@ package konf import ( "context" - "errors" "fmt" "log/slog" "strings" @@ -22,11 +21,10 @@ type Config struct { values *provider providers []*provider - watchOnce sync.Once } // New returns a Config with the given Option(s). -func New(opts ...Option) (*Config, error) { +func New(opts ...Option) (Config, error) { option := apply(opts) config := option.Config config.values = &provider{values: make(map[string]any)} @@ -43,7 +41,7 @@ func New(opts ...Option) (*Config, error) { values, err := loader.Load() if err != nil { - return nil, fmt.Errorf("[konf] load configuration: %w", err) + return Config{}, fmt.Errorf("[konf] load configuration: %w", err) } maps.Merge(config.values.values, values) slog.Info( @@ -67,7 +65,7 @@ func New(opts ...Option) (*Config, error) { // pointed to by target. It supports [mapstructure] tags on struct fields. // // The path is case-insensitive. -func (c *Config) Unmarshal(path string, target any) error { +func (c Config) Unmarshal(path string, target any) error { decoder, err := mapstructure.NewDecoder( &mapstructure.DecoderConfig{ Metadata: nil, @@ -91,7 +89,7 @@ func (c *Config) Unmarshal(path string, target any) error { return nil } -func (c *Config) sub(path string) any { +func (c Config) sub(path string) any { if path == "" { return c.values.values } @@ -113,25 +111,58 @@ func (c *Config) sub(path string) any { return next } -// Watch watches configuration and triggers callbacks when it changes. +// Watch watches and updates configuration when it changes. // It blocks until ctx is done, or the service returns an error. // -// It only can be called once. Call after first returns an error. -func (c *Config) Watch(ctx context.Context, fns ...func(*Config)) error { //nolint:funlen - var first bool - c.watchOnce.Do(func() { - first = true - }) - if !first { - return errOnlyOnce - } - +// It only can be called once. Call after first has no effects. +func (c Config) Watch(ctx context.Context) error { //nolint:funlen changeChan := make(chan struct{}) defer close(changeChan) ctx, cancel := context.WithCancel(ctx) defer cancel() - var waitGroup sync.WaitGroup + var ( + firstErr error + errOnce sync.Once + waitGroup sync.WaitGroup + hasWatcher bool + ) + for _, provider := range c.providers { + if provider.watcher != nil { + provider := provider + + provider.watchOnce.Do(func() { + hasWatcher = true + + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + if err := provider.watcher.Watch( + ctx, + func(values map[string]any) { + provider.values = values + slog.Info( + "Configuration has been changed.", + "watcher", provider.watcher, + ) + changeChan <- struct{}{} + }, + ); err != nil { + errOnce.Do(func() { + firstErr = fmt.Errorf("[konf] watch configuration change: %w", err) + cancel() + }) + } + }() + }) + } + } + + if !hasWatcher { + return nil + } + waitGroup.Add(1) go func() { defer waitGroup.Done() @@ -145,55 +176,18 @@ func (c *Config) Watch(ctx context.Context, fns ...func(*Config)) error { //noli } c.values.values = values - for _, fn := range fns { - fn(c) - } - case <-ctx.Done(): return } } }() - - var ( - firstErr error - errOnce sync.Once - ) - for _, provider := range c.providers { - if provider.watcher != nil { - provider := provider - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - - if err := provider.watcher.Watch( - ctx, - func(values map[string]any) { - provider.values = values - slog.Info( - "Configuration has been changed.", - "watcher", provider.watcher, - ) - changeChan <- struct{}{} - }, - ); err != nil { - errOnce.Do(func() { - firstErr = fmt.Errorf("[konf] watch configuration change: %w", err) - cancel() - }) - } - }() - } - } waitGroup.Wait() return firstErr } -var errOnlyOnce = errors.New("[konf] Watch only can be called once") - type provider struct { - watcher Watcher - values map[string]any + watcher Watcher + watchOnce sync.Once + values map[string]any } diff --git a/config_test.go b/config_test.go index 9f218fe7..eaa45886 100644 --- a/config_test.go +++ b/config_test.go @@ -6,8 +6,8 @@ package konf_test import ( "context" "errors" - "sync" "testing" + "time" "github.com/ktong/konf" "github.com/ktong/konf/internal/assert" @@ -19,11 +19,11 @@ func TestConfig_Unmarshal(t *testing.T) { testcases := []struct { description string opts []konf.Option - assert func(*konf.Config) + assert func(konf.Config) }{ { description: "empty values", - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "", cfg) @@ -32,7 +32,7 @@ func TestConfig_Unmarshal(t *testing.T) { { description: "nil loader", opts: []konf.Option{konf.WithLoader(nil)}, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "", cfg) @@ -41,7 +41,7 @@ func TestConfig_Unmarshal(t *testing.T) { { description: "for primary type", opts: []konf.Option{konf.WithLoader(mapLoader{"config": "string"})}, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "string", cfg) @@ -50,7 +50,7 @@ func TestConfig_Unmarshal(t *testing.T) { { description: "config for struct", opts: []konf.Option{konf.WithLoader(mapLoader{"config": "struct"})}, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg struct { Config string } @@ -69,7 +69,7 @@ func TestConfig_Unmarshal(t *testing.T) { }, ), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config.nest", &cfg)) assert.Equal(t, "string", cfg) @@ -87,7 +87,7 @@ func TestConfig_Unmarshal(t *testing.T) { }, ), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config_nest", &cfg)) assert.Equal(t, "string", cfg) @@ -104,7 +104,7 @@ func TestConfig_Unmarshal(t *testing.T) { }, ), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config.nest", &cfg)) assert.Equal(t, "", cfg) @@ -115,7 +115,7 @@ func TestConfig_Unmarshal(t *testing.T) { opts: []konf.Option{ konf.WithLoader(mapLoader{}), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var configured bool assert.NoError(t, config.Unmarshal("configured", &configured)) assert.True(t, configured) @@ -138,7 +138,11 @@ func TestConfig_Unmarshal(t *testing.T) { type mapLoader map[string]any -func (m mapLoader) WithConfig(*konf.Config) { +func (m mapLoader) WithConfig( + interface { + Unmarshal(path string, target any) error + }, +) { m["configured"] = true } @@ -159,21 +163,12 @@ func TestConfig_Watch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - var waitGroup sync.WaitGroup - waitGroup.Add(1) go func() { - err := config.Watch(ctx, func(config *konf.Config) { - defer waitGroup.Done() - - assert.NoError(t, config.Unmarshal("config", &cfg)) - }) - assert.NoError(t, err) + assert.NoError(t, config.Watch(ctx)) }() watcher.change(map[string]any{"config": "changed"}) - waitGroup.Wait() - + assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "changed", cfg) } @@ -196,6 +191,8 @@ func (m mapWatcher) Watch(ctx context.Context, fn func(map[string]any)) error { func (m mapWatcher) change(values map[string]any) { m <- values + + time.Sleep(time.Second) // Wait for change gets propagated. } func TestConfig_Watch_error(t *testing.T) { diff --git a/global.go b/global.go index e254c12a..babb50cb 100644 --- a/global.go +++ b/global.go @@ -46,29 +46,22 @@ func Unmarshal(path string, target any) error { return global.Unmarshal(path, target) } -// Watch watches configuration and triggers callbacks when it changes. +// Watch watches and updates configuration when it changes. // It blocks until ctx is done, or the service returns an error. // -// It only can be called once. Call after first returns an error. -func Watch(ctx context.Context, fns ...func()) error { +// It only can be called once. Call after first has no effects. +func Watch(ctx context.Context) error { mux.RLock() defer mux.RUnlock() - return global.Watch( - ctx, - func(*Config) { - for _, fn := range fns { - fn() - } - }, - ) + return global.Watch(ctx) } // SetGlobal makes c the global Config. After this call, // the konf package's functions (e.g. konf.Get) will read from the global config. // // The default global config only loads configuration from environment variables. -func SetGlobal(config *Config) { +func SetGlobal(config Config) { mux.Lock() defer mux.Unlock() diff --git a/global_test.go b/global_test.go index c2c76e4c..fd93cdeb 100644 --- a/global_test.go +++ b/global_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "log" - "sync" "testing" "github.com/ktong/konf" @@ -57,26 +56,14 @@ func TestWatch(t *testing.T) { config, err := konf.New(konf.WithLoader(watcher)) assert.NoError(t, err) konf.SetGlobal(config) - - cfg := konf.Get[string]("config") - assert.Equal(t, "string", cfg) + assert.Equal(t, "string", konf.Get[string]("config")) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - var waitGroup sync.WaitGroup - waitGroup.Add(1) go func() { - err := konf.Watch(ctx, func() { - defer waitGroup.Done() - - cfg = konf.Get[string]("config") - }) - assert.NoError(t, err) + assert.NoError(t, konf.Watch(ctx)) }() watcher.change(map[string]any{"config": "changed"}) - waitGroup.Wait() - - assert.Equal(t, "changed", cfg) + assert.Equal(t, "changed", konf.Get[string]("config")) } diff --git a/option.go b/option.go index ece6c49b..11bd6461 100644 --- a/option.go +++ b/option.go @@ -26,14 +26,14 @@ func WithDelimiter(delimiter string) Option { type Option func(*options) type options struct { - *Config + Config loaders []Loader } func apply(opts []Option) options { option := &options{ - Config: &Config{ + Config: Config{ delimiter: ".", }, } diff --git a/provider.go b/provider.go index 23eafd69..4faf6a4b 100644 --- a/provider.go +++ b/provider.go @@ -28,5 +28,7 @@ type Watcher interface { // WithConfig enables provider uses configuration loaded by providers before it. // It ensures the WithConfig is called before executing methods in Loader and Watcher. type ConfigAware interface { - WithConfig(config *Config) + WithConfig(config interface { + Unmarshal(path string, target any) error + }) } From 6afc811af6f0369ffa309dfe222869340ca16a5f Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 12:55:44 -0800 Subject: [PATCH 2/8] rename --- config.go | 100 +++++++++++++++++----------------- example_test.go | 4 +- provider/fs/benchmark_test.go | 8 +-- provider/fs/fs_test.go | 16 +++--- 4 files changed, 64 insertions(+), 64 deletions(-) diff --git a/config.go b/config.go index 04d91772..46bdc57e 100644 --- a/config.go +++ b/config.go @@ -61,56 +61,6 @@ func New(opts ...Option) (Config, error) { return config, nil } -// Unmarshal loads configuration under the given path into the given object -// pointed to by target. It supports [mapstructure] tags on struct fields. -// -// The path is case-insensitive. -func (c Config) Unmarshal(path string, target any) error { - decoder, err := mapstructure.NewDecoder( - &mapstructure.DecoderConfig{ - Metadata: nil, - Result: target, - WeaklyTypedInput: true, - DecodeHook: mapstructure.ComposeDecodeHookFunc( - mapstructure.StringToTimeDurationHookFunc(), - mapstructure.StringToSliceHookFunc(","), - mapstructure.TextUnmarshallerHookFunc(), - ), - }, - ) - if err != nil { - return fmt.Errorf("[konf] new decoder: %w", err) - } - - if err := decoder.Decode(c.sub(path)); err != nil { - return fmt.Errorf("[konf] decode: %w", err) - } - - return nil -} - -func (c Config) sub(path string) any { - if path == "" { - return c.values.values - } - - var next any = c.values.values - for _, key := range strings.Split(strings.ToLower(path), c.delimiter) { - mp, ok := next.(map[string]any) - if !ok { - return nil - } - - val, exist := mp[key] - if !exist { - return nil - } - next = val - } - - return next -} - // Watch watches and updates configuration when it changes. // It blocks until ctx is done, or the service returns an error. // @@ -191,3 +141,53 @@ type provider struct { watchOnce sync.Once values map[string]any } + +// Unmarshal loads configuration under the given path into the given object +// pointed to by target. It supports [mapstructure] tags on struct fields. +// +// The path is case-insensitive. +func (c Config) Unmarshal(path string, target any) error { + decoder, err := mapstructure.NewDecoder( + &mapstructure.DecoderConfig{ + Metadata: nil, + Result: target, + WeaklyTypedInput: true, + DecodeHook: mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + mapstructure.TextUnmarshallerHookFunc(), + ), + }, + ) + if err != nil { + return fmt.Errorf("[konf] new decoder: %w", err) + } + + if err := decoder.Decode(c.sub(path)); err != nil { + return fmt.Errorf("[konf] decode: %w", err) + } + + return nil +} + +func (c Config) sub(path string) any { + if path == "" { + return c.values.values + } + + var next any = c.values.values + for _, key := range strings.Split(strings.ToLower(path), c.delimiter) { + mp, ok := next.(map[string]any) + if !ok { + return nil + } + + val, exist := mp[key] + if !exist { + return nil + } + next = val + } + + return next +} diff --git a/example_test.go b/example_test.go index 85c918c0..86b87c34 100644 --- a/example_test.go +++ b/example_test.go @@ -9,7 +9,7 @@ import ( "github.com/ktong/konf" "github.com/ktong/konf/provider/env" - pfs "github.com/ktong/konf/provider/fs" + kfs "github.com/ktong/konf/provider/fs" ) func ExampleGet() { @@ -44,7 +44,7 @@ var testdata embed.FS func ExampleSetGlobal() { cfg, err := konf.New( konf.WithLoader( - pfs.New(testdata, "testdata/config.json"), + kfs.New(testdata, "testdata/config.json"), env.New(env.WithPrefix("server")), ), ) diff --git a/provider/fs/benchmark_test.go b/provider/fs/benchmark_test.go index 28672855..1ef5cf83 100644 --- a/provider/fs/benchmark_test.go +++ b/provider/fs/benchmark_test.go @@ -8,7 +8,7 @@ import ( "testing/fstest" "github.com/ktong/konf/internal/assert" - pfs "github.com/ktong/konf/provider/fs" + kfs "github.com/ktong/konf/provider/fs" ) func BenchmarkNew(b *testing.B) { @@ -19,9 +19,9 @@ func BenchmarkNew(b *testing.B) { } b.ResetTimer() - var loader pfs.FS + var loader kfs.FS for i := 0; i < b.N; i++ { - loader = pfs.New(mapFS, "config.json") + loader = kfs.New(mapFS, "config.json") } b.StopTimer() @@ -36,7 +36,7 @@ func BenchmarkLoad(b *testing.B) { Data: []byte(`{"k":"v"}`), }, } - loader := pfs.New(fs, "config.json") + loader := kfs.New(fs, "config.json") b.ResetTimer() var ( diff --git a/provider/fs/fs_test.go b/provider/fs/fs_test.go index 75364bbf..b9513810 100644 --- a/provider/fs/fs_test.go +++ b/provider/fs/fs_test.go @@ -13,7 +13,7 @@ import ( "testing/fstest" "github.com/ktong/konf/internal/assert" - pfs "github.com/ktong/konf/provider/fs" + kfs "github.com/ktong/konf/provider/fs" ) func TestFile_Load(t *testing.T) { @@ -23,7 +23,7 @@ func TestFile_Load(t *testing.T) { description string fs fs.FS path string - opts []pfs.Option + opts []kfs.Option expected map[string]any err string }{ @@ -51,8 +51,8 @@ func TestFile_Load(t *testing.T) { description: "fs file (ignore not exist)", fs: fstest.MapFS{}, path: "not_found.json", - opts: []pfs.Option{ - pfs.IgnoreFileNotExit(), + opts: []kfs.Option{ + kfs.IgnoreFileNotExit(), }, expected: map[string]any{}, }, @@ -64,8 +64,8 @@ func TestFile_Load(t *testing.T) { }, }, path: "config.json", - opts: []pfs.Option{ - pfs.WithUnmarshal(func([]byte, any) error { + opts: []kfs.Option{ + kfs.WithUnmarshal(func([]byte, any) error { return errors.New("unmarshal error") }), }, @@ -79,7 +79,7 @@ func TestFile_Load(t *testing.T) { t.Run(testcase.description, func(t *testing.T) { t.Parallel() - values, err := pfs.New(testcase.fs, testcase.path, testcase.opts...).Load() + values, err := kfs.New(testcase.fs, testcase.path, testcase.opts...).Load() if err != nil { assert.True(t, strings.HasPrefix(err.Error(), testcase.err)) } else { @@ -93,5 +93,5 @@ func TestFile_Load(t *testing.T) { func TestFile_String(t *testing.T) { t.Parallel() - assert.Equal(t, "fs:config.json", pfs.New(fstest.MapFS{}, "config.json").String()) + assert.Equal(t, "fs:config.json", kfs.New(fstest.MapFS{}, "config.json").String()) } From 894f0974fd6528ee97c42cf28578c45ef2ffccac Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 14:30:56 -0800 Subject: [PATCH 3/8] Split Watch and OnChange for watching configuration changes --- CHANGELOG.md | 1 + benchmark_test.go | 7 ++- config.go | 113 +++++++++++++++++++++++++++++----------------- config_test.go | 4 +- global.go | 9 ++++ global_test.go | 6 ++- 6 files changed, 96 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2a13bbd..593d9eae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Changed - Split file and fs provider (#49). +- Split Watch and OnChange for watching configuration changes (#52). ### Removed diff --git a/benchmark_test.go b/benchmark_test.go index ae859f34..6c6ff8b5 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -60,5 +60,10 @@ func BenchmarkWatch(b *testing.B) { } b.StopTimer() - assert.Equal(b, "changed", konf.Get[string]("config")) + var cfg string + config.OnChange(func(unmarshaler konf.Unmarshaler) { + assert.NoError(b, config.Unmarshal("config", &cfg)) + }) + watcher.change(map[string]any{"config": "changed"}) + assert.Equal(b, "changed", cfg) } diff --git a/config.go b/config.go index 46bdc57e..51393caf 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,11 @@ type Config struct { values *provider providers []*provider + onChanges map[string][]func(Unmarshaler) +} + +type Unmarshaler interface { + Unmarshal(path string, target any) error } // New returns a Config with the given Option(s). @@ -29,6 +34,7 @@ func New(opts ...Option) (Config, error) { config := option.Config config.values = &provider{values: make(map[string]any)} config.providers = make([]*provider, 0, len(option.loaders)) + config.onChanges = make(map[string][]func(Unmarshaler)) for _, loader := range option.loaders { if loader == nil { @@ -65,8 +71,8 @@ func New(opts ...Option) (Config, error) { // It blocks until ctx is done, or the service returns an error. // // It only can be called once. Call after first has no effects. -func (c Config) Watch(ctx context.Context) error { //nolint:funlen - changeChan := make(chan struct{}) +func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocognit + changeChan := make(chan []func(Unmarshaler)) defer close(changeChan) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -77,28 +83,37 @@ func (c Config) Watch(ctx context.Context) error { //nolint:funlen waitGroup sync.WaitGroup hasWatcher bool ) - for _, provider := range c.providers { - if provider.watcher != nil { - provider := provider + for _, p := range c.providers { + if p.watcher != nil { + watcher := p - provider.watchOnce.Do(func() { + watcher.watchOnce.Do(func() { hasWatcher = true waitGroup.Add(1) go func() { defer waitGroup.Done() - if err := provider.watcher.Watch( - ctx, - func(values map[string]any) { - provider.values = values - slog.Info( - "Configuration has been changed.", - "watcher", provider.watcher, - ) - changeChan <- struct{}{} - }, - ); err != nil { + onChange := func(values map[string]any) { + slog.Info( + "Configuration has been changed.", + "watcher", watcher.watcher, + ) + + // Find the onChanges should be triggered. + oldValues := &provider{values: watcher.values} + newValues := &provider{values: values} + var callbacks []func(Unmarshaler) + for path, onChanges := range c.onChanges { + if oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil { + callbacks = append(callbacks, onChanges...) + } + } + + watcher.values = values + changeChan <- callbacks + } + if err := watcher.watcher.Watch(ctx, onChange); err != nil { errOnce.Do(func() { firstErr = fmt.Errorf("[konf] watch configuration change: %w", err) cancel() @@ -119,13 +134,17 @@ func (c Config) Watch(ctx context.Context) error { //nolint:funlen for { select { - case <-changeChan: + case onChanges := <-changeChan: values := make(map[string]any) for _, w := range c.providers { maps.Merge(values, w.values) } c.values.values = values + for _, onChange := range onChanges { + onChange(c) + } + case <-ctx.Done(): return } @@ -142,6 +161,40 @@ type provider struct { values map[string]any } +func (p *provider) sub(path string, delimiter string) any { + if path == "" { + return p.values + } + + var next any = p.values + for _, key := range strings.Split(strings.ToLower(path), delimiter) { + mp, ok := next.(map[string]any) + if !ok { + return nil + } + + val, exist := mp[key] + if !exist { + return nil + } + next = val + } + + return next +} + +// OnChange executes the given onChange function while the value of any given path +// (or any value is no paths) have been changed. +func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) { + if len(paths) == 0 { + paths = []string{""} + } + + for _, path := range paths { + c.onChanges[path] = append(c.onChanges[path], onchange) + } +} + // Unmarshal loads configuration under the given path into the given object // pointed to by target. It supports [mapstructure] tags on struct fields. // @@ -163,31 +216,9 @@ func (c Config) Unmarshal(path string, target any) error { return fmt.Errorf("[konf] new decoder: %w", err) } - if err := decoder.Decode(c.sub(path)); err != nil { + if err := decoder.Decode(c.values.sub(path, c.delimiter)); err != nil { return fmt.Errorf("[konf] decode: %w", err) } return nil } - -func (c Config) sub(path string) any { - if path == "" { - return c.values.values - } - - var next any = c.values.values - for _, key := range strings.Split(strings.ToLower(path), c.delimiter) { - mp, ok := next.(map[string]any) - if !ok { - return nil - } - - val, exist := mp[key] - if !exist { - return nil - } - next = val - } - - return next -} diff --git a/config_test.go b/config_test.go index eaa45886..a6551de3 100644 --- a/config_test.go +++ b/config_test.go @@ -167,8 +167,10 @@ func TestConfig_Watch(t *testing.T) { assert.NoError(t, config.Watch(ctx)) }() + config.OnChange(func(unmarshaler konf.Unmarshaler) { + assert.NoError(t, config.Unmarshal("config", &cfg)) + }) watcher.change(map[string]any{"config": "changed"}) - assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "changed", cfg) } diff --git a/global.go b/global.go index babb50cb..54a286b7 100644 --- a/global.go +++ b/global.go @@ -57,6 +57,15 @@ func Watch(ctx context.Context) error { return global.Watch(ctx) } +// OnChange executes the given onChange function while the value of any given path +// (or any value is no paths) have been changed. +func OnChange(onChange func(), paths ...string) { + mux.RLock() + defer mux.RUnlock() + + global.OnChange(func(Unmarshaler) { onChange() }, paths...) +} + // SetGlobal makes c the global Config. After this call, // the konf package's functions (e.g. konf.Get) will read from the global config. // diff --git a/global_test.go b/global_test.go index fd93cdeb..31694d50 100644 --- a/global_test.go +++ b/global_test.go @@ -64,6 +64,10 @@ func TestWatch(t *testing.T) { assert.NoError(t, konf.Watch(ctx)) }() + var cfg string + config.OnChange(func(unmarshaler konf.Unmarshaler) { + assert.NoError(t, config.Unmarshal("config", &cfg)) + }) watcher.change(map[string]any{"config": "changed"}) - assert.Equal(t, "changed", konf.Get[string]("config")) + assert.Equal(t, "changed", cfg) } From f0f1dfb0a27d5ac2378fcd7f3db1d577c4bd137b Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 14:34:39 -0800 Subject: [PATCH 4/8] add doc --- README.md | 6 ++++++ config.go | 2 ++ global.go | 2 ++ 3 files changed, 10 insertions(+) diff --git a/README.md b/README.md index 77c3a4d7..06a757f2 100644 --- a/README.md +++ b/README.md @@ -67,12 +67,18 @@ configuration source(s). They read configuration in terms of functions in packag ``` func (app *appObject) Run() { + // Read the server configuration. type serverConfig struct { Host string Port int } cfg := konf.Get[serverConfig]("server") + // Register callbacks while server configuration changes. + konf.OnChange(func() { + // Reconfig the application object. + }, "server") + // ... use cfg in app code ... } ``` diff --git a/config.go b/config.go index 51393caf..e4191eb3 100644 --- a/config.go +++ b/config.go @@ -185,6 +185,8 @@ func (p *provider) sub(path string, delimiter string) any { // OnChange executes the given onChange function while the value of any given path // (or any value is no paths) have been changed. +// +// It requires Config.Watch has been called. func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) { if len(paths) == 0 { paths = []string{""} diff --git a/global.go b/global.go index 54a286b7..a49d8aeb 100644 --- a/global.go +++ b/global.go @@ -59,6 +59,8 @@ func Watch(ctx context.Context) error { // OnChange executes the given onChange function while the value of any given path // (or any value is no paths) have been changed. +// +// It requires Watch has been called. func OnChange(onChange func(), paths ...string) { mux.RLock() defer mux.RUnlock() From 7c48f92b5e91ab3e099b69f207cedd1ed91e1be7 Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 14:38:03 -0800 Subject: [PATCH 5/8] fix tests --- benchmark_test.go | 4 ++-- global_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmark_test.go b/benchmark_test.go index 6c6ff8b5..f2842da7 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -61,8 +61,8 @@ func BenchmarkWatch(b *testing.B) { b.StopTimer() var cfg string - config.OnChange(func(unmarshaler konf.Unmarshaler) { - assert.NoError(b, config.Unmarshal("config", &cfg)) + konf.OnChange(func() { + assert.NoError(b, konf.Unmarshal("config", &cfg)) }) watcher.change(map[string]any{"config": "changed"}) assert.Equal(b, "changed", cfg) diff --git a/global_test.go b/global_test.go index 31694d50..ee4f201d 100644 --- a/global_test.go +++ b/global_test.go @@ -65,8 +65,8 @@ func TestWatch(t *testing.T) { }() var cfg string - config.OnChange(func(unmarshaler konf.Unmarshaler) { - assert.NoError(t, config.Unmarshal("config", &cfg)) + konf.OnChange(func() { + assert.NoError(t, konf.Unmarshal("config", &cfg)) }) watcher.change(map[string]any{"config": "changed"}) assert.Equal(t, "changed", cfg) From 7124ef45c0b16171d38844bede0cab7f3eb3c029 Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 15:17:32 -0800 Subject: [PATCH 6/8] fix race tests --- benchmark_test.go | 13 ++++++------- config_test.go | 6 +++++- global_test.go | 7 ++++--- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/benchmark_test.go b/benchmark_test.go index f2842da7..dad3a372 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -5,6 +5,7 @@ package konf_test import ( "context" + "sync/atomic" "testing" "github.com/ktong/konf" @@ -55,15 +56,13 @@ func BenchmarkWatch(b *testing.B) { }() b.ResetTimer() + var cfg atomic.Value + konf.OnChange(func() { + cfg.Store(konf.Get[string]("config")) + }) for i := 0; i < b.N; i++ { watcher.change(map[string]any{"config": "changed"}) } b.StopTimer() - - var cfg string - konf.OnChange(func() { - assert.NoError(b, konf.Unmarshal("config", &cfg)) - }) - watcher.change(map[string]any{"config": "changed"}) - assert.Equal(b, "changed", cfg) + assert.Equal(b, "changed", cfg.Load()) } diff --git a/config_test.go b/config_test.go index a6551de3..ad2ce652 100644 --- a/config_test.go +++ b/config_test.go @@ -6,6 +6,7 @@ package konf_test import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -167,11 +168,14 @@ func TestConfig_Watch(t *testing.T) { assert.NoError(t, config.Watch(ctx)) }() + var newCfg atomic.Value config.OnChange(func(unmarshaler konf.Unmarshaler) { + var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) + newCfg.Store(cfg) }) watcher.change(map[string]any{"config": "changed"}) - assert.Equal(t, "changed", cfg) + assert.Equal(t, "changed", newCfg.Load()) } type mapWatcher chan map[string]any diff --git a/global_test.go b/global_test.go index ee4f201d..fbe4cb07 100644 --- a/global_test.go +++ b/global_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "log" + "sync/atomic" "testing" "github.com/ktong/konf" @@ -64,10 +65,10 @@ func TestWatch(t *testing.T) { assert.NoError(t, konf.Watch(ctx)) }() - var cfg string + var cfg atomic.Value konf.OnChange(func() { - assert.NoError(t, konf.Unmarshal("config", &cfg)) + cfg.Store(konf.Get[string]("config")) }) watcher.change(map[string]any{"config": "changed"}) - assert.Equal(t, "changed", cfg) + assert.Equal(t, "changed", cfg.Load()) } From 9f97436e9e23005c1c326e479c63929ea9a6cd58 Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 15:21:06 -0800 Subject: [PATCH 7/8] add doc --- doc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc.go b/doc.go index f0869643..2df20f22 100644 --- a/doc.go +++ b/doc.go @@ -14,8 +14,8 @@ // Config has following main methods: // - Config.Unmarshal loads configuration under the given path // into the given object pointed to by target. -// - Config.Watch reloads configuration and triggers callbacks -// when configuration changes. +// - Config.Watch reloads configuration when it changes. +// - Config.OnChange register callback on configuration changes. // // # Global Config // @@ -26,6 +26,6 @@ // It returns zero value if there is an error while getting configuration. // - Unmarshal loads configuration under the given path // into the given object pointed to by target. -// - Watch reloads configuration and triggers callbacks -// when configuration changes. +// - Watch reloads configuration when it changes. +// - OnChange register callback on configuration changes. package konf From b4b0d1226609b98c3c1d18bf0c63fe6e4eaddb05 Mon Sep 17 00:00:00 2001 From: ktong <kuisong.tong@gmail.com> Date: Sun, 12 Nov 2023 15:30:07 -0800 Subject: [PATCH 8/8] mutex on onChanges --- config.go | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/config.go b/config.go index e4191eb3..5079df0f 100644 --- a/config.go +++ b/config.go @@ -21,7 +21,7 @@ type Config struct { values *provider providers []*provider - onChanges map[string][]func(Unmarshaler) + onChanges *onChanges } type Unmarshaler interface { @@ -34,7 +34,7 @@ func New(opts ...Option) (Config, error) { config := option.Config config.values = &provider{values: make(map[string]any)} config.providers = make([]*provider, 0, len(option.loaders)) - config.onChanges = make(map[string][]func(Unmarshaler)) + config.onChanges = &onChanges{onChanges: make(map[string][]func(Unmarshaler))} for _, loader := range option.loaders { if loader == nil { @@ -71,7 +71,7 @@ func New(opts ...Option) (Config, error) { // It blocks until ctx is done, or the service returns an error. // // It only can be called once. Call after first has no effects. -func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocognit +func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen changeChan := make(chan []func(Unmarshaler)) defer close(changeChan) ctx, cancel := context.WithCancel(ctx) @@ -103,15 +103,11 @@ func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocogn // Find the onChanges should be triggered. oldValues := &provider{values: watcher.values} newValues := &provider{values: values} - var callbacks []func(Unmarshaler) - for path, onChanges := range c.onChanges { - if oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil { - callbacks = append(callbacks, onChanges...) - } - } - + onChanges := c.onChanges.filter(func(path string) bool { + return oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil + }) watcher.values = values - changeChan <- callbacks + changeChan <- onChanges } if err := watcher.watcher.Watch(ctx, onChange); err != nil { errOnce.Do(func() { @@ -188,6 +184,18 @@ func (p *provider) sub(path string, delimiter string) any { // // It requires Config.Watch has been called. func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) { + c.onChanges.append(onchange, paths) +} + +type onChanges struct { + onChanges map[string][]func(Unmarshaler) + mutex sync.RWMutex +} + +func (c *onChanges) append(onchange func(Unmarshaler), paths []string) { + c.mutex.Lock() + defer c.mutex.Unlock() + if len(paths) == 0 { paths = []string{""} } @@ -197,6 +205,20 @@ func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) { } } +func (c *onChanges) filter(predict func(string) bool) []func(Unmarshaler) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + var callbacks []func(Unmarshaler) + for path, onChanges := range c.onChanges { + if predict(path) { + callbacks = append(callbacks, onChanges...) + } + } + + return callbacks +} + // Unmarshal loads configuration under the given path into the given object // pointed to by target. It supports [mapstructure] tags on struct fields. //