Skip to content

Commit

Permalink
Split Watch and OnChange for watching configuration changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong committed Nov 12, 2023
1 parent 6afc811 commit 894f097
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
### Changed

- Split file and fs provider (#49).
- Split Watch and OnChange for watching configuration changes (#52).

### Removed

Expand Down
7 changes: 6 additions & 1 deletion benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,10 @@ func BenchmarkWatch(b *testing.B) {
}
b.StopTimer()

assert.Equal(b, "changed", konf.Get[string]("config"))
var cfg string
config.OnChange(func(unmarshaler konf.Unmarshaler) {
assert.NoError(b, config.Unmarshal("config", &cfg))
})
watcher.change(map[string]any{"config": "changed"})
assert.Equal(b, "changed", cfg)
}
113 changes: 72 additions & 41 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ type Config struct {

values *provider
providers []*provider
onChanges map[string][]func(Unmarshaler)
}

type Unmarshaler interface {
Unmarshal(path string, target any) error
}

// New returns a Config with the given Option(s).
Expand All @@ -29,6 +34,7 @@ func New(opts ...Option) (Config, error) {
config := option.Config
config.values = &provider{values: make(map[string]any)}
config.providers = make([]*provider, 0, len(option.loaders))
config.onChanges = make(map[string][]func(Unmarshaler))

for _, loader := range option.loaders {
if loader == nil {
Expand Down Expand Up @@ -65,8 +71,8 @@ func New(opts ...Option) (Config, error) {
// It blocks until ctx is done, or the service returns an error.
//
// It only can be called once. Call after first has no effects.
func (c Config) Watch(ctx context.Context) error { //nolint:funlen
changeChan := make(chan struct{})
func (c Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocognit
changeChan := make(chan []func(Unmarshaler))
defer close(changeChan)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -77,28 +83,37 @@ func (c Config) Watch(ctx context.Context) error { //nolint:funlen
waitGroup sync.WaitGroup
hasWatcher bool
)
for _, provider := range c.providers {
if provider.watcher != nil {
provider := provider
for _, p := range c.providers {
if p.watcher != nil {
watcher := p

provider.watchOnce.Do(func() {
watcher.watchOnce.Do(func() {
hasWatcher = true

waitGroup.Add(1)
go func() {
defer waitGroup.Done()

if err := provider.watcher.Watch(
ctx,
func(values map[string]any) {
provider.values = values
slog.Info(
"Configuration has been changed.",
"watcher", provider.watcher,
)
changeChan <- struct{}{}
},
); err != nil {
onChange := func(values map[string]any) {
slog.Info(
"Configuration has been changed.",
"watcher", watcher.watcher,
)

// Find the onChanges should be triggered.
oldValues := &provider{values: watcher.values}
newValues := &provider{values: values}
var callbacks []func(Unmarshaler)
for path, onChanges := range c.onChanges {
if oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil {
callbacks = append(callbacks, onChanges...)
}
}

watcher.values = values
changeChan <- callbacks
}
if err := watcher.watcher.Watch(ctx, onChange); err != nil {
errOnce.Do(func() {
firstErr = fmt.Errorf("[konf] watch configuration change: %w", err)
cancel()
Expand All @@ -119,13 +134,17 @@ func (c Config) Watch(ctx context.Context) error { //nolint:funlen

for {
select {
case <-changeChan:
case onChanges := <-changeChan:
values := make(map[string]any)
for _, w := range c.providers {
maps.Merge(values, w.values)
}
c.values.values = values

for _, onChange := range onChanges {
onChange(c)
}

case <-ctx.Done():
return
}
Expand All @@ -142,6 +161,40 @@ type provider struct {
values map[string]any
}

func (p *provider) sub(path string, delimiter string) any {
if path == "" {
return p.values
}

var next any = p.values
for _, key := range strings.Split(strings.ToLower(path), delimiter) {
mp, ok := next.(map[string]any)
if !ok {
return nil
}

val, exist := mp[key]
if !exist {
return nil
}
next = val
}

return next
}

// OnChange executes the given onChange function while the value of any given path
// (or any value is no paths) have been changed.
func (c Config) OnChange(onchange func(Unmarshaler), paths ...string) {
if len(paths) == 0 {
paths = []string{""}
}

for _, path := range paths {
c.onChanges[path] = append(c.onChanges[path], onchange)
}
}

// Unmarshal loads configuration under the given path into the given object
// pointed to by target. It supports [mapstructure] tags on struct fields.
//
Expand All @@ -163,31 +216,9 @@ func (c Config) Unmarshal(path string, target any) error {
return fmt.Errorf("[konf] new decoder: %w", err)
}

Check warning on line 217 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L216-L217

Added lines #L216 - L217 were not covered by tests

if err := decoder.Decode(c.sub(path)); err != nil {
if err := decoder.Decode(c.values.sub(path, c.delimiter)); err != nil {
return fmt.Errorf("[konf] decode: %w", err)
}

return nil
}

func (c Config) sub(path string) any {
if path == "" {
return c.values.values
}

var next any = c.values.values
for _, key := range strings.Split(strings.ToLower(path), c.delimiter) {
mp, ok := next.(map[string]any)
if !ok {
return nil
}

val, exist := mp[key]
if !exist {
return nil
}
next = val
}

return next
}
4 changes: 3 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ func TestConfig_Watch(t *testing.T) {
assert.NoError(t, config.Watch(ctx))
}()

config.OnChange(func(unmarshaler konf.Unmarshaler) {
assert.NoError(t, config.Unmarshal("config", &cfg))
})
watcher.change(map[string]any{"config": "changed"})
assert.NoError(t, config.Unmarshal("config", &cfg))
assert.Equal(t, "changed", cfg)
}

Expand Down
9 changes: 9 additions & 0 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func Watch(ctx context.Context) error {
return global.Watch(ctx)
}

// OnChange executes the given onChange function while the value of any given path
// (or any value is no paths) have been changed.
func OnChange(onChange func(), paths ...string) {
mux.RLock()
defer mux.RUnlock()

global.OnChange(func(Unmarshaler) { onChange() }, paths...)

Check warning on line 66 in global.go

View check run for this annotation

Codecov / codecov/patch

global.go#L62-L66

Added lines #L62 - L66 were not covered by tests
}

// SetGlobal makes c the global Config. After this call,
// the konf package's functions (e.g. konf.Get) will read from the global config.
//
Expand Down
6 changes: 5 additions & 1 deletion global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func TestWatch(t *testing.T) {
assert.NoError(t, konf.Watch(ctx))
}()

var cfg string
config.OnChange(func(unmarshaler konf.Unmarshaler) {
assert.NoError(t, config.Unmarshal("config", &cfg))
})
watcher.change(map[string]any{"config": "changed"})
assert.Equal(t, "changed", konf.Get[string]("config"))
assert.Equal(t, "changed", cfg)
}

0 comments on commit 894f097

Please sign in to comment.