Skip to content

Commit

Permalink
use config.Load
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong committed Nov 16, 2023
1 parent 4b7273b commit b1f46de
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 149 deletions.
7 changes: 1 addition & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Changed

- Split file and fs provider (#49).
- Split Watch and OnChange for watching configuration changes (#52).

### Removed

- Remove konf.Logger in favor of slog (#48).
- [BREAKING] Redesign API.

## [v0.2.0] - 3/18/2023

Expand Down
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ configuration source(s) (implementation) it actually wants to use. Something lik
var config embed.FS
func main() {
// Create the global Config that loads configuration
// from embed file system and environment variables.
config, err := konf.New(
konf.WithLoader(
fs.New(config, "config/config.json"),
env.New(env.WithPrefix("server")),
),
)
if err != nil {
// Create the Config.
config := konf.New()
// Load configuration from embed file system and environment variables.
if err := config.Load(
fs.New(config, "config/config.json"),
env.New(env.WithPrefix("server")),
); err != nil {
// Handle error here.
}
Expand Down
9 changes: 6 additions & 3 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ func BenchmarkNew(b *testing.B) {
err error
)
for i := 0; i < b.N; i++ {
config, err = konf.New(konf.WithLoader(mapLoader{"k": "v"}))
config = konf.New()
err = config.Load(mapLoader{"k": "v"})
}
b.StopTimer()

Expand All @@ -26,7 +27,8 @@ func BenchmarkNew(b *testing.B) {
}

func BenchmarkGet(b *testing.B) {
config, err := konf.New(konf.WithLoader(mapLoader{"k": "v"}))
config := konf.New()
err := config.Load(mapLoader{"k": "v"})
assert.NoError(b, err)
konf.SetGlobal(config)
b.ResetTimer()
Expand All @@ -41,7 +43,8 @@ func BenchmarkGet(b *testing.B) {
}

func BenchmarkUnmarshal(b *testing.B) {
config, err := konf.New(konf.WithLoader(mapLoader{"k": "v"}))
config := konf.New()
err := config.Load(mapLoader{"k": "v"})
assert.NoError(b, err)
konf.SetGlobal(config)
b.ResetTimer()
Expand Down
162 changes: 88 additions & 74 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,57 @@ type Config struct {
delimiter string
tagName string

values *provider
values map[string]any
providers []*provider

onChanges map[string][]func(Unmarshaler)
onChangesChannel chan []func(Unmarshaler)
onChanges map[string][]func(*Config)
onChangesChannel chan []func(*Config)
onChangesMutex sync.RWMutex

watchOnce sync.Once
}

type Unmarshaler interface {
Unmarshal(path string, target any) error
type provider struct {
values map[string]any
watcher Watcher
}

// New returns a Config with the given Option(s).
func New(opts ...Option) (*Config, error) {
func New(opts ...Option) *Config {
option := &options{
Config: Config{
delimiter: ".",
tagName: "konf",
decodeHook: mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToTimeDurationHookFunc(),
textUnmarshalerHookFunc(),
),
},
delimiter: ".",
tagName: "konf",
decodeHook: mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToTimeDurationHookFunc(),
textUnmarshalerHookFunc(),
),
values: make(map[string]any),
onChanges: make(map[string][]func(*Config)),
}
for _, opt := range opts {
opt(option)
}
option.values = &provider{values: make(map[string]any)}
option.providers = make([]*provider, 0, len(option.loaders))
option.onChanges = make(map[string][]func(Unmarshaler))

for _, loader := range option.loaders {
return (*Config)(option)
}

// Load loads configuration from given loaders.
//
// Each loader takes precedence over the loaders before it
// while multiple loaders are specified.
//
// This method can be called multiple times but it is not concurrency-safe.
func (c *Config) Load(loaders ...Loader) error {
for _, loader := range loaders {
if loader == nil {
continue
}

values, err := loader.Load()
if err != nil {
return nil, fmt.Errorf("load configuration: %w", err)
return fmt.Errorf("load configuration: %w", err)
}
maps.Merge(option.values.values, values)
maps.Merge(c.values, values)
slog.Info(
"Configuration has been loaded.",
"loader", loader,
Expand All @@ -76,18 +86,27 @@ func New(opts ...Option) (*Config, error) {
if w, ok := loader.(Watcher); ok {
provider.watcher = w
}
option.providers = append(option.providers, provider)
c.providers = append(c.providers, provider)
}

return &option.Config, nil
return nil
}

// Watch watches and updates configuration when it changes.
// It blocks until ctx is done, or the service returns an error.
// WARNING: All loaders passed in Load after calling Watch do not get watched.
//
// It only can be called once. Call after first has no effects.
func (c *Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocognit
c.onChangesChannel = make(chan []func(Unmarshaler))
initialized := true
c.watchOnce.Do(func() {
initialized = false
})
if initialized {
return nil
}

c.onChangesChannel = make(chan []func(*Config))
defer close(c.onChangesChannel)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -104,7 +123,7 @@ func (c *Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocog
for _, w := range c.providers {
maps.Merge(values, w.values)
}
c.values.values = values
c.values = values

for _, onChange := range onChanges {
onChange(c)
Expand All @@ -117,47 +136,44 @@ func (c *Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocog
}()

errChan := make(chan error, len(c.providers))
for _, p := range c.providers {
if p.watcher != nil {
watcher := p

watcher.watchOnce.Do(func() {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()

onChange := func(values map[string]any) {
slog.Info(
"Configuration has been changed.",
"watcher", watcher.watcher,
)

// Find the onChanges should be triggered.
oldValues := &provider{values: watcher.values}
newValues := &provider{values: values}
onChanges := func() []func(Unmarshaler) {
c.onChangesMutex.RLock()
defer c.onChangesMutex.RUnlock()

var callbacks []func(Unmarshaler)
for path, onChanges := range c.onChanges {
if oldValues.sub(path, c.delimiter) != nil || newValues.sub(path, c.delimiter) != nil {
callbacks = append(callbacks, onChanges...)
}
for _, provider := range c.providers {
if provider.watcher != nil {
provider := provider

waitGroup.Add(1)
go func() {
defer waitGroup.Done()

onChange := func(values map[string]any) {
slog.Info(
"Configuration has been changed.",
"provider", provider.watcher,
)

// Find the onChanges should be triggered.
oldValues := provider.values
provider.values = values

onChanges := func() []func(*Config) {
c.onChangesMutex.RLock()
defer c.onChangesMutex.RUnlock()

var callbacks []func(*Config)
for path, onChanges := range c.onChanges {
if sub(oldValues, path, c.delimiter) != nil || sub(values, path, c.delimiter) != nil {
callbacks = append(callbacks, onChanges...)
}

return callbacks
}

watcher.values = values
c.onChangesChannel <- onChanges()
}
if err := watcher.watcher.Watch(ctx, onChange); err != nil {
cancel()
errChan <- fmt.Errorf("watch configuration change: %w", err)
return callbacks
}
}()
})
c.onChangesChannel <- onChanges()
}
if err := provider.watcher.Watch(ctx, onChange); err != nil {
errChan <- fmt.Errorf("watch configuration change: %w", err)
cancel()
}
}()
}
}
waitGroup.Wait()
Expand All @@ -171,19 +187,13 @@ func (c *Config) Watch(ctx context.Context) error { //nolint:cyclop,funlen,gocog
return err
}

type provider struct {
values map[string]any
watcher Watcher
watchOnce sync.Once
}

func (p *provider) sub(path string, delimiter string) any {
func sub(values map[string]any, path string, delimiter string) any {
if path == "" {
return p.values
return values
}

var next any = p.values
for _, key := range strings.Split(strings.ToLower(path), delimiter) {
var next any = values
for _, key := range strings.Split(path, delimiter) {
mp, ok := next.(map[string]any)
if !ok {
return nil
Expand All @@ -201,9 +211,12 @@ func (p *provider) sub(path string, delimiter string) any {

// OnChange executes the given onChange function while the value of any given path
// (or any value is no paths) have been changed.
//
// It requires Config.Watch has been called.
func (c *Config) OnChange(onchange func(Unmarshaler), paths ...string) {
//
// The paths are case-insensitive.
//
// This method is concurrency-safe.
func (c *Config) OnChange(onchange func(*Config), paths ...string) {
c.onChangesMutex.Lock()
defer c.onChangesMutex.Unlock()

Expand All @@ -212,6 +225,7 @@ func (c *Config) OnChange(onchange func(Unmarshaler), paths ...string) {
}

for _, path := range paths {
path = strings.ToLower(path)
c.onChanges[path] = append(c.onChanges[path], onchange)
}
}
Expand All @@ -233,7 +247,7 @@ func (c *Config) Unmarshal(path string, target any) error {
return fmt.Errorf("new decoder: %w", err)

Check warning on line 247 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L247

Added line #L247 was not covered by tests
}

if err := decoder.Decode(c.values.sub(path, c.delimiter)); err != nil {
if err := decoder.Decode(sub(c.values, strings.ToLower(path), c.delimiter)); err != nil {
return fmt.Errorf("decode: %w", err)
}

Expand Down
Loading

0 comments on commit b1f46de

Please sign in to comment.