diff --git a/pkg/experiment/local/assignment_service_test.go b/pkg/experiment/local/assignment_service_test.go index d1d5b77..4d364d6 100644 --- a/pkg/experiment/local/assignment_service_test.go +++ b/pkg/experiment/local/assignment_service_test.go @@ -22,9 +22,9 @@ func TestToEvent(t *testing.T) { }, }, "flag-key-2": { - Key: "control", + Key: "control", Metadata: map[string]interface{}{ - "default": true, + "default": true, "segmentName": "All Other Users", "flagVersion": float64(12), }, diff --git a/pkg/experiment/local/client.go b/pkg/experiment/local/client.go index c658bc2..47e60ff 100644 --- a/pkg/experiment/local/client.go +++ b/pkg/experiment/local/client.go @@ -67,8 +67,8 @@ func Initialize(apiKey string, config *Config) *Client { flagStreamApi = NewFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout) } deploymentRunner = newDeploymentRunner( - config, - NewFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), + config, + NewFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), flagStreamApi, flagConfigStorage, cohortStorage, cohortLoader) client = &Client{ log: log, diff --git a/pkg/experiment/local/client_stream_test.go b/pkg/experiment/local/client_stream_test.go index b3627b6..543d7d4 100644 --- a/pkg/experiment/local/client_stream_test.go +++ b/pkg/experiment/local/client_stream_test.go @@ -25,8 +25,8 @@ func init() { } streamClient = Initialize("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz", &Config{ - StreamUpdates: true, - StreamServerUrl: "https://stream.lab.amplitude.com", + StreamUpdates: true, + StreamServerUrl: "https://stream.lab.amplitude.com", CohortSyncConfig: &cohortSyncConfig, }) err = streamClient.Start() diff --git a/pkg/experiment/local/cohort_loader.go b/pkg/experiment/local/cohort_loader.go index bc7870d..e3d234d 100644 --- a/pkg/experiment/local/cohort_loader.go +++ b/pkg/experiment/local/cohort_loader.go @@ -10,7 +10,7 @@ import ( ) type cohortLoader struct { - log *logger.Log + log *logger.Log cohortDownloadApi cohortDownloadApi cohortStorage cohortStorage jobs sync.Map @@ -122,4 +122,4 @@ func (cl *cohortLoader) downloadCohorts(cohortIDs map[string]struct{}) { if len(errorMessages) > 0 { cl.log.Error("One or more cohorts failed to download:\n%s", strings.Join(errorMessages, "\n")) } -} \ No newline at end of file +} diff --git a/pkg/experiment/local/config.go b/pkg/experiment/local/config.go index 3759107..400732d 100644 --- a/pkg/experiment/local/config.go +++ b/pkg/experiment/local/config.go @@ -24,9 +24,9 @@ type Config struct { ServerZone ServerZone FlagConfigPollerInterval time.Duration FlagConfigPollerRequestTimeout time.Duration - StreamUpdates bool - StreamServerUrl string - StreamFlagConnTimeout time.Duration + StreamUpdates bool + StreamServerUrl string + StreamFlagConnTimeout time.Duration AssignmentConfig *AssignmentConfig CohortSyncConfig *CohortSyncConfig } @@ -50,9 +50,9 @@ var DefaultConfig = &Config{ ServerZone: USServerZone, FlagConfigPollerInterval: 30 * time.Second, FlagConfigPollerRequestTimeout: 10 * time.Second, - StreamUpdates: false, - StreamServerUrl: "https://stream.lab.amplitude.com", - StreamFlagConnTimeout: 1500 * time.Millisecond, + StreamUpdates: false, + StreamServerUrl: "https://stream.lab.amplitude.com", + StreamFlagConnTimeout: 1500 * time.Millisecond, } var DefaultAssignmentConfig = &AssignmentConfig{ diff --git a/pkg/experiment/local/deployment_runner.go b/pkg/experiment/local/deployment_runner.go index 2997ba5..dc8bf30 100644 --- a/pkg/experiment/local/deployment_runner.go +++ b/pkg/experiment/local/deployment_runner.go @@ -29,7 +29,7 @@ func newDeploymentRunner( cohortLoader *cohortLoader, ) *deploymentRunner { flagConfigUpdater := NewFlagConfigFallbackRetryWrapper(NewFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug) - if (flagConfigStreamApi != nil) { + if flagConfigStreamApi != nil { flagConfigUpdater = NewFlagConfigFallbackRetryWrapper(NewFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug) } dr := &deploymentRunner{ @@ -37,7 +37,7 @@ func newDeploymentRunner( flagConfigStorage: flagConfigStorage, cohortLoader: cohortLoader, flagConfigUpdater: flagConfigUpdater, - poller: newPoller(), + poller: newPoller(), log: logger.New(config.Debug), } return dr @@ -47,7 +47,7 @@ func (dr *deploymentRunner) start() error { dr.lock.Lock() defer dr.lock.Unlock() err := dr.flagConfigUpdater.Start(nil) - if (err != nil) { + if err != nil { return err } diff --git a/pkg/experiment/local/flag_config_stream_api.go b/pkg/experiment/local/flag_config_stream_api.go index b223a4c..330a646 100644 --- a/pkg/experiment/local/flag_config_stream_api.go +++ b/pkg/experiment/local/flag_config_stream_api.go @@ -16,21 +16,21 @@ const streamApiReconnInterval = 15 * time.Minute type flagConfigStreamApi interface { Connect( - onInitUpdate func (map[string]*evaluation.Flag) error, - onUpdate func (map[string]*evaluation.Flag) error, - onError func (error), + onInitUpdate func(map[string]*evaluation.Flag) error, + onUpdate func(map[string]*evaluation.Flag) error, + onError func(error), ) error Close() } type flagConfigStreamApiV2 struct { - DeploymentKey string - ServerURL string - connectionTimeout time.Duration - stopCh chan bool - lock sync.Mutex - newSseStreamFactory func ( - authToken, + DeploymentKey string + ServerURL string + connectionTimeout time.Duration + stopCh chan bool + lock sync.Mutex + newSseStreamFactory func( + authToken, url string, connectionTimeout time.Duration, keepaliveTimeout time.Duration, @@ -40,24 +40,24 @@ type flagConfigStreamApiV2 struct { } func NewFlagConfigStreamApiV2( - deploymentKey string, - serverURL string, - connectionTimeout time.Duration, + deploymentKey string, + serverURL string, + connectionTimeout time.Duration, ) *flagConfigStreamApiV2 { return &flagConfigStreamApiV2{ - DeploymentKey: deploymentKey, - ServerURL: serverURL, - connectionTimeout: connectionTimeout, - stopCh: nil, - lock: sync.Mutex{}, + DeploymentKey: deploymentKey, + ServerURL: serverURL, + connectionTimeout: connectionTimeout, + stopCh: nil, + lock: sync.Mutex{}, newSseStreamFactory: NewSseStream, } } func (api *flagConfigStreamApiV2) Connect( - onInitUpdate func (map[string]*evaluation.Flag) error, - onUpdate func (map[string]*evaluation.Flag) error, - onError func (error), + onInitUpdate func(map[string]*evaluation.Flag) error, + onUpdate func(map[string]*evaluation.Flag) error, + onError func(error), ) error { api.lock.Lock() defer api.lock.Unlock() @@ -72,12 +72,12 @@ func (api *flagConfigStreamApiV2) Connect( endpoint.Path = "sdk/stream/v1/flags" // Create Stream. - stream := api.newSseStreamFactory("Api-Key " + api.DeploymentKey, endpoint.String(), api.connectionTimeout, streamApiKeepaliveTimeout, streamApiReconnInterval, streamApiMaxJitter) + stream := api.newSseStreamFactory("Api-Key "+api.DeploymentKey, endpoint.String(), api.connectionTimeout, streamApiKeepaliveTimeout, streamApiReconnInterval, streamApiMaxJitter) streamMsgCh := make(chan StreamEvent) streamErrCh := make(chan error) - closeStream := func () { + closeStream := func() { stream.Cancel() close(streamMsgCh) close(streamErrCh) @@ -85,26 +85,26 @@ func (api *flagConfigStreamApiV2) Connect( // Connect. err = stream.Connect(streamMsgCh, streamErrCh) - if (err != nil) { + if err != nil { return err } // Retrieve first flag configs and parse it. // If any error here means init error. - select{ + select { case msg := <-streamMsgCh: // Parse message and verify data correct. flags, err := parseData(msg.data) - if (err != nil) { + if err != nil { closeStream() return errors.New("flag config stream api corrupt data, cause: " + err.Error()) } - if (onInitUpdate != nil) { + if onInitUpdate != nil { err = onInitUpdate(flags) - } else if (onUpdate != nil) { + } else if onUpdate != nil { err = onUpdate(flags) } - if (err != nil) { + if err != nil { closeStream() return err } @@ -126,11 +126,11 @@ func (api *flagConfigStreamApiV2) Connect( api.lock.Lock() defer api.lock.Unlock() closeStream() - if (api.stopCh == stopCh) { + if api.stopCh == stopCh { api.stopCh = nil } close(stopCh) - if (onError != nil) { + if onError != nil { onError(err) } } @@ -138,21 +138,21 @@ func (api *flagConfigStreamApiV2) Connect( // Retrieve and pass on message forever until stopCh closes. go func() { for { - select{ + select { case <-stopCh: // Channel returns immediately when closed. Note the local channel is referred here, so it's guaranteed to not be nil. closeStream() return case msg := <-streamMsgCh: // Parse message and verify data correct. flags, err := parseData(msg.data) - if (err != nil) { + if err != nil { // Error, close everything. closeAllAndNotify(errors.New("stream corrupt data, cause: " + err.Error())) return } - if (onUpdate != nil) { + if onUpdate != nil { // Deliver async. Don't care about any errors. - go func() {onUpdate(flags)}() + go func() { onUpdate(flags) }() } case err := <-streamErrCh: // Error, close everything. @@ -181,7 +181,7 @@ func parseData(data []byte) (map[string]*evaluation.Flag, error) { } func (api *flagConfigStreamApiV2) closeInternal() { - if (api.stopCh != nil) { + if api.stopCh != nil { close(api.stopCh) api.stopCh = nil } @@ -191,4 +191,4 @@ func (api *flagConfigStreamApiV2) Close() { defer api.lock.Unlock() api.closeInternal() -} \ No newline at end of file +} diff --git a/pkg/experiment/local/flag_config_stream_api_test.go b/pkg/experiment/local/flag_config_stream_api_test.go index 670ebb8..3097579 100644 --- a/pkg/experiment/local/flag_config_stream_api_test.go +++ b/pkg/experiment/local/flag_config_stream_api_test.go @@ -13,22 +13,22 @@ import ( type mockSseStream struct { // Params - authToken string - url string + authToken string + url string connectionTimeout time.Duration - keepaliveTimeout time.Duration - reconnInterval time.Duration - maxJitter time.Duration + keepaliveTimeout time.Duration + reconnInterval time.Duration + maxJitter time.Duration // Channels to emit messages to simulate new events received through stream. - messageCh chan(StreamEvent) - errorCh chan(error) + messageCh chan (StreamEvent) + errorCh chan (error) // Channel to tell there's a connection call. chConnected chan bool } -func (s *mockSseStream) Connect(messageCh chan(StreamEvent), errorCh chan(error)) error { +func (s *mockSseStream) Connect(messageCh chan (StreamEvent), errorCh chan (error)) error { s.messageCh = messageCh s.errorCh = errorCh @@ -39,12 +39,11 @@ func (s *mockSseStream) Connect(messageCh chan(StreamEvent), errorCh chan(error) func (s *mockSseStream) Cancel() { } -func (s *mockSseStream) setNewESFactory(f func (httpClient *http.Client, url string, headers map[string]string) EventSource) { +func (s *mockSseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) { } - -func (s *mockSseStream) newSseStreamFactory ( - authToken, +func (s *mockSseStream) newSseStreamFactory( + authToken, url string, connectionTimeout time.Duration, keepaliveTimeout time.Duration, @@ -65,7 +64,7 @@ var FLAG_1, _ = parseData(FLAG_1_STR) func TestFlagConfigStreamApi(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1 * time.Second) + api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -85,13 +84,13 @@ func TestFlagConfigStreamApi(t *testing.T) { receivedMsgCh <- m return nil }, - func(err error) {receivedErrCh <- err}, + func(err error) { receivedErrCh <- err }, ) assert.Nil(t, err) - go func() {sse.messageCh <- StreamEvent{data: FLAG_1_STR}}() + go func() { sse.messageCh <- StreamEvent{data: FLAG_1_STR} }() assert.Equal(t, FLAG_1, <-receivedMsgCh) - go func() {sse.messageCh <- StreamEvent{data: FLAG_1_STR}}() + go func() { sse.messageCh <- StreamEvent{data: FLAG_1_STR} }() assert.Equal(t, FLAG_1, <-receivedMsgCh) api.Close() @@ -99,7 +98,7 @@ func TestFlagConfigStreamApi(t *testing.T) { func TestFlagConfigStreamApiErrorNoInitialFlags(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1 * time.Second) + api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory go func() { @@ -112,7 +111,7 @@ func TestFlagConfigStreamApiErrorNoInitialFlags(t *testing.T) { func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1 * time.Second) + api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -125,16 +124,16 @@ func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) { assert.Fail(t, "Bad message went through") }() err := api.Connect( - func(m map[string]*evaluation.Flag) error {receivedMsgCh <- m; return nil}, - func(m map[string]*evaluation.Flag) error {receivedMsgCh <- m; return nil}, - func(err error) {receivedErrCh <- err}, + func(m map[string]*evaluation.Flag) error { receivedMsgCh <- m; return nil }, + func(m map[string]*evaluation.Flag) error { receivedMsgCh <- m; return nil }, + func(err error) { receivedErrCh <- err }, ) assert.Equal(t, "flag config stream api corrupt data", strings.Split(err.Error(), ", cause: ")[0]) } func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1 * time.Second) + api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -147,16 +146,16 @@ func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) { assert.Fail(t, "Bad message went through") }() err := api.Connect( - func(m map[string]*evaluation.Flag) error {return errors.New("bad update")}, - func(m map[string]*evaluation.Flag) error {receivedMsgCh <- m; return nil}, - func(err error) {receivedErrCh <- err}, + func(m map[string]*evaluation.Flag) error { return errors.New("bad update") }, + func(m map[string]*evaluation.Flag) error { receivedMsgCh <- m; return nil }, + func(err error) { receivedErrCh <- err }, ) assert.Equal(t, errors.New("bad update"), err) } func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1 * time.Second) + api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -168,9 +167,9 @@ func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *te assert.Equal(t, FLAG_1, <-receivedMsgCh) // Should hang as no updates was received. }() err := api.Connect( - func(m map[string]*evaluation.Flag) error {receivedMsgCh <- m; return nil}, - func(m map[string]*evaluation.Flag) error {return errors.New("bad update")}, - func(err error) {receivedErrCh <- err}, + func(m map[string]*evaluation.Flag) error { receivedMsgCh <- m; return nil }, + func(m map[string]*evaluation.Flag) error { return errors.New("bad update") }, + func(err error) { receivedErrCh <- err }, ) assert.Nil(t, err) // Send an update, this should call onUpdate cb which fails. @@ -181,7 +180,7 @@ func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *te func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1 * time.Second) + api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -193,17 +192,17 @@ func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) { assert.Equal(t, FLAG_1, <-receivedMsgCh) }() err := api.Connect( - func(m map[string]*evaluation.Flag) error {receivedMsgCh <- m; return nil}, - func(m map[string]*evaluation.Flag) error {receivedMsgCh <- m; return nil}, - func(err error) {receivedErrCh <- err}, + func(m map[string]*evaluation.Flag) error { receivedMsgCh <- m; return nil }, + func(m map[string]*evaluation.Flag) error { receivedMsgCh <- m; return nil }, + func(err error) { receivedErrCh <- err }, ) assert.Nil(t, err) - go func() {sse.errorCh <- errors.New("error1")}() + go func() { sse.errorCh <- errors.New("error1") }() assert.Equal(t, errors.New("error1"), <-receivedErrCh) // The message channel should be closed. - defer mutePanic(nil); + defer mutePanic(nil) sse.messageCh <- StreamEvent{data: FLAG_1_STR} assert.Fail(t, "Unexpected message after error") } diff --git a/pkg/experiment/local/flag_config_updater.go b/pkg/experiment/local/flag_config_updater.go index 0217710..553c7e7 100644 --- a/pkg/experiment/local/flag_config_updater.go +++ b/pkg/experiment/local/flag_config_updater.go @@ -9,20 +9,20 @@ import ( ) type flagConfigUpdater interface { - // Start the updater. There can be multiple calls. - // If start fails, it should return err. The caller should handle error. - // If some other async error happened while updating (after already started successfully), + // Start the updater. There can be multiple calls. + // If start fails, it should return err. The caller should handle error. + // If some other async error happened while updating (after already started successfully), // it should call the `func (error)` callback function. - Start(func (error)) error + Start(func(error)) error Stop() } // The base for all flag config updaters. -// Contains a method to properly update the flag configs into storage and download cohorts. +// Contains a method to properly update the flag configs into storage and download cohorts. type flagConfigUpdaterBase struct { flagConfigStorage flagConfigStorage - cohortStorage cohortStorage - cohortLoader *cohortLoader + cohortStorage cohortStorage + cohortLoader *cohortLoader log *logger.Log } @@ -34,15 +34,15 @@ func newFlagConfigUpdaterBase( ) flagConfigUpdaterBase { return flagConfigUpdaterBase{ flagConfigStorage: flagConfigStorage, - cohortStorage: cohortStorage, - cohortLoader: cohortLoader, - log: logger.New(config.Debug), + cohortStorage: cohortStorage, + cohortLoader: cohortLoader, + log: logger.New(config.Debug), } } // Updates the received flag configs into storage and download cohorts. func (u *flagConfigUpdaterBase) update(flagConfigs map[string]*evaluation.Flag) error { - + flagKeys := make(map[string]struct{}) for _, flag := range flagConfigs { flagKeys[flag.Key] = struct{}{} @@ -117,38 +117,38 @@ func (u *flagConfigUpdaterBase) deleteUnusedCohorts() { // The streamer for flag configs. It receives flag configs through server side events. type flagConfigStreamer struct { flagConfigUpdaterBase - flagConfigStreamApi flagConfigStreamApi - lock sync.Mutex + flagConfigStreamApi flagConfigStreamApi + lock sync.Mutex } func NewFlagConfigStreamer( - flagConfigStreamApi flagConfigStreamApi, - config *Config, + flagConfigStreamApi flagConfigStreamApi, + config *Config, flagConfigStorage flagConfigStorage, cohortStorage cohortStorage, cohortLoader *cohortLoader, ) flagConfigUpdater { return &flagConfigStreamer{ - flagConfigStreamApi: flagConfigStreamApi, + flagConfigStreamApi: flagConfigStreamApi, flagConfigUpdaterBase: newFlagConfigUpdaterBase(flagConfigStorage, cohortStorage, cohortLoader, config), } } -func (s *flagConfigStreamer) Start(onError func (error)) error { +func (s *flagConfigStreamer) Start(onError func(error)) error { s.lock.Lock() defer s.lock.Unlock() s.stopInternal() return s.flagConfigStreamApi.Connect( - func (flags map[string]*evaluation.Flag) error { + func(flags map[string]*evaluation.Flag) error { return s.update(flags) }, - func (flags map[string]*evaluation.Flag) error { + func(flags map[string]*evaluation.Flag) error { return s.update(flags) }, - func (err error) { + func(err error) { s.Stop() - if (onError != nil) { + if onError != nil { onError(err) } }, @@ -169,27 +169,27 @@ func (s *flagConfigStreamer) Stop() { // On start, it polls a set of flag configs. If failed, error is returned. If success, poller starts. type flagConfigPoller struct { flagConfigUpdaterBase - flagConfigApi flagConfigApi - config *Config - poller *poller - lock sync.Mutex + flagConfigApi flagConfigApi + config *Config + poller *poller + lock sync.Mutex } func NewFlagConfigPoller( - flagConfigApi flagConfigApi, - config *Config, + flagConfigApi flagConfigApi, + config *Config, flagConfigStorage flagConfigStorage, cohortStorage cohortStorage, cohortLoader *cohortLoader, ) flagConfigUpdater { return &flagConfigPoller{ - flagConfigApi: flagConfigApi, - config: config, + flagConfigApi: flagConfigApi, + config: config, flagConfigUpdaterBase: newFlagConfigUpdaterBase(flagConfigStorage, cohortStorage, cohortLoader, config), } } -func (p *flagConfigPoller) Start(onError func (error)) error { +func (p *flagConfigPoller) Start(onError func(error)) error { p.lock.Lock() defer p.lock.Unlock() @@ -234,7 +234,7 @@ func (p *flagConfigPoller) updateFlagConfigs() error { } func (p *flagConfigPoller) stopInternal() error { - if (p.poller != nil) { + if p.poller != nil { close(p.poller.shutdown) p.poller = nil } @@ -250,37 +250,40 @@ func (p *flagConfigPoller) Stop() { // A wrapper around flag config updaters to retry and fallback. // If the main updater fails, it will fallback to the fallback updater and main updater enters retry loop. type FlagConfigFallbackRetryWrapper struct { - log *logger.Log - mainUpdater flagConfigUpdater - fallbackUpdater flagConfigUpdater - retryDelay time.Duration - maxJitter time.Duration - retryTimer *time.Timer - lock sync.Mutex + log *logger.Log + mainUpdater flagConfigUpdater + fallbackUpdater flagConfigUpdater + retryDelay time.Duration + maxJitter time.Duration + retryTimer *time.Timer + lock sync.Mutex } + func NewFlagConfigFallbackRetryWrapper( - mainUpdater flagConfigUpdater, - fallbackUpdater flagConfigUpdater, - retryDelay time.Duration, - maxJitter time.Duration, + mainUpdater flagConfigUpdater, + fallbackUpdater flagConfigUpdater, + retryDelay time.Duration, + maxJitter time.Duration, debug bool, ) flagConfigUpdater { return &FlagConfigFallbackRetryWrapper{ - log: logger.New(debug), - mainUpdater: mainUpdater, + log: logger.New(debug), + mainUpdater: mainUpdater, fallbackUpdater: fallbackUpdater, - retryDelay: retryDelay, - maxJitter: maxJitter, + retryDelay: retryDelay, + maxJitter: maxJitter, } } -// Start tries to start main updater first. -// If it failed, start the fallback updater. -// If fallback updater failed as well, return error. -// If fallback updater succeed, main updater enters retry, return ok. +// Start tries to start main updater first. +// +// If it failed, start the fallback updater. +// If fallback updater failed as well, return error. +// If fallback updater succeed, main updater enters retry, return ok. +// // Since the wrapper retries, so there will never be error case. // Thus, onError will never be called. -func (w *FlagConfigFallbackRetryWrapper) Start(onError func (error)) error { +func (w *FlagConfigFallbackRetryWrapper) Start(onError func(error)) error { // if (mainUpdater is FlagConfigFallbackRetryWrapper) { // throw Error("Do not use FlagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.") // } @@ -288,37 +291,37 @@ func (w *FlagConfigFallbackRetryWrapper) Start(onError func (error)) error { w.lock.Lock() defer w.lock.Unlock() - if (w.retryTimer != nil) { + if w.retryTimer != nil { w.retryTimer.Stop() w.retryTimer = nil } - err := w.mainUpdater.Start(func (err error) { + err := w.mainUpdater.Start(func(err error) { w.log.Error("main updater updating err, starting fallback if available. error: ", err) - go func() {w.scheduleRetry()}() // Don't care if poller start error or not, always retry. - if (w.fallbackUpdater != nil) { + go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry. + if w.fallbackUpdater != nil { w.fallbackUpdater.Start(nil) } }) - if (err == nil) { + if err == nil { // Main start success, stop fallback. - if (w.fallbackUpdater != nil) { + if w.fallbackUpdater != nil { w.fallbackUpdater.Stop() } return nil } w.log.Debug("main updater start err, starting fallback. error: ", err) - if (w.fallbackUpdater == nil) { + if w.fallbackUpdater == nil { // No fallback, main start failed is wrapper start fail return err } err = w.fallbackUpdater.Start(nil) - if (err != nil) { + if err != nil { w.log.Debug("fallback updater start failed. error: ", err) return err } - go func() {w.scheduleRetry()}() + go func() { w.scheduleRetry() }() return nil } @@ -326,12 +329,12 @@ func (w *FlagConfigFallbackRetryWrapper) Stop() { w.lock.Lock() defer w.lock.Unlock() - if (w.retryTimer != nil) { + if w.retryTimer != nil { w.retryTimer.Stop() w.retryTimer = nil } w.mainUpdater.Stop() - if (w.fallbackUpdater != nil) { + if w.fallbackUpdater != nil { w.fallbackUpdater.Stop() } } @@ -340,7 +343,7 @@ func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() { w.lock.Lock() defer w.lock.Unlock() - if (w.retryTimer != nil) { + if w.retryTimer != nil { w.retryTimer.Stop() w.retryTimer = nil } @@ -348,26 +351,26 @@ func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() { w.lock.Lock() defer w.lock.Unlock() - if (w.retryTimer != nil) { + if w.retryTimer != nil { w.retryTimer = nil } w.log.Debug("main updater retry start") - err := w.mainUpdater.Start(func (error) { - go func() {w.scheduleRetry()}() // Don't care if poller start error or not, always retry. - if (w.fallbackUpdater != nil) { + err := w.mainUpdater.Start(func(error) { + go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry. + if w.fallbackUpdater != nil { w.fallbackUpdater.Start(nil) } }) - if (err == nil) { + if err == nil { // Main start success, stop fallback. w.log.Debug("main updater retry start success") - if (w.fallbackUpdater != nil) { + if w.fallbackUpdater != nil { w.fallbackUpdater.Stop() } return } - - go func() {w.scheduleRetry()}() + + go func() { w.scheduleRetry() }() }) -} \ No newline at end of file +} diff --git a/pkg/experiment/local/flag_config_updater_test.go b/pkg/experiment/local/flag_config_updater_test.go index 6142083..c95b108 100644 --- a/pkg/experiment/local/flag_config_updater_test.go +++ b/pkg/experiment/local/flag_config_updater_test.go @@ -28,7 +28,7 @@ func TestFlagConfigPoller(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return FLAG_1, nil } - poller.Start(func (e error) { + poller.Start(func(e error) { errorCh <- e }) // Start should block for first poll. assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. @@ -37,7 +37,7 @@ func TestFlagConfigPoller(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return map[string]*evaluation.Flag{}, nil } - time.Sleep(1100 * time.Millisecond) // Sleep for poller to poll. + time.Sleep(1100 * time.Millisecond) // Sleep for poller to poll. assert.Equal(t, map[string]*evaluation.Flag{}, flagConfigStorage.getFlagConfigs()) // Test flags empty in storage. // Stop poller, make sure there's no more poll. @@ -49,7 +49,6 @@ func TestFlagConfigPoller(t *testing.T) { time.Sleep(1100 * time.Millisecond) // Sleep for poller to poll. } - func TestFlagConfigPollerStartFail(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestPollerObjs() @@ -60,7 +59,7 @@ func TestFlagConfigPollerStartFail(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return nil, errors.New("start error") } - err := poller.Start(func (e error) { + err := poller.Start(func(e error) { errorCh <- e }) // Start should block for first poll. assert.Equal(t, errors.New("start error"), err) // Test flags in storage. @@ -76,7 +75,7 @@ func TestFlagConfigPollerPollingFail(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return FLAG_1, nil } - poller.Start(func (e error) { + poller.Start(func(e error) { errorCh <- e }) // Start should block for first poll. assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. @@ -85,8 +84,8 @@ func TestFlagConfigPollerPollingFail(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return nil, errors.New("flag error") } - time.Sleep(1100 * time.Millisecond) // Sleep for poller to poll. - assert.Equal(t, errors.New("flag error"), <- errorCh) // Error callback called. + time.Sleep(1100 * time.Millisecond) // Sleep for poller to poll. + assert.Equal(t, errors.New("flag error"), <-errorCh) // Error callback called. // Make sure there's no more poll. api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { @@ -99,25 +98,25 @@ func TestFlagConfigPollerPollingFail(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return map[string]*evaluation.Flag{}, nil } - poller.Start(func (e error) { + poller.Start(func(e error) { errorCh <- e }) assert.Equal(t, map[string]*evaluation.Flag{}, flagConfigStorage.getFlagConfigs()) // Test flags in storage. } - type mockFlagConfigStreamApi struct { connectFunc func( - func (map[string]*evaluation.Flag) error, - func (map[string]*evaluation.Flag) error, - func (error), + func(map[string]*evaluation.Flag) error, + func(map[string]*evaluation.Flag) error, + func(error), ) error closeFunc func() } + func (api *mockFlagConfigStreamApi) Connect( - onInitUpdate func (map[string]*evaluation.Flag) error, - onUpdate func (map[string]*evaluation.Flag) error, - onError func (error), + onInitUpdate func(map[string]*evaluation.Flag) error, + onUpdate func(map[string]*evaluation.Flag) error, + onError func(error), ) error { return api.connectFunc(onInitUpdate, onUpdate, onError) } @@ -138,11 +137,11 @@ func TestFlagConfigStreamer(t *testing.T) { streamer := NewFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) - var updateCb func (map[string]*evaluation.Flag) error + var updateCb func(map[string]*evaluation.Flag) error api.connectFunc = func( - onInitUpdate func (map[string]*evaluation.Flag) error, - onUpdate func (map[string]*evaluation.Flag) error, - onError func (error), + onInitUpdate func(map[string]*evaluation.Flag) error, + onUpdate func(map[string]*evaluation.Flag) error, + onError func(error), ) error { onInitUpdate(FLAG_1) updateCb = onUpdate @@ -153,7 +152,7 @@ func TestFlagConfigStreamer(t *testing.T) { } // Streamer start normal. - streamer.Start(func (e error) { + streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. @@ -167,7 +166,7 @@ func TestFlagConfigStreamer(t *testing.T) { assert.Nil(t, updateCb) // Make sure stream Close is called. // Streamer start again. - streamer.Start(func (e error) { + streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. @@ -182,9 +181,9 @@ func TestFlagConfigStreamerStartFail(t *testing.T) { errorCh := make(chan error) api.connectFunc = func( - onInitUpdate func (map[string]*evaluation.Flag) error, - onUpdate func (map[string]*evaluation.Flag) error, - onError func (error), + onInitUpdate func(map[string]*evaluation.Flag) error, + onUpdate func(map[string]*evaluation.Flag) error, + onError func(error), ) error { return errors.New("api connect error") } @@ -192,7 +191,7 @@ func TestFlagConfigStreamerStartFail(t *testing.T) { } // Streamer start. - err := streamer.Start(func (e error) { + err := streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags, which is error. assert.Equal(t, errors.New("api connect error"), err) @@ -204,12 +203,12 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) { streamer := NewFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) - var updateCb func (map[string]*evaluation.Flag) error - var errorCb func (error) + var updateCb func(map[string]*evaluation.Flag) error + var errorCb func(error) api.connectFunc = func( - onInitUpdate func (map[string]*evaluation.Flag) error, - onUpdate func (map[string]*evaluation.Flag) error, - onError func (error), + onInitUpdate func(map[string]*evaluation.Flag) error, + onUpdate func(map[string]*evaluation.Flag) error, + onError func(error), ) error { onInitUpdate(FLAG_1) updateCb = onUpdate @@ -222,20 +221,20 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) { } // Streamer start normal. - streamer.Start(func (e error) { + streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. // Stream error. - go func() {errorCb(errors.New("stream error"))} () + go func() { errorCb(errors.New("stream error")) }() assert.Equal(t, errors.New("stream error"), <-errorCh) // Error callback is called. - assert.Nil(t, updateCb) // Make sure stream Close is called. + assert.Nil(t, updateCb) // Make sure stream Close is called. assert.Nil(t, errorCb) // Streamer start again. - flagConfigStorage.removeIf(func (f *evaluation.Flag) bool {return true}) - streamer.Start(func (e error) { + flagConfigStorage.removeIf(func(f *evaluation.Flag) bool { return true }) + streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. @@ -244,30 +243,30 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) { } type mockFlagConfigUpdater struct { - startFunc func (func (error)) error - stopFunc func () + startFunc func(func(error)) error + stopFunc func() } -func (u *mockFlagConfigUpdater) Start(f func (error)) error { return u.startFunc(f) } -func (u *mockFlagConfigUpdater) Stop() { u.stopFunc() } +func (u *mockFlagConfigUpdater) Start(f func(error)) error { return u.startFunc(f) } +func (u *mockFlagConfigUpdater) Stop() { u.stopFunc() } func TestFlagConfigFallbackRetryWrapper(t *testing.T) { main := mockFlagConfigUpdater{} - var mainOnError func (error) - main.startFunc = func (onError func (error)) error { + var mainOnError func(error) + main.startFunc = func(onError func(error)) error { mainOnError = onError return nil } - main.stopFunc = func () { - mainOnError = nil + main.stopFunc = func() { + mainOnError = nil } - fallback := mockFlagConfigUpdater{} - fallback.startFunc = func (onError func (error)) error { + fallback := mockFlagConfigUpdater{} + fallback.startFunc = func(onError func(error)) error { return nil } - fallback.stopFunc = func () { + fallback.stopFunc = func() { } - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true) + w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -278,21 +277,21 @@ func TestFlagConfigFallbackRetryWrapper(t *testing.T) { func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { main := mockFlagConfigUpdater{} - var mainOnError func (error) - main.startFunc = func (onError func (error)) error { + var mainOnError func(error) + main.startFunc = func(onError func(error)) error { mainOnError = onError return errors.New("main start error") } - main.stopFunc = func () { - mainOnError = nil + main.stopFunc = func() { + mainOnError = nil } - fallback := mockFlagConfigUpdater{} - fallback.startFunc = func (onError func (error)) error { + fallback := mockFlagConfigUpdater{} + fallback.startFunc = func(onError func(error)) error { return errors.New("fallback start error") } - fallback.stopFunc = func () { + fallback.stopFunc = func() { } - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true) + w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) err := w.Start(nil) assert.Equal(t, errors.New("fallback start error"), err) assert.NotNil(t, mainOnError) @@ -305,23 +304,23 @@ func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) { main := mockFlagConfigUpdater{} - var mainOnError func (error) - main.startFunc = func (onError func (error)) error { + var mainOnError func(error) + main.startFunc = func(onError func(error)) error { mainOnError = onError return errors.New("main start error") } - main.stopFunc = func () { - mainOnError = nil + main.stopFunc = func() { + mainOnError = nil } fallback := mockFlagConfigUpdater{} fallbackStopCh := make(chan bool) - fallback.startFunc = func (onError func (error)) error { + fallback.startFunc = func(onError func(error)) error { return nil } - fallback.stopFunc = func () { - go func() {fallbackStopCh <- true} () + fallback.stopFunc = func() { + go func() { fallbackStopCh <- true }() } - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true) + w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -332,64 +331,67 @@ func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T assert.NotNil(t, mainOnError) // Main started called. mainOnError = nil select { - case <-fallbackStopCh: assert.Fail(t, "Unexpected fallback stopped") + case <-fallbackStopCh: + assert.Fail(t, "Unexpected fallback stopped") default: } // Test next retry success. - main.startFunc = func (onError func (error)) error { + main.startFunc = func(onError func(error)) error { mainOnError = onError return nil } time.Sleep(1100 * time.Millisecond) assert.NotNil(t, mainOnError) // Main errored. - <-fallbackStopCh // Fallback stopped. + <-fallbackStopCh // Fallback stopped. w.Stop() } func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { main := mockFlagConfigUpdater{} - var mainOnError func (error) - main.startFunc = func (onError func (error)) error { + var mainOnError func(error) + main.startFunc = func(onError func(error)) error { mainOnError = onError return nil } - main.stopFunc = func () { - mainOnError = nil + main.stopFunc = func() { + mainOnError = nil } fallback := mockFlagConfigUpdater{} fallbackStartCh := make(chan bool) fallbackStopCh := make(chan bool) - fallback.startFunc = func (onError func (error)) error { - go func() {fallbackStartCh <- true} () + fallback.startFunc = func(onError func(error)) error { + go func() { fallbackStartCh <- true }() return nil } - fallback.stopFunc = func () {} - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true) + fallback.stopFunc = func() {} + w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) // Start success err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) select { - case <-fallbackStartCh: assert.Fail(t, "Unexpected fallback started") + case <-fallbackStartCh: + assert.Fail(t, "Unexpected fallback started") default: } // Test main updating failed, fallback. - fallback.stopFunc = func () { // Start tracking fallback stops (Start() may call stops). - go func() {fallbackStopCh <- true} () + fallback.stopFunc = func() { // Start tracking fallback stops (Start() may call stops). + go func() { fallbackStopCh <- true }() } mainOnError(errors.New("main updating error")) mainOnError = nil <-fallbackStartCh // Fallbacks started. select { - case <-fallbackStopCh: assert.Fail(t, "Unexpected fallback stopped") + case <-fallbackStopCh: + assert.Fail(t, "Unexpected fallback stopped") default: } // Test retry start fail as main updating fail. - main.startFunc = func (onError func (error)) error { + main.startFunc = func(onError func(error)) error { mainOnError = onError return errors.New("main start error") } @@ -397,20 +399,23 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { assert.NotNil(t, mainOnError) // Main started called. mainOnError = nil select { // Test no changes made to fallback updater. - case <-fallbackStartCh: assert.Fail(t, "Unexpected fallback started") - case <-fallbackStopCh: assert.Fail(t, "Unexpected fallback stopped") + case <-fallbackStartCh: + assert.Fail(t, "Unexpected fallback started") + case <-fallbackStopCh: + assert.Fail(t, "Unexpected fallback stopped") default: } // Test next retry success. - main.startFunc = func (onError func (error)) error { + main.startFunc = func(onError func(error)) error { mainOnError = onError return nil } time.Sleep(1100 * time.Millisecond) assert.NotNil(t, mainOnError) // Main errored. select { - case <-fallbackStartCh: assert.Fail(t, "Unexpected fallback stopped") + case <-fallbackStartCh: + assert.Fail(t, "Unexpected fallback stopped") default: } <-fallbackStopCh // Fallback stopped. @@ -421,15 +426,15 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { main := mockFlagConfigUpdater{} - var mainOnError func (error) - main.startFunc = func (onError func (error)) error { + var mainOnError func(error) + main.startFunc = func(onError func(error)) error { mainOnError = onError return nil } - main.stopFunc = func () { - mainOnError = nil + main.stopFunc = func() { + mainOnError = nil } - w := NewFlagConfigFallbackRetryWrapper(&main, nil, 1 * time.Second, 0, true) + w := NewFlagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -463,4 +468,4 @@ func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { assert.Nil(t, mainOnError) w.Stop() -} \ No newline at end of file +} diff --git a/pkg/experiment/local/stream.go b/pkg/experiment/local/stream.go index b7c2f1a..40e7869 100644 --- a/pkg/experiment/local/stream.go +++ b/pkg/experiment/local/stream.go @@ -36,7 +36,7 @@ func newEventSource(httpClient *http.Client, url string, headers map[string]stri client.Connection = httpClient client.Headers = headers sse.ClientMaxBufferSize(1 << 32)(client) - client.ReconnectStrategy = &backoff.StopBackOff{}; + client.ReconnectStrategy = &backoff.StopBackOff{} return client } @@ -48,41 +48,41 @@ type Stream interface { Connect(messageCh chan StreamEvent, errorCh chan error) error Cancel() // For testing. - setNewESFactory(f func (httpClient *http.Client, url string, headers map[string]string) EventSource) + setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) } -func (s *SseStream) setNewESFactory(f func (httpClient *http.Client, url string, headers map[string]string) EventSource) { +func (s *SseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) { s.newESFactory = f } type SseStream struct { - AuthToken string - url string - connectionTimeout time.Duration - keepaliveTimeout time.Duration - reconnInterval time.Duration - maxJitter time.Duration + AuthToken string + url string + connectionTimeout time.Duration + keepaliveTimeout time.Duration + reconnInterval time.Duration + maxJitter time.Duration lock sync.Mutex cancelClientContext *context.CancelFunc - newESFactory func (httpClient *http.Client, url string, headers map[string]string) EventSource + newESFactory func(httpClient *http.Client, url string, headers map[string]string) EventSource } func NewSseStream( - authToken, + authToken, url string, - connectionTimeout time.Duration, - keepaliveTimeout time.Duration, - reconnInterval time.Duration, - maxJitter time.Duration, + connectionTimeout time.Duration, + keepaliveTimeout time.Duration, + reconnInterval time.Duration, + maxJitter time.Duration, ) Stream { return &SseStream{ - AuthToken: authToken, - url: url, + AuthToken: authToken, + url: url, connectionTimeout: connectionTimeout, - keepaliveTimeout: keepaliveTimeout, - reconnInterval: reconnInterval, - maxJitter: maxJitter, - newESFactory: newEventSource, + keepaliveTimeout: keepaliveTimeout, + reconnInterval: reconnInterval, + maxJitter: maxJitter, + newESFactory: newEventSource, } } @@ -104,17 +104,17 @@ func (s *SseStream) connectInternal( transport := &http.Transport{ Dial: (&net.Dialer{ - Timeout: s.connectionTimeout, + Timeout: s.connectionTimeout, }).Dial, - TLSHandshakeTimeout: s.connectionTimeout, + TLSHandshakeTimeout: s.connectionTimeout, ResponseHeaderTimeout: s.connectionTimeout, } // The http client timeout includes reading body, which is the entire SSE lifecycle until SSE is closed. httpClient := &http.Client{Transport: transport, Timeout: s.reconnInterval + s.maxJitter} // Max time for this connection. - + client := s.newESFactory(httpClient, s.url, map[string]string{ - "Authorization": s.AuthToken, + "Authorization": s.AuthToken, "X-Amp-Exp-Library": fmt.Sprintf("experiment-go-server/%v", experiment.VERSION), }) @@ -123,7 +123,7 @@ func (s *SseStream) connectInternal( esConnectErrCh := make(chan error) esDisconnectCh := make(chan bool) // Redirect on disconnect to a channel. - client.OnDisconnect(func (s *sse.Client) { + client.OnDisconnect(func(s *sse.Client) { select { case <-ctx.Done(): // Cancelled. return @@ -132,19 +132,19 @@ func (s *SseStream) connectInternal( } }) // Redirect on connect to a channel. - client.OnConnect(func (s *sse.Client) { + client.OnConnect(func(s *sse.Client) { select { case <-ctx.Done(): // Cancelled. return default: - go func() {connectCh <- true} () + go func() { connectCh <- true }() } }) go func() { // Subscribe to messages using channel. // This should be a non blocking call, but unsure how long it takes. err := client.SubscribeChanRawWithContext(ctx, esMsgCh) - if (err != nil) { + if err != nil { esConnectErrCh <- err } }() @@ -153,7 +153,7 @@ func (s *SseStream) connectInternal( s.lock.Lock() defer s.lock.Unlock() cancel() - if (s.cancelClientContext == &cancel) { + if s.cancelClientContext == &cancel { s.cancelClientContext = nil } } @@ -183,14 +183,14 @@ func (s *SseStream) connectInternal( select { case <-ctx.Done(): // Cancelled. return - case <- esDisconnectCh: // Disconnected. + case <-esDisconnectCh: // Disconnected. cancelWithLock() defer mutePanic(nil) errorCh <- errors.New("stream disconnected error") return case event := <-esMsgCh: // Message received. - if (len(event.Data) == 1 && event.Data[0] == STREAM_KEEP_ALIVE_BYTE) { - // Keep alive. + if len(event.Data) == 1 && event.Data[0] == STREAM_KEEP_ALIVE_BYTE { + // Keep alive. continue } // Possible write to closed channel @@ -203,10 +203,10 @@ func (s *SseStream) connectInternal( errorCh <- errors.New("stream keepalive timed out") } } - } () + }() // Reconnect after interval. - time.AfterFunc(randTimeDuration(s.reconnInterval, s.maxJitter), func () { + time.AfterFunc(randTimeDuration(s.reconnInterval, s.maxJitter), func() { select { case <-ctx.Done(): // Cancelled. return @@ -223,7 +223,7 @@ func (s *SseStream) connectInternal( func (s *SseStream) Cancel() { s.lock.Lock() defer s.lock.Unlock() - if (s.cancelClientContext != nil) { + if s.cancelClientContext != nil { (*(s.cancelClientContext))() s.cancelClientContext = nil } diff --git a/pkg/experiment/local/stream_test.go b/pkg/experiment/local/stream_test.go index f8e8276..13a8278 100644 --- a/pkg/experiment/local/stream_test.go +++ b/pkg/experiment/local/stream_test.go @@ -13,16 +13,16 @@ import ( type mockEventSource struct { httpClient *http.Client - url string - headers map[string]string + url string + headers map[string]string subscribeChanError error - chConnected chan bool + chConnected chan bool - ctx context.Context + ctx context.Context messageChan chan *sse.Event - onDisCb sse.ConnCallback - onConnCb sse.ConnCallback + onDisCb sse.ConnCallback + onConnCb sse.ConnCallback } func (s *mockEventSource) OnDisconnect(fn sse.ConnCallback) { @@ -49,7 +49,7 @@ func (s *mockEventSource) mockEventSourceFactory(httpClient *http.Client, url st func TestStream(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("authToken", "url", 2 * time.Second, 4 * time.Second, 6 * time.Second, 1 * time.Second) + client := NewSseStream("authToken", "url", 2*time.Second, 4*time.Second, 6*time.Second, 1*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) @@ -66,13 +66,13 @@ func TestStream(t *testing.T) { // Signal connected. s.onConnCb(nil) - + // Send update 1, ensure received. - go func() {s.messageChan <- &sse.Event{Data: []byte("data1")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte("data1")} }() assert.Equal(t, []byte("data1"), (<-messageCh).data) // Send keep alive, not passed down, checked later along with updates 2 and 3. - go func() {s.messageChan <- &sse.Event{Data: []byte(" ")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte(" ")} }() // Send update 2 and 3, ensure received in order. go func() { @@ -87,7 +87,7 @@ func TestStream(t *testing.T) { assert.True(t, errors.Is(s.ctx.Err(), context.Canceled)) // No message is passed through after cancel even it's received. - go func() {s.messageChan <- &sse.Event{Data: []byte("data4")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte("data4")} }() // Ensure no message after cancel. select { @@ -106,16 +106,16 @@ func TestStream(t *testing.T) { func TestStreamConnTimeout(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2 * time.Second, 4 * time.Second, 6 * time.Second, 1 * time.Second) + client := NewSseStream("", "", 2*time.Second, 4*time.Second, 6*time.Second, 1*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) - // Make connection. + // Make connection. client.Connect(messageCh, errorCh) <-s.chConnected - // Wait for timeout to reach. - time.Sleep(2 * time.Second + 10 * time.Millisecond) + // Wait for timeout to reach. + time.Sleep(2*time.Second + 10*time.Millisecond) // Check that context cancelled and error received. assert.True(t, errors.Is(s.ctx.Err(), context.Canceled)) assert.Equal(t, errors.New("stream connection timeout"), <-errorCh) @@ -123,59 +123,59 @@ func TestStreamConnTimeout(t *testing.T) { func TestStreamKeepAliveTimeout(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2 * time.Second, 1 * time.Second, 6 * time.Second, 1 * time.Second) + client := NewSseStream("", "", 2*time.Second, 1*time.Second, 6*time.Second, 1*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) - + // Make connection. client.Connect(messageCh, errorCh) <-s.chConnected s.onConnCb(nil) // Send keepalive 1 and wait. - go func() {s.messageChan <- &sse.Event{Data: []byte(" ")}}() - time.Sleep(1 * time.Second - 10 * time.Millisecond) + go func() { s.messageChan <- &sse.Event{Data: []byte(" ")} }() + time.Sleep(1*time.Second - 10*time.Millisecond) assert.False(t, errors.Is(s.ctx.Err(), context.Canceled)) // Send keepalive 2 and wait. - go func() {s.messageChan <- &sse.Event{Data: []byte(" ")}}() - time.Sleep(1 * time.Second - 10 * time.Millisecond) + go func() { s.messageChan <- &sse.Event{Data: []byte(" ")} }() + time.Sleep(1*time.Second - 10*time.Millisecond) assert.False(t, errors.Is(s.ctx.Err(), context.Canceled)) // Send data and wait, data should reset keepalive. - go func() {s.messageChan <- &sse.Event{Data: []byte("data1")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte("data1")} }() assert.Equal(t, []byte("data1"), (<-messageCh).data) - time.Sleep(1 * time.Second - 10 * time.Millisecond) + time.Sleep(1*time.Second - 10*time.Millisecond) assert.False(t, errors.Is(s.ctx.Err(), context.Canceled)) // Send data ensure stream is open. - go func() {s.messageChan <- &sse.Event{Data: []byte("data1")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte("data1")} }() assert.Equal(t, []byte("data1"), (<-messageCh).data) assert.False(t, errors.Is(s.ctx.Err(), context.Canceled)) // Wait for keepalive to timeout, stream should close. - time.Sleep(1 * time.Second + 10 * time.Millisecond) + time.Sleep(1*time.Second + 10*time.Millisecond) assert.Equal(t, errors.New("stream keepalive timed out"), <-errorCh) - assert.True(t, errors.Is(s.ctx.Err(), context.Canceled)) + assert.True(t, errors.Is(s.ctx.Err(), context.Canceled)) } func TestStreamReconnectsTimeout(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2 * time.Second, 3 * time.Second, 2 * time.Second, 0 * time.Second) + client := NewSseStream("", "", 2*time.Second, 3*time.Second, 2*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) - + // Make connection. client.Connect(messageCh, errorCh) <-s.chConnected s.onConnCb(nil) - go func() {s.messageChan <- &sse.Event{Data: []byte("data1")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte("data1")} }() assert.Equal(t, []byte("data1"), (<-messageCh).data) // Sleep for reconnect to timeout, data should pass through. - time.Sleep(2 * time.Second + 100 * time.Millisecond) + time.Sleep(2*time.Second + 100*time.Millisecond) <-s.chConnected s.onConnCb(nil) - go func() {s.messageChan <- &sse.Event{Data: []byte(" ")}}() - go func() {s.messageChan <- &sse.Event{Data: []byte("data2")}}() + go func() { s.messageChan <- &sse.Event{Data: []byte(" ")} }() + go func() { s.messageChan <- &sse.Event{Data: []byte("data2")} }() assert.Equal(t, []byte("data2"), (<-messageCh).data) assert.False(t, errors.Is(s.ctx.Err(), context.Canceled)) // Cancel stream, should cancel context. @@ -193,11 +193,11 @@ func TestStreamReconnectsTimeout(t *testing.T) { func TestStreamConnectAndCancelImmediately(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2 * time.Second, 3 * time.Second, 2 * time.Second, 0 * time.Second) + client := NewSseStream("", "", 2*time.Second, 3*time.Second, 2*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) - + // Make connection and cancel immediately. client.Connect(messageCh, errorCh) client.Cancel() @@ -214,7 +214,7 @@ func TestStreamConnectAndCancelImmediately(t *testing.T) { func TestStreamChannelCloseOk(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 1 * time.Second, 1 * time.Second, 1 * time.Second, 0 * time.Second) + client := NewSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) @@ -227,7 +227,7 @@ func TestStreamChannelCloseOk(t *testing.T) { client.Connect(messageCh, errorCh) <-s.chConnected s.onConnCb(nil) - + // Test no message received for closed channel. s.messageChan <- &sse.Event{Data: []byte("data1")} assert.True(t, errors.Is(s.ctx.Err(), context.Canceled)) @@ -248,11 +248,11 @@ func TestStreamChannelCloseOk(t *testing.T) { func TestStreamDisconnectErrorPasses(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 1 * time.Second, 1 * time.Second, 1 * time.Second, 0 * time.Second) + client := NewSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) - + // Make connection. client.Connect(messageCh, errorCh) <-s.chConnected @@ -274,11 +274,11 @@ func TestStreamDisconnectErrorPasses(t *testing.T) { func TestStreamConnectErrorPasses(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 1 * time.Second, 1 * time.Second, 1 * time.Second, 0 * time.Second) + client := NewSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) messageCh := make(chan StreamEvent) errorCh := make(chan error) - + // Make connection. s.subscribeChanError = errors.New("some error occurred") client.Connect(messageCh, errorCh) @@ -296,4 +296,4 @@ func TestStreamConnectErrorPasses(t *testing.T) { case <-time.After(2 * time.Second): // No message received within the timeout, as expected } -} \ No newline at end of file +} diff --git a/pkg/experiment/local/util.go b/pkg/experiment/local/util.go index 5f32969..88d506f 100644 --- a/pkg/experiment/local/util.go +++ b/pkg/experiment/local/util.go @@ -30,17 +30,17 @@ func difference(set1, set2 map[string]struct{}) map[string]struct{} { } func randTimeDuration(base time.Duration, jitter time.Duration) time.Duration { - if (jitter == 0) { + if jitter == 0 { return base } dmin := base.Nanoseconds() - jitter.Nanoseconds() - if (dmin < 0) { + if dmin < 0 { dmin = 0 } dmiddle := base.Nanoseconds() - if (dmiddle > math.MaxInt64 - jitter.Nanoseconds()) { + if dmiddle > math.MaxInt64-jitter.Nanoseconds() { dmiddle = math.MaxInt64 - jitter.Nanoseconds() } dmax := dmiddle + jitter.Nanoseconds() - return time.Duration(dmin + rand.Int64N(dmax - dmin)) -} \ No newline at end of file + return time.Duration(dmin + rand.Int64N(dmax-dmin)) +}