diff --git a/docker-compose.yml b/docker-compose.yml index 250d5c11..d26ec0bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ networks: services: consul: container_name: harvester_consul_dev - image: consul:1.8.0 + image: consul:1.15 networks: - consul-network ports: diff --git a/examples/main.go b/examples/main.go index 2940be60..22fc5bc3 100644 --- a/examples/main.go +++ b/examples/main.go @@ -67,10 +67,12 @@ func main() { redisClient := createRedisClient() - h, err := harvester.New(&cfg). - WithConsulSeed(consulAddress, consulDC, consulToken, 0).WithConsulMonitor(consulAddress, consulDC, consulToken, 0). - WithRedisSeed(redisClient).WithRedisMonitor(redisClient, 200*time.Millisecond). - WithNotification(chNotify).Create() + h, err := harvester.New(&cfg, chNotify, + harvester.WithConsulSeed(consulAddress, consulDC, consulToken, 0), + harvester.WithConsulMonitor(consulAddress, consulDC, consulToken, 0), + harvester.WithRedisSeed(redisClient), + harvester.WithRedisMonitor(redisClient, 200*time.Millisecond), + ) if err != nil { log.Fatalf("failed to create harvester: %v", err) } diff --git a/harvester.go b/harvester.go index b66b73ff..d38b24bb 100644 --- a/harvester.go +++ b/harvester.go @@ -2,18 +2,10 @@ package harvester import ( "context" - "errors" - "log/slog" - "time" "github.com/beatlabs/harvester/config" "github.com/beatlabs/harvester/monitor" - "github.com/beatlabs/harvester/monitor/consul" - redismon "github.com/beatlabs/harvester/monitor/redis" "github.com/beatlabs/harvester/seed" - seedconsul "github.com/beatlabs/harvester/seed/consul" - seedredis "github.com/beatlabs/harvester/seed/redis" - "github.com/go-redis/redis/v8" ) // Seeder interface for seeding initial values of the configuration. @@ -51,255 +43,37 @@ func (h *harvester) Harvest(ctx context.Context) error { return h.monitor.Monitor(ctx) } -type consulConfig struct { - addr, dataCenter, token, folderPrefix string - timeout time.Duration -} - -// Builder of a harvester instance. -type Builder struct { - cfg interface{} - seedConsulCfg *consulConfig - monitorConsulCfg *consulConfig - err error - chNotify chan<- config.ChangeNotification - monitorRedisClient redis.UniversalClient - seedRedisClient redis.UniversalClient - monitorRedisPollInterval time.Duration -} - -// New constructor. -func New(cfg interface{}) *Builder { - return &Builder{cfg: cfg} -} - -// WithNotification constructor. -func (b *Builder) WithNotification(chNotify chan<- config.ChangeNotification) *Builder { - if b.err != nil { - return b - } - - if chNotify == nil { - b.err = errors.New("notification channel is nil") - return b - } - - b.chNotify = chNotify - return b -} - -// WithConsulSeed enables support for seeding values with consul. -func (b *Builder) WithConsulSeed(addr, dataCenter, token string, timeout time.Duration) *Builder { - return b.WithConsulSeedWithPrefix(addr, dataCenter, token, "", timeout) -} - -// WithConsulSeedWithPrefix enables support for seeding values with consul including a folder prefix. -func (b *Builder) WithConsulSeedWithPrefix(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder { - if b.err != nil { - return b - } - - b.seedConsulCfg = &consulConfig{ - addr: addr, - dataCenter: dataCenter, - token: token, - folderPrefix: folderPrefix, - timeout: timeout, - } - return b -} - -// WithConsulMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config -// and monitors every field found tagged with ConsulLogger. -func (b *Builder) WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) *Builder { - return b.WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout) -} - -// WithConsulFolderPrefixMonitor enables support for monitoring key/prefixes on ConsulLogger. It automatically parses the config -// and monitors every field found tagged with ConsulLogger. -func (b *Builder) WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) *Builder { - if b.err != nil { - return b - } - - b.monitorConsulCfg = &consulConfig{ - addr: addr, - dataCenter: dataCenter, - token: token, - folderPrefix: folderPrefix, - timeout: timeout, - } - return b -} - -// WithRedisSeed enables support for seeding values with redis. -func (b *Builder) WithRedisSeed(client redis.UniversalClient) *Builder { - if b.err != nil { - return b - } - - if client == nil { - b.err = errors.New("redis seed client is nil") - return b - } - b.seedRedisClient = client - return b -} - -// WithRedisMonitor enables support for monitoring keys in Redis. It automatically parses the config -// and monitors every field found tagged with ConsulLogger. -func (b *Builder) WithRedisMonitor(client redis.UniversalClient, pollInterval time.Duration) *Builder { - if b.err != nil { - return b - } - - if client == nil { - b.err = errors.New("redis monitor client is nil") - return b - } - - if pollInterval <= 0 { - b.err = errors.New("redis monitor poll interval should be a positive number") - return b - } - - b.monitorRedisClient = client - b.monitorRedisPollInterval = pollInterval - return b -} - -// Create the harvester instance. -func (b *Builder) Create() (Harvester, error) { - if b.err != nil { - return nil, b.err - } - - cfg, err := config.New(b.cfg, b.chNotify) - if err != nil { - return nil, err - } - - sd, err := b.setupSeeding() - if err != nil { - return nil, err - } - - mon, err := b.setupMonitoring(cfg) - if err != nil { - return nil, err - } - - return &harvester{seeder: sd, monitor: mon, cfg: cfg}, nil -} - -func (b *Builder) setupSeeding() (Seeder, error) { - pp := make([]seed.Param, 0) - - consulSeedParam, err := b.setupConsulSeeding() - if err != nil { - return nil, err - } - - if consulSeedParam != nil { - pp = append(pp, *consulSeedParam) - } - - redisSeedParam, err := b.setupRedisSeeding() - if err != nil { - return nil, err - } - - if redisSeedParam != nil { - pp = append(pp, *redisSeedParam) - } - - return seed.New(pp...), nil -} - -func (b *Builder) setupConsulSeeding() (*seed.Param, error) { - if b.seedConsulCfg == nil { - return nil, nil - } - - getter, err := seedconsul.NewWithFolderPrefix(b.seedConsulCfg.addr, b.seedConsulCfg.dataCenter, b.seedConsulCfg.token, b.seedConsulCfg.folderPrefix, - b.seedConsulCfg.timeout) +// New constructor with functional options support. +// Notification channel is optional and can be nil. +func New(cfg interface{}, ch chan<- config.ChangeNotification, oo ...OptionFunc) (Harvester, error) { + hCfg, err := config.New(cfg, ch) if err != nil { return nil, err } - return seed.NewParam(config.SourceConsul, getter) -} - -func (b *Builder) setupRedisSeeding() (*seed.Param, error) { - if b.seedRedisClient == nil { - return nil, nil + opt := &options{ + cfg: hCfg, } - getter, err := seedredis.New(b.seedRedisClient) - if err != nil { - return nil, err + for _, option := range oo { + err = option(opt) + if err != nil { + return nil, err + } } - return seed.NewParam(config.SourceRedis, getter) -} - -func (b *Builder) setupMonitoring(cfg *config.Config) (Monitor, error) { - var watchers []monitor.Watcher + sd := seed.New(opt.seedParams...) - consulWatcher, err := b.setupConsulMonitoring(cfg) - if err != nil { - return nil, err - } + var mon *monitor.Monitor - if consulWatcher != nil { - watchers = append(watchers, consulWatcher) + if len(opt.monitorParams) == 0 { + return &harvester{cfg: hCfg, seeder: sd, monitor: nil}, nil } - redisWatcher, err := b.setupRedisMonitoring(cfg) + mon, err = monitor.New(opt.cfg, opt.monitorParams...) if err != nil { return nil, err } - if redisWatcher != nil { - watchers = append(watchers, redisWatcher) - } - - if len(watchers) == 0 { - return nil, nil - } - - return monitor.New(cfg, watchers...) -} - -func (b *Builder) setupConsulMonitoring(cfg *config.Config) (*consul.Watcher, error) { - if b.monitorConsulCfg == nil { - return nil, nil - } - items := make([]consul.Item, 0) - for _, field := range cfg.Fields { - consulKey, ok := field.Sources()[config.SourceConsul] - if !ok { - continue - } - slog.Debug("monitoring consul", "key", consulKey) - items = append(items, consul.NewKeyItemWithPrefix(consulKey, b.monitorConsulCfg.folderPrefix)) - } - return consul.New(b.monitorConsulCfg.addr, b.monitorConsulCfg.dataCenter, b.monitorConsulCfg.token, - b.monitorConsulCfg.timeout, items...) -} - -func (b *Builder) setupRedisMonitoring(cfg *config.Config) (*redismon.Watcher, error) { - if b.monitorRedisClient == nil { - return nil, nil - } - items := make([]string, 0) - for _, field := range cfg.Fields { - redisKey, ok := field.Sources()[config.SourceRedis] - if !ok { - continue - } - slog.Debug("monitoring redis", "key", redisKey) - items = append(items, redisKey) - } - return redismon.New(b.monitorRedisClient, b.monitorRedisPollInterval, items) + return &harvester{cfg: hCfg, seeder: sd, monitor: mon}, nil } diff --git a/harvester_integration_test.go b/harvester_integration_test.go index 987d2e7c..86a5f765 100644 --- a/harvester_integration_test.go +++ b/harvester_integration_test.go @@ -75,12 +75,11 @@ func TestMain(m *testing.M) { func Test_harvester_Harvest(t *testing.T) { cfg := testConfigWithSecret{} - h, err := New(&cfg). - WithConsulSeed(addr, "", "", 0). - WithConsulMonitor(addr, "", "", 0). - WithRedisSeed(redisClient). - WithRedisMonitor(redisClient, 10*time.Millisecond). - Create() + h, err := New(&cfg, nil, + WithConsulSeed(addr, "", "", 0), + WithConsulMonitor(addr, "", "", 0), + WithRedisSeed(redisClient), + WithRedisMonitor(redisClient, 10*time.Millisecond)) require.NoError(t, err) ctx, cnl := context.WithCancel(context.Background()) diff --git a/harvester_test.go b/harvester_test.go index 8cd5b631..56911abf 100644 --- a/harvester_test.go +++ b/harvester_test.go @@ -2,7 +2,6 @@ package harvester import ( "context" - "path/filepath" "testing" "time" @@ -54,7 +53,7 @@ func TestCreateWithConsulAndRedis(t *testing.T) { seedRedisClient: nil, monitorRedisClient: redisClient, monitoringPollInterval: 10 * time.Millisecond, - }, expectedErr: "redis seed client is nil", + }, expectedErr: "client is nil", }, "invalid redis monitor client": { args: args{ @@ -63,7 +62,7 @@ func TestCreateWithConsulAndRedis(t *testing.T) { seedRedisClient: redisClient, monitorRedisClient: nil, monitoringPollInterval: 10 * time.Millisecond, - }, expectedErr: "redis monitor client is nil", + }, expectedErr: "client is nil", }, "invalid redis monitor poll interval": { args: args{ @@ -86,13 +85,13 @@ func TestCreateWithConsulAndRedis(t *testing.T) { } for name, tt := range tests { t.Run(name, func(t *testing.T) { - got, err := New(tt.args.cfg). - WithConsulSeed(tt.args.consulAddress, "", "", 0). - WithConsulMonitor(tt.args.consulAddress, "", "", 0). - WithConsulFolderPrefixMonitor(tt.args.consulAddress, "", "", "", 0). - WithRedisSeed(tt.args.seedRedisClient). - WithRedisMonitor(tt.args.monitorRedisClient, tt.args.monitoringPollInterval). - Create() + got, err := New(tt.args.cfg, nil, + WithConsulSeed(tt.args.consulAddress, "", "", 0), + WithConsulMonitor(tt.args.consulAddress, "", "", 0), + WithConsulFolderPrefixMonitor(tt.args.consulAddress, "", "", "", 0), + WithRedisSeed(tt.args.seedRedisClient), + WithRedisMonitor(tt.args.monitorRedisClient, tt.args.monitoringPollInterval)) + if tt.expectedErr != "" { assert.EqualError(t, err, tt.expectedErr) assert.Nil(t, got) @@ -110,65 +109,23 @@ func TestWithNotification(t *testing.T) { chNotify chan<- config.ChangeNotification } tests := map[string]struct { - args args - wantErr bool + args args }{ - "nil notify channel": {args: args{cfg: &testConfig{}, chNotify: nil}, wantErr: true}, - "success": {args: args{cfg: &testConfig{}, chNotify: make(chan config.ChangeNotification)}, wantErr: false}, + "nil notify channel": {args: args{cfg: &testConfig{}, chNotify: nil}}, + "success": {args: args{cfg: &testConfig{}, chNotify: make(chan config.ChangeNotification)}}, } for name, tt := range tests { t.Run(name, func(t *testing.T) { - got, err := New(tt.args.cfg).WithNotification(tt.args.chNotify).Create() - if tt.wantErr { - assert.Error(t, err) - assert.Nil(t, got) - } else { - assert.NoError(t, err) - assert.NotNil(t, got) - } - }) - } -} - -func TestWithConsulFolderPrefixMonitor(t *testing.T) { - tests := []struct { - Name string - InputFolderPrefix string - ExpectedKeyLocation string - }{ - { - Name: "Setup Consul with folder prefix", - InputFolderPrefix: "folder/prefix", - ExpectedKeyLocation: "folder/prefix/key1", - }, - { - Name: "Setup Consul with empty folder prefix", - ExpectedKeyLocation: "key1", - }, - { - Name: "Setup Consul with folder prefix trailing /", - InputFolderPrefix: "folder/prefix/", - ExpectedKeyLocation: "folder/prefix/key1", - }, - } - - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - builder := New(testConfig{}) - builder.WithConsulFolderPrefixMonitor("addr", "data-center", "token", test.InputFolderPrefix, time.Second*42) - - assert.Equal(t, "addr", builder.monitorConsulCfg.addr) - assert.Equal(t, "data-center", builder.monitorConsulCfg.dataCenter) - assert.Equal(t, "token", builder.monitorConsulCfg.token) - assert.Equal(t, time.Second*42, builder.monitorConsulCfg.timeout) - assert.Equal(t, test.ExpectedKeyLocation, filepath.Join(builder.monitorConsulCfg.folderPrefix, "key1")) + got, err := New(tt.args.cfg, tt.args.chNotify) + assert.NoError(t, err) + assert.NotNil(t, got) }) } } func TestCreate_NoConsulOrRedis(t *testing.T) { cfg := &testConfigNoConsul{} - got, err := New(cfg).Create() + got, err := New(cfg, nil) assert.NoError(t, err) assert.NotNil(t, got) ctx, cnl := context.WithCancel(context.Background()) @@ -185,7 +142,7 @@ func TestCreate_NoConsulOrRedis(t *testing.T) { func TestCreate_SeedError(t *testing.T) { cfg := &testConfigSeedError{} - got, err := New(cfg).Create() + got, err := New(cfg, nil) assert.NoError(t, err) assert.NotNil(t, got) ctx, cnl := context.WithCancel(context.Background()) diff --git a/options.go b/options.go new file mode 100644 index 00000000..29d452cf --- /dev/null +++ b/options.go @@ -0,0 +1,120 @@ +package harvester + +import ( + "errors" + "time" + + "github.com/beatlabs/harvester/config" + "github.com/beatlabs/harvester/monitor" + "github.com/beatlabs/harvester/monitor/consul" + redismon "github.com/beatlabs/harvester/monitor/redis" + "github.com/beatlabs/harvester/seed" + seedconsul "github.com/beatlabs/harvester/seed/consul" + seedredis "github.com/beatlabs/harvester/seed/redis" + "github.com/go-redis/redis/v8" +) + +type options struct { + cfg *config.Config + seedParams []seed.Param + monitorParams []monitor.Watcher +} + +// OptionFunc is used to configure harvester in an optional manner. +type OptionFunc func(opts *options) error + +// WithConsulSeedWithPrefix set's up Consul seeder to use prefixes. +func WithConsulSeedWithPrefix(addr, dataCenter, token, folderPrefix string, timeout time.Duration) OptionFunc { + return func(opts *options) error { + getter, err := seedconsul.NewWithFolderPrefix(addr, dataCenter, token, folderPrefix, timeout) + if err != nil { + return err + } + + prm, err := seed.NewParam(config.SourceConsul, getter) + if err != nil { + return err + } + + opts.seedParams = append(opts.seedParams, *prm) + + return nil + } +} + +// WithConsulSeed set's up a Consul seeder. +func WithConsulSeed(addr, dataCenter, token string, timeout time.Duration) OptionFunc { + return WithConsulSeedWithPrefix(addr, dataCenter, token, "", timeout) +} + +// WithConsulFolderPrefixMonitor set's up a Consul monitor to use prefixes. +func WithConsulFolderPrefixMonitor(addr, dataCenter, token, folderPrefix string, timeout time.Duration) OptionFunc { + return func(opts *options) error { + items := make([]consul.Item, 0) + for _, field := range opts.cfg.Fields { + consulKey, ok := field.Sources()[config.SourceConsul] + if !ok { + continue + } + items = append(items, consul.NewKeyItemWithPrefix(consulKey, folderPrefix)) + } + + prm, err := consul.New(addr, dataCenter, token, timeout, items...) + if err != nil { + return err + } + + opts.monitorParams = append(opts.monitorParams, prm) + + return nil + } +} + +// WithConsulMonitor set's up a Consul monitor. +func WithConsulMonitor(addr, dataCenter, token string, timeout time.Duration) OptionFunc { + return WithConsulFolderPrefixMonitor(addr, dataCenter, token, "", timeout) +} + +// WithConsulSeed set's up a Redis seeder. +func WithRedisSeed(client redis.UniversalClient) OptionFunc { + return func(opts *options) error { + getter, err := seedredis.New(client) + if err != nil { + return err + } + + prm, err := seed.NewParam(config.SourceRedis, getter) + if err != nil { + return err + } + + opts.seedParams = append(opts.seedParams, *prm) + + return nil + } +} + +// WithRedisMonitor set's up a Redis monitor. +func WithRedisMonitor(client redis.UniversalClient, pollInterval time.Duration) OptionFunc { + return func(opts *options) error { + if pollInterval <= 0 { + return errors.New("redis monitor poll interval should be a positive number") + } + + items := make([]string, 0) + for _, field := range opts.cfg.Fields { + redisKey, ok := field.Sources()[config.SourceRedis] + if !ok { + continue + } + items = append(items, redisKey) + } + wtc, err := redismon.New(client, pollInterval, items) + if err != nil { + return err + } + + opts.monitorParams = append(opts.monitorParams, wtc) + return nil + } +} diff --git a/seed/redis/getter.go b/seed/redis/getter.go index 60950526..3bf83fd8 100644 --- a/seed/redis/getter.go +++ b/seed/redis/getter.go @@ -22,7 +22,7 @@ func New(client redis.UniversalClient) (*Getter, error) { } // Get value by key. -func (g Getter) Get(key string) (*string, uint64, error) { +func (g *Getter) Get(key string) (*string, uint64, error) { val, err := g.client.Get(context.Background(), key).Result() if err != nil { return nil, 0, err