diff --git a/CHANGELOG.md b/CHANGELOG.md index c2a13bbd..593d9eae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Changed - Split file and fs provider (#49). +- Split Watch and OnChange for watching configuration changes (#52). ### Removed diff --git a/README.md b/README.md index 4f2bc308..06a757f2 100644 --- a/README.md +++ b/README.md @@ -52,9 +52,11 @@ Application also can watch the changes of configuration like: func main() { // ... setup global Config ... - konf.Watch(func(){ - // Read configuration and reconfig application. - }) + go func() { + if err := konf.Watch(ctx); err != nil { + // Handle error here. + } + } // ... other setup code ... } @@ -65,12 +67,18 @@ configuration source(s). They read configuration in terms of functions in packag ``` func (app *appObject) Run() { + // Read the server configuration. type serverConfig struct { Host string Port int } cfg := konf.Get[serverConfig]("server") + // Register callbacks while server configuration changes. + konf.OnChange(func() { + // Reconfig the application object. + }, "server") + // ... use cfg in app code ... } ``` diff --git a/benchmark_test.go b/benchmark_test.go index d2992c95..dad3a372 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -5,7 +5,7 @@ package konf_test import ( "context" - "sync" + "sync/atomic" "testing" "github.com/ktong/konf" @@ -14,7 +14,7 @@ import ( func BenchmarkNew(b *testing.B) { var ( - config *konf.Config + config konf.Config err error ) for i := 0; i < b.N; i++ { @@ -48,27 +48,21 @@ func BenchmarkWatch(b *testing.B) { assert.NoError(b, err) konf.SetGlobal(config) - cfg := konf.Get[string]("config") - assert.Equal(b, "string", cfg) - var waitGroup sync.WaitGroup - waitGroup.Add(b.N) + assert.Equal(b, "string", konf.Get[string]("config")) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { - err := konf.Watch(ctx, func() { - defer waitGroup.Done() - - cfg = konf.Get[string]("config") - }) - assert.NoError(b, err) + assert.NoError(b, konf.Watch(ctx)) }() b.ResetTimer() + var cfg atomic.Value + konf.OnChange(func() { + cfg.Store(konf.Get[string]("config")) + }) for i := 0; i < b.N; i++ { watcher.change(map[string]any{"config": "changed"}) } - waitGroup.Wait() b.StopTimer() - - assert.Equal(b, "changed", cfg) + assert.Equal(b, "changed", cfg.Load()) } diff --git a/config.go b/config.go index 554806fa..5079df0f 100644 --- a/config.go +++ b/config.go @@ -5,7 +5,6 @@ package konf import ( "context" - "errors" "fmt" "log/slog" "strings" @@ -22,15 +21,20 @@ type Config struct { values *provider providers []*provider - watchOnce sync.Once + onChanges *onChanges +} + +type Unmarshaler interface { + Unmarshal(path string, target any) error } // New returns a Config with the given Option(s). -func New(opts ...Option) (*Config, error) { +func New(opts ...Option) (Config, error) { option := apply(opts) config := option.Config config.values = &provider{values: make(map[string]any)} config.providers = make([]*provider, 0, len(option.loaders)) + config.onChanges = &onChanges{onChanges: make(map[string][]func(Unmarshaler))} for _, loader := range option.loaders { if loader == nil { @@ -43,7 +47,7 @@ func New(opts ...Option) (*Config, error) { values, err := loader.Load() if err != nil { - return nil, fmt.Errorf("[konf] load configuration: %w", err) + return Config{}, fmt.Errorf("[konf] load configuration: %w", err) } maps.Merge(config.values.values, values) slog.Info( @@ -63,41 +67,103 @@ func New(opts ...Option) (*Config, error) { return config, nil } -// Unmarshal loads configuration under the given path into the given object -// pointed to by target. It supports [mapstructure] tags on struct fields. +// Watch watches and updates configuration when it changes. +// It blocks until ctx is done, or the service returns an error. // -// The path is case-insensitive. -func (c *Config) Unmarshal(path string, target any) error { - decoder, err := mapstructure.NewDecoder( - &mapstructure.DecoderConfig{ - Metadata: nil, - Result: target, - WeaklyTypedInput: true, - DecodeHook: mapstructure.ComposeDecodeHookFunc( - mapstructure.StringToTimeDurationHookFunc(), - mapstructure.StringToSliceHookFunc(","), - mapstructure.TextUnmarshallerHookFunc(), - ), - }, +// It only can be called once. Call after first has no effects. +func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen + changeChan := make(chan []func(Unmarshaler)) + defer close(changeChan) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + firstErr error + errOnce sync.Once + waitGroup sync.WaitGroup + hasWatcher bool ) - if err != nil { - return fmt.Errorf("[konf] new decoder: %w", err) + for _, p := range c.providers { + if p.watcher != nil { + watcher := p + + watcher.watchOnce.Do(func() { + hasWatcher = true + + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + onChange := func(values map[string]any) { + slog.Info( + "Configuration has been changed.", + "watcher", watcher.watcher, + ) + + // Find the onChanges should be triggered. + oldValues := &provider{values: watcher.values} + newValues := &provider{values: values} + onChanges := c.onChanges.filter(func(path string) bool { + return oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil + }) + watcher.values = values + changeChan <- onChanges + } + if err := watcher.watcher.Watch(ctx, onChange); err != nil { + errOnce.Do(func() { + firstErr = fmt.Errorf("[konf] watch configuration change: %w", err) + cancel() + }) + } + }() + }) + } } - if err := decoder.Decode(c.sub(path)); err != nil { - return fmt.Errorf("[konf] decode: %w", err) + if !hasWatcher { + return nil } - return nil + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + for { + select { + case onChanges := <-changeChan: + values := make(map[string]any) + for _, w := range c.providers { + maps.Merge(values, w.values) + } + c.values.values = values + + for _, onChange := range onChanges { + onChange(c) + } + + case <-ctx.Done(): + return + } + } + }() + waitGroup.Wait() + + return firstErr } -func (c *Config) sub(path string) any { +type provider struct { + watcher Watcher + watchOnce sync.Once + values map[string]any +} + +func (p *provider) sub(path string, delimiter string) any { if path == "" { - return c.values.values + return p.values } - var next any = c.values.values - for _, key := range strings.Split(strings.ToLower(path), c.delimiter) { + var next any = p.values + for _, key := range strings.Split(strings.ToLower(path), delimiter) { mp, ok := next.(map[string]any) if !ok { return nil @@ -113,87 +179,70 @@ func (c *Config) sub(path string) any { return next } -// Watch watches configuration and triggers callbacks when it changes. -// It blocks until ctx is done, or the service returns an error. +// OnChange executes the given onChange function while the value of any given path +// (or any value is no paths) have been changed. // -// It only can be called once. Call after first returns an error. -func (c *Config) Watch(ctx context.Context, fns ...func(*Config)) error { //nolint:funlen - var first bool - c.watchOnce.Do(func() { - first = true - }) - if !first { - return errOnlyOnce - } +// It requires Config.Watch has been called. +func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) { + c.onChanges.append(onchange, paths) +} - changeChan := make(chan struct{}) - defer close(changeChan) - ctx, cancel := context.WithCancel(ctx) - defer cancel() +type onChanges struct { + onChanges map[string][]func(Unmarshaler) + mutex sync.RWMutex +} - var waitGroup sync.WaitGroup - waitGroup.Add(1) - go func() { - defer waitGroup.Done() +func (c *onChanges) append(onchange func(Unmarshaler), paths []string) { + c.mutex.Lock() + defer c.mutex.Unlock() - for { - select { - case <-changeChan: - values := make(map[string]any) - for _, w := range c.providers { - maps.Merge(values, w.values) - } - c.values.values = values + if len(paths) == 0 { + paths = []string{""} + } - for _, fn := range fns { - fn(c) - } + for _, path := range paths { + c.onChanges[path] = append(c.onChanges[path], onchange) + } +} - case <-ctx.Done(): - return - } - } - }() +func (c *onChanges) filter(predict func(string) bool) []func(Unmarshaler) { + c.mutex.RLock() + defer c.mutex.RUnlock() - var ( - firstErr error - errOnce sync.Once - ) - for _, provider := range c.providers { - if provider.watcher != nil { - provider := provider - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - - if err := provider.watcher.Watch( - ctx, - func(values map[string]any) { - provider.values = values - slog.Info( - "Configuration has been changed.", - "watcher", provider.watcher, - ) - changeChan <- struct{}{} - }, - ); err != nil { - errOnce.Do(func() { - firstErr = fmt.Errorf("[konf] watch configuration change: %w", err) - cancel() - }) - } - }() + var callbacks []func(Unmarshaler) + for path, onChanges := range c.onChanges { + if predict(path) { + callbacks = append(callbacks, onChanges...) } } - waitGroup.Wait() - return firstErr + return callbacks } -var errOnlyOnce = errors.New("[konf] Watch only can be called once") +// Unmarshal loads configuration under the given path into the given object +// pointed to by target. It supports [mapstructure] tags on struct fields. +// +// The path is case-insensitive. +func (c Config) Unmarshal(path string, target any) error { + decoder, err := mapstructure.NewDecoder( + &mapstructure.DecoderConfig{ + Metadata: nil, + Result: target, + WeaklyTypedInput: true, + DecodeHook: mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + mapstructure.TextUnmarshallerHookFunc(), + ), + }, + ) + if err != nil { + return fmt.Errorf("[konf] new decoder: %w", err) + } -type provider struct { - watcher Watcher - values map[string]any + if err := decoder.Decode(c.values.sub(path, c.delimiter)); err != nil { + return fmt.Errorf("[konf] decode: %w", err) + } + + return nil } diff --git a/config_test.go b/config_test.go index 9f218fe7..ad2ce652 100644 --- a/config_test.go +++ b/config_test.go @@ -6,8 +6,9 @@ package konf_test import ( "context" "errors" - "sync" + "sync/atomic" "testing" + "time" "github.com/ktong/konf" "github.com/ktong/konf/internal/assert" @@ -19,11 +20,11 @@ func TestConfig_Unmarshal(t *testing.T) { testcases := []struct { description string opts []konf.Option - assert func(*konf.Config) + assert func(konf.Config) }{ { description: "empty values", - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "", cfg) @@ -32,7 +33,7 @@ func TestConfig_Unmarshal(t *testing.T) { { description: "nil loader", opts: []konf.Option{konf.WithLoader(nil)}, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "", cfg) @@ -41,7 +42,7 @@ func TestConfig_Unmarshal(t *testing.T) { { description: "for primary type", opts: []konf.Option{konf.WithLoader(mapLoader{"config": "string"})}, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config", &cfg)) assert.Equal(t, "string", cfg) @@ -50,7 +51,7 @@ func TestConfig_Unmarshal(t *testing.T) { { description: "config for struct", opts: []konf.Option{konf.WithLoader(mapLoader{"config": "struct"})}, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg struct { Config string } @@ -69,7 +70,7 @@ func TestConfig_Unmarshal(t *testing.T) { }, ), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config.nest", &cfg)) assert.Equal(t, "string", cfg) @@ -87,7 +88,7 @@ func TestConfig_Unmarshal(t *testing.T) { }, ), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config_nest", &cfg)) assert.Equal(t, "string", cfg) @@ -104,7 +105,7 @@ func TestConfig_Unmarshal(t *testing.T) { }, ), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var cfg string assert.NoError(t, config.Unmarshal("config.nest", &cfg)) assert.Equal(t, "", cfg) @@ -115,7 +116,7 @@ func TestConfig_Unmarshal(t *testing.T) { opts: []konf.Option{ konf.WithLoader(mapLoader{}), }, - assert: func(config *konf.Config) { + assert: func(config konf.Config) { var configured bool assert.NoError(t, config.Unmarshal("configured", &configured)) assert.True(t, configured) @@ -138,7 +139,11 @@ func TestConfig_Unmarshal(t *testing.T) { type mapLoader map[string]any -func (m mapLoader) WithConfig(*konf.Config) { +func (m mapLoader) WithConfig( + interface { + Unmarshal(path string, target any) error + }, +) { m["configured"] = true } @@ -159,22 +164,18 @@ func TestConfig_Watch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - var waitGroup sync.WaitGroup - waitGroup.Add(1) go func() { - err := config.Watch(ctx, func(config *konf.Config) { - defer waitGroup.Done() - - assert.NoError(t, config.Unmarshal("config", &cfg)) - }) - assert.NoError(t, err) + assert.NoError(t, config.Watch(ctx)) }() + var newCfg atomic.Value + config.OnChange(func(unmarshaler konf.Unmarshaler) { + var cfg string + assert.NoError(t, config.Unmarshal("config", &cfg)) + newCfg.Store(cfg) + }) watcher.change(map[string]any{"config": "changed"}) - waitGroup.Wait() - - assert.Equal(t, "changed", cfg) + assert.Equal(t, "changed", newCfg.Load()) } type mapWatcher chan map[string]any @@ -196,6 +197,8 @@ func (m mapWatcher) Watch(ctx context.Context, fn func(map[string]any)) error { func (m mapWatcher) change(values map[string]any) { m <- values + + time.Sleep(time.Second) // Wait for change gets propagated. } func TestConfig_Watch_error(t *testing.T) { diff --git a/doc.go b/doc.go index f0869643..2df20f22 100644 --- a/doc.go +++ b/doc.go @@ -14,8 +14,8 @@ // Config has following main methods: // - Config.Unmarshal loads configuration under the given path // into the given object pointed to by target. -// - Config.Watch reloads configuration and triggers callbacks -// when configuration changes. +// - Config.Watch reloads configuration when it changes. +// - Config.OnChange register callback on configuration changes. // // # Global Config // @@ -26,6 +26,6 @@ // It returns zero value if there is an error while getting configuration. // - Unmarshal loads configuration under the given path // into the given object pointed to by target. -// - Watch reloads configuration and triggers callbacks -// when configuration changes. +// - Watch reloads configuration when it changes. +// - OnChange register callback on configuration changes. package konf diff --git a/example_test.go b/example_test.go index 85c918c0..86b87c34 100644 --- a/example_test.go +++ b/example_test.go @@ -9,7 +9,7 @@ import ( "github.com/ktong/konf" "github.com/ktong/konf/provider/env" - pfs "github.com/ktong/konf/provider/fs" + kfs "github.com/ktong/konf/provider/fs" ) func ExampleGet() { @@ -44,7 +44,7 @@ var testdata embed.FS func ExampleSetGlobal() { cfg, err := konf.New( konf.WithLoader( - pfs.New(testdata, "testdata/config.json"), + kfs.New(testdata, "testdata/config.json"), env.New(env.WithPrefix("server")), ), ) diff --git a/global.go b/global.go index e254c12a..a49d8aeb 100644 --- a/global.go +++ b/global.go @@ -46,29 +46,33 @@ func Unmarshal(path string, target any) error { return global.Unmarshal(path, target) } -// Watch watches configuration and triggers callbacks when it changes. +// Watch watches and updates configuration when it changes. // It blocks until ctx is done, or the service returns an error. // -// It only can be called once. Call after first returns an error. -func Watch(ctx context.Context, fns ...func()) error { +// It only can be called once. Call after first has no effects. +func Watch(ctx context.Context) error { mux.RLock() defer mux.RUnlock() - return global.Watch( - ctx, - func(*Config) { - for _, fn := range fns { - fn() - } - }, - ) + return global.Watch(ctx) +} + +// OnChange executes the given onChange function while the value of any given path +// (or any value is no paths) have been changed. +// +// It requires Watch has been called. +func OnChange(onChange func(), paths ...string) { + mux.RLock() + defer mux.RUnlock() + + global.OnChange(func(Unmarshaler) { onChange() }, paths...) } // SetGlobal makes c the global Config. After this call, // the konf package's functions (e.g. konf.Get) will read from the global config. // // The default global config only loads configuration from environment variables. -func SetGlobal(config *Config) { +func SetGlobal(config Config) { mux.Lock() defer mux.Unlock() diff --git a/global_test.go b/global_test.go index c2c76e4c..fbe4cb07 100644 --- a/global_test.go +++ b/global_test.go @@ -7,7 +7,7 @@ import ( "bytes" "context" "log" - "sync" + "sync/atomic" "testing" "github.com/ktong/konf" @@ -57,26 +57,18 @@ func TestWatch(t *testing.T) { config, err := konf.New(konf.WithLoader(watcher)) assert.NoError(t, err) konf.SetGlobal(config) - - cfg := konf.Get[string]("config") - assert.Equal(t, "string", cfg) + assert.Equal(t, "string", konf.Get[string]("config")) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - var waitGroup sync.WaitGroup - waitGroup.Add(1) go func() { - err := konf.Watch(ctx, func() { - defer waitGroup.Done() - - cfg = konf.Get[string]("config") - }) - assert.NoError(t, err) + assert.NoError(t, konf.Watch(ctx)) }() + var cfg atomic.Value + konf.OnChange(func() { + cfg.Store(konf.Get[string]("config")) + }) watcher.change(map[string]any{"config": "changed"}) - waitGroup.Wait() - - assert.Equal(t, "changed", cfg) + assert.Equal(t, "changed", cfg.Load()) } diff --git a/option.go b/option.go index ece6c49b..11bd6461 100644 --- a/option.go +++ b/option.go @@ -26,14 +26,14 @@ func WithDelimiter(delimiter string) Option { type Option func(*options) type options struct { - *Config + Config loaders []Loader } func apply(opts []Option) options { option := &options{ - Config: &Config{ + Config: Config{ delimiter: ".", }, } diff --git a/provider.go b/provider.go index 23eafd69..4faf6a4b 100644 --- a/provider.go +++ b/provider.go @@ -28,5 +28,7 @@ type Watcher interface { // WithConfig enables provider uses configuration loaded by providers before it. // It ensures the WithConfig is called before executing methods in Loader and Watcher. type ConfigAware interface { - WithConfig(config *Config) + WithConfig(config interface { + Unmarshal(path string, target any) error + }) } diff --git a/provider/fs/benchmark_test.go b/provider/fs/benchmark_test.go index 28672855..1ef5cf83 100644 --- a/provider/fs/benchmark_test.go +++ b/provider/fs/benchmark_test.go @@ -8,7 +8,7 @@ import ( "testing/fstest" "github.com/ktong/konf/internal/assert" - pfs "github.com/ktong/konf/provider/fs" + kfs "github.com/ktong/konf/provider/fs" ) func BenchmarkNew(b *testing.B) { @@ -19,9 +19,9 @@ func BenchmarkNew(b *testing.B) { } b.ResetTimer() - var loader pfs.FS + var loader kfs.FS for i := 0; i < b.N; i++ { - loader = pfs.New(mapFS, "config.json") + loader = kfs.New(mapFS, "config.json") } b.StopTimer() @@ -36,7 +36,7 @@ func BenchmarkLoad(b *testing.B) { Data: []byte(`{"k":"v"}`), }, } - loader := pfs.New(fs, "config.json") + loader := kfs.New(fs, "config.json") b.ResetTimer() var ( diff --git a/provider/fs/fs_test.go b/provider/fs/fs_test.go index 75364bbf..b9513810 100644 --- a/provider/fs/fs_test.go +++ b/provider/fs/fs_test.go @@ -13,7 +13,7 @@ import ( "testing/fstest" "github.com/ktong/konf/internal/assert" - pfs "github.com/ktong/konf/provider/fs" + kfs "github.com/ktong/konf/provider/fs" ) func TestFile_Load(t *testing.T) { @@ -23,7 +23,7 @@ func TestFile_Load(t *testing.T) { description string fs fs.FS path string - opts []pfs.Option + opts []kfs.Option expected map[string]any err string }{ @@ -51,8 +51,8 @@ func TestFile_Load(t *testing.T) { description: "fs file (ignore not exist)", fs: fstest.MapFS{}, path: "not_found.json", - opts: []pfs.Option{ - pfs.IgnoreFileNotExit(), + opts: []kfs.Option{ + kfs.IgnoreFileNotExit(), }, expected: map[string]any{}, }, @@ -64,8 +64,8 @@ func TestFile_Load(t *testing.T) { }, }, path: "config.json", - opts: []pfs.Option{ - pfs.WithUnmarshal(func([]byte, any) error { + opts: []kfs.Option{ + kfs.WithUnmarshal(func([]byte, any) error { return errors.New("unmarshal error") }), }, @@ -79,7 +79,7 @@ func TestFile_Load(t *testing.T) { t.Run(testcase.description, func(t *testing.T) { t.Parallel() - values, err := pfs.New(testcase.fs, testcase.path, testcase.opts...).Load() + values, err := kfs.New(testcase.fs, testcase.path, testcase.opts...).Load() if err != nil { assert.True(t, strings.HasPrefix(err.Error(), testcase.err)) } else { @@ -93,5 +93,5 @@ func TestFile_Load(t *testing.T) { func TestFile_String(t *testing.T) { t.Parallel() - assert.Equal(t, "fs:config.json", pfs.New(fstest.MapFS{}, "config.json").String()) + assert.Equal(t, "fs:config.json", kfs.New(fstest.MapFS{}, "config.json").String()) }