From adc14cd79ef3cf3919fb8b817d178e9d2c030c95 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Sat, 5 Oct 2024 01:27:25 -0700 Subject: [PATCH] make types and func non public --- pkg/experiment/local/client.go | 4 +- pkg/experiment/local/deployment_runner.go | 4 +- pkg/experiment/local/flag_config_api.go | 2 +- .../local/flag_config_stream_api.go | 8 ++-- .../local/flag_config_stream_api_test.go | 40 +++++++++--------- pkg/experiment/local/flag_config_updater.go | 20 ++++----- .../local/flag_config_updater_test.go | 32 +++++++------- pkg/experiment/local/stream.go | 42 +++++++++---------- pkg/experiment/local/stream_test.go | 34 +++++++-------- 9 files changed, 93 insertions(+), 93 deletions(-) diff --git a/pkg/experiment/local/client.go b/pkg/experiment/local/client.go index 47e60ff..7d3db54 100644 --- a/pkg/experiment/local/client.go +++ b/pkg/experiment/local/client.go @@ -64,11 +64,11 @@ func Initialize(apiKey string, config *Config) *Client { } var flagStreamApi *flagConfigStreamApiV2 if config.StreamUpdates { - flagStreamApi = NewFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout) + flagStreamApi = newFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout) } deploymentRunner = newDeploymentRunner( config, - NewFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), + newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), flagStreamApi, flagConfigStorage, cohortStorage, cohortLoader) client = &Client{ log: log, diff --git a/pkg/experiment/local/deployment_runner.go b/pkg/experiment/local/deployment_runner.go index dc8bf30..2a2312f 100644 --- a/pkg/experiment/local/deployment_runner.go +++ b/pkg/experiment/local/deployment_runner.go @@ -28,9 +28,9 @@ func newDeploymentRunner( cohortStorage cohortStorage, cohortLoader *cohortLoader, ) *deploymentRunner { - flagConfigUpdater := NewFlagConfigFallbackRetryWrapper(NewFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug) + flagConfigUpdater := newflagConfigFallbackRetryWrapper(newFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug) if flagConfigStreamApi != nil { - flagConfigUpdater = NewFlagConfigFallbackRetryWrapper(NewFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug) + flagConfigUpdater = newflagConfigFallbackRetryWrapper(newFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug) } dr := &deploymentRunner{ config: config, diff --git a/pkg/experiment/local/flag_config_api.go b/pkg/experiment/local/flag_config_api.go index 5e1821d..0076bfc 100644 --- a/pkg/experiment/local/flag_config_api.go +++ b/pkg/experiment/local/flag_config_api.go @@ -23,7 +23,7 @@ type flagConfigApiV2 struct { FlagConfigPollerRequestTimeoutMillis time.Duration } -func NewFlagConfigApiV2(deploymentKey, serverURL string, flagConfigPollerRequestTimeoutMillis time.Duration) *flagConfigApiV2 { +func newFlagConfigApiV2(deploymentKey, serverURL string, flagConfigPollerRequestTimeoutMillis time.Duration) *flagConfigApiV2 { return &flagConfigApiV2{ DeploymentKey: deploymentKey, ServerURL: serverURL, diff --git a/pkg/experiment/local/flag_config_stream_api.go b/pkg/experiment/local/flag_config_stream_api.go index 330a646..97259e3 100644 --- a/pkg/experiment/local/flag_config_stream_api.go +++ b/pkg/experiment/local/flag_config_stream_api.go @@ -36,10 +36,10 @@ type flagConfigStreamApiV2 struct { keepaliveTimeout time.Duration, reconnInterval time.Duration, maxJitter time.Duration, - ) Stream + ) stream } -func NewFlagConfigStreamApiV2( +func newFlagConfigStreamApiV2( deploymentKey string, serverURL string, connectionTimeout time.Duration, @@ -50,7 +50,7 @@ func NewFlagConfigStreamApiV2( connectionTimeout: connectionTimeout, stopCh: nil, lock: sync.Mutex{}, - newSseStreamFactory: NewSseStream, + newSseStreamFactory: newSseStream, } } @@ -74,7 +74,7 @@ func (api *flagConfigStreamApiV2) Connect( // Create Stream. stream := api.newSseStreamFactory("Api-Key "+api.DeploymentKey, endpoint.String(), api.connectionTimeout, streamApiKeepaliveTimeout, streamApiReconnInterval, streamApiMaxJitter) - streamMsgCh := make(chan StreamEvent) + streamMsgCh := make(chan streamEvent) streamErrCh := make(chan error) closeStream := func() { diff --git a/pkg/experiment/local/flag_config_stream_api_test.go b/pkg/experiment/local/flag_config_stream_api_test.go index 3097579..82ee94d 100644 --- a/pkg/experiment/local/flag_config_stream_api_test.go +++ b/pkg/experiment/local/flag_config_stream_api_test.go @@ -21,14 +21,14 @@ type mockSseStream struct { maxJitter time.Duration // Channels to emit messages to simulate new events received through stream. - messageCh chan (StreamEvent) + messageCh chan (streamEvent) errorCh chan (error) // Channel to tell there's a connection call. chConnected chan bool } -func (s *mockSseStream) Connect(messageCh chan (StreamEvent), errorCh chan (error)) error { +func (s *mockSseStream) Connect(messageCh chan (streamEvent), errorCh chan (error)) error { s.messageCh = messageCh s.errorCh = errorCh @@ -39,7 +39,7 @@ func (s *mockSseStream) Connect(messageCh chan (StreamEvent), errorCh chan (erro func (s *mockSseStream) Cancel() { } -func (s *mockSseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) { +func (s *mockSseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) eventSource) { } func (s *mockSseStream) newSseStreamFactory( @@ -49,7 +49,7 @@ func (s *mockSseStream) newSseStreamFactory( keepaliveTimeout time.Duration, reconnInterval time.Duration, maxJitter time.Duration, -) Stream { +) stream { s.authToken = authToken s.url = url s.connectionTimeout = connectionTimeout @@ -64,7 +64,7 @@ var FLAG_1, _ = parseData(FLAG_1_STR) func TestFlagConfigStreamApi(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) + api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -72,7 +72,7 @@ func TestFlagConfigStreamApi(t *testing.T) { go func() { // On connect. <-sse.chConnected - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} assert.Equal(t, FLAG_1, <-receivedMsgCh) }() err := api.Connect( @@ -88,9 +88,9 @@ func TestFlagConfigStreamApi(t *testing.T) { ) assert.Nil(t, err) - go func() { sse.messageCh <- StreamEvent{data: FLAG_1_STR} }() + go func() { sse.messageCh <- streamEvent{data: FLAG_1_STR} }() assert.Equal(t, FLAG_1, <-receivedMsgCh) - go func() { sse.messageCh <- StreamEvent{data: FLAG_1_STR} }() + go func() { sse.messageCh <- streamEvent{data: FLAG_1_STR} }() assert.Equal(t, FLAG_1, <-receivedMsgCh) api.Close() @@ -98,7 +98,7 @@ func TestFlagConfigStreamApi(t *testing.T) { func TestFlagConfigStreamApiErrorNoInitialFlags(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) + api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory go func() { @@ -111,7 +111,7 @@ func TestFlagConfigStreamApiErrorNoInitialFlags(t *testing.T) { func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) + api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -119,7 +119,7 @@ func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) { go func() { // On connect. <-sse.chConnected - sse.messageCh <- StreamEvent{data: []byte("bad data")} + sse.messageCh <- streamEvent{data: []byte("bad data")} <-receivedMsgCh // Should hang as no good data was received. assert.Fail(t, "Bad message went through") }() @@ -133,7 +133,7 @@ func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) { func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) + api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -141,7 +141,7 @@ func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) { go func() { // On connect. <-sse.chConnected - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} <-receivedMsgCh // Should hang as no updates was received. assert.Fail(t, "Bad message went through") }() @@ -155,7 +155,7 @@ func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) { func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) + api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -163,7 +163,7 @@ func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *te go func() { // On connect. <-sse.chConnected - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} assert.Equal(t, FLAG_1, <-receivedMsgCh) // Should hang as no updates was received. }() err := api.Connect( @@ -173,14 +173,14 @@ func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *te ) assert.Nil(t, err) // Send an update, this should call onUpdate cb which fails. - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} // Make sure channel is not closed. - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} } func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) { sse := mockSseStream{chConnected: make(chan bool)} - api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) + api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second) api.newSseStreamFactory = sse.newSseStreamFactory receivedMsgCh := make(chan map[string]*evaluation.Flag) receivedErrCh := make(chan error) @@ -188,7 +188,7 @@ func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) { go func() { // On connect. <-sse.chConnected - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} assert.Equal(t, FLAG_1, <-receivedMsgCh) }() err := api.Connect( @@ -203,6 +203,6 @@ func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) { // The message channel should be closed. defer mutePanic(nil) - sse.messageCh <- StreamEvent{data: FLAG_1_STR} + sse.messageCh <- streamEvent{data: FLAG_1_STR} assert.Fail(t, "Unexpected message after error") } diff --git a/pkg/experiment/local/flag_config_updater.go b/pkg/experiment/local/flag_config_updater.go index 553c7e7..6eac131 100644 --- a/pkg/experiment/local/flag_config_updater.go +++ b/pkg/experiment/local/flag_config_updater.go @@ -121,7 +121,7 @@ type flagConfigStreamer struct { lock sync.Mutex } -func NewFlagConfigStreamer( +func newFlagConfigStreamer( flagConfigStreamApi flagConfigStreamApi, config *Config, flagConfigStorage flagConfigStorage, @@ -175,7 +175,7 @@ type flagConfigPoller struct { lock sync.Mutex } -func NewFlagConfigPoller( +func newFlagConfigPoller( flagConfigApi flagConfigApi, config *Config, flagConfigStorage flagConfigStorage, @@ -249,7 +249,7 @@ func (p *flagConfigPoller) Stop() { // A wrapper around flag config updaters to retry and fallback. // If the main updater fails, it will fallback to the fallback updater and main updater enters retry loop. -type FlagConfigFallbackRetryWrapper struct { +type flagConfigFallbackRetryWrapper struct { log *logger.Log mainUpdater flagConfigUpdater fallbackUpdater flagConfigUpdater @@ -259,14 +259,14 @@ type FlagConfigFallbackRetryWrapper struct { lock sync.Mutex } -func NewFlagConfigFallbackRetryWrapper( +func newflagConfigFallbackRetryWrapper( mainUpdater flagConfigUpdater, fallbackUpdater flagConfigUpdater, retryDelay time.Duration, maxJitter time.Duration, debug bool, ) flagConfigUpdater { - return &FlagConfigFallbackRetryWrapper{ + return &flagConfigFallbackRetryWrapper{ log: logger.New(debug), mainUpdater: mainUpdater, fallbackUpdater: fallbackUpdater, @@ -283,9 +283,9 @@ func NewFlagConfigFallbackRetryWrapper( // // Since the wrapper retries, so there will never be error case. // Thus, onError will never be called. -func (w *FlagConfigFallbackRetryWrapper) Start(onError func(error)) error { - // if (mainUpdater is FlagConfigFallbackRetryWrapper) { - // throw Error("Do not use FlagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.") +func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error { + // if (mainUpdater is flagConfigFallbackRetryWrapper) { + // throw Error("Do not use flagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.") // } w.lock.Lock() @@ -325,7 +325,7 @@ func (w *FlagConfigFallbackRetryWrapper) Start(onError func(error)) error { return nil } -func (w *FlagConfigFallbackRetryWrapper) Stop() { +func (w *flagConfigFallbackRetryWrapper) Stop() { w.lock.Lock() defer w.lock.Unlock() @@ -339,7 +339,7 @@ func (w *FlagConfigFallbackRetryWrapper) Stop() { } } -func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() { +func (w *flagConfigFallbackRetryWrapper) scheduleRetry() { w.lock.Lock() defer w.lock.Unlock() diff --git a/pkg/experiment/local/flag_config_updater_test.go b/pkg/experiment/local/flag_config_updater_test.go index c95b108..fca3b57 100644 --- a/pkg/experiment/local/flag_config_updater_test.go +++ b/pkg/experiment/local/flag_config_updater_test.go @@ -21,7 +21,7 @@ func createTestPollerObjs() (mockFlagConfigApi, flagConfigStorage, cohortStorage func TestFlagConfigPoller(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestPollerObjs() - poller := NewFlagConfigPoller(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) + poller := newFlagConfigPoller(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) // Poller start normal. @@ -52,7 +52,7 @@ func TestFlagConfigPoller(t *testing.T) { func TestFlagConfigPollerStartFail(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestPollerObjs() - poller := NewFlagConfigPoller(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) + poller := newFlagConfigPoller(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) // Poller start normal. @@ -68,7 +68,7 @@ func TestFlagConfigPollerStartFail(t *testing.T) { func TestFlagConfigPollerPollingFail(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestPollerObjs() - poller := NewFlagConfigPoller(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) + poller := newFlagConfigPoller(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) // Poller start normal. @@ -134,7 +134,7 @@ func createTestStreamerObjs() (mockFlagConfigStreamApi, flagConfigStorage, cohor func TestFlagConfigStreamer(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestStreamerObjs() - streamer := NewFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) + streamer := newFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) var updateCb func(map[string]*evaluation.Flag) error @@ -177,7 +177,7 @@ func TestFlagConfigStreamer(t *testing.T) { func TestFlagConfigStreamerStartFail(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestStreamerObjs() - streamer := NewFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) + streamer := newFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) api.connectFunc = func( @@ -200,7 +200,7 @@ func TestFlagConfigStreamerStartFail(t *testing.T) { func TestFlagConfigStreamerStreamingFail(t *testing.T) { api, flagConfigStorage, cohortStorage, cohortLoader := createTestStreamerObjs() - streamer := NewFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) + streamer := newFlagConfigStreamer(&api, &Config{FlagConfigPollerInterval: 1 * time.Second}, flagConfigStorage, cohortStorage, cohortLoader) errorCh := make(chan error) var updateCb func(map[string]*evaluation.Flag) error @@ -250,7 +250,7 @@ type mockFlagConfigUpdater struct { func (u *mockFlagConfigUpdater) Start(f func(error)) error { return u.startFunc(f) } func (u *mockFlagConfigUpdater) Stop() { u.stopFunc() } -func TestFlagConfigFallbackRetryWrapper(t *testing.T) { +func TestflagConfigFallbackRetryWrapper(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -266,7 +266,7 @@ func TestFlagConfigFallbackRetryWrapper(t *testing.T) { } fallback.stopFunc = func() { } - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -275,7 +275,7 @@ func TestFlagConfigFallbackRetryWrapper(t *testing.T) { assert.Nil(t, mainOnError) } -func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { +func TestflagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -291,7 +291,7 @@ func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { } fallback.stopFunc = func() { } - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) err := w.Start(nil) assert.Equal(t, errors.New("fallback start error"), err) assert.NotNil(t, mainOnError) @@ -302,7 +302,7 @@ func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { assert.Nil(t, mainOnError) } -func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) { +func TestflagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -320,7 +320,7 @@ func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T fallback.stopFunc = func() { go func() { fallbackStopCh <- true }() } - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -348,7 +348,7 @@ func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T w.Stop() } -func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { +func TestflagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -366,7 +366,7 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { return nil } fallback.stopFunc = func() {} - w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) // Start success err := w.Start(nil) assert.Nil(t, err) @@ -424,7 +424,7 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { } -func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { +func TestflagConfigFallbackRetryWrapperMainOnly(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -434,7 +434,7 @@ func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { main.stopFunc = func() { mainOnError = nil } - w := NewFlagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) diff --git a/pkg/experiment/local/stream.go b/pkg/experiment/local/stream.go index 40e7869..9b0f59c 100644 --- a/pkg/experiment/local/stream.go +++ b/pkg/experiment/local/stream.go @@ -25,13 +25,13 @@ func mutePanic(f func()) { } // This is a boiled down version of sse.Client. -type EventSource interface { +type eventSource interface { OnDisconnect(fn sse.ConnCallback) OnConnect(fn sse.ConnCallback) SubscribeChanRawWithContext(ctx context.Context, ch chan *sse.Event) error } -func newEventSource(httpClient *http.Client, url string, headers map[string]string) EventSource { +func newEventSource(httpClient *http.Client, url string, headers map[string]string) eventSource { client := sse.NewClient(url) client.Connection = httpClient client.Headers = headers @@ -40,22 +40,18 @@ func newEventSource(httpClient *http.Client, url string, headers map[string]stri return client } -type StreamEvent struct { +type streamEvent struct { data []byte } -type Stream interface { - Connect(messageCh chan StreamEvent, errorCh chan error) error +type stream interface { + Connect(messageCh chan streamEvent, errorCh chan error) error Cancel() // For testing. - setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) + setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) eventSource) } -func (s *SseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) { - s.newESFactory = f -} - -type SseStream struct { +type sseStream struct { AuthToken string url string connectionTimeout time.Duration @@ -64,18 +60,18 @@ type SseStream struct { maxJitter time.Duration lock sync.Mutex cancelClientContext *context.CancelFunc - newESFactory func(httpClient *http.Client, url string, headers map[string]string) EventSource + newESFactory func(httpClient *http.Client, url string, headers map[string]string) eventSource } -func NewSseStream( +func newSseStream( authToken, url string, connectionTimeout time.Duration, keepaliveTimeout time.Duration, reconnInterval time.Duration, maxJitter time.Duration, -) Stream { - return &SseStream{ +) stream { + return &sseStream{ AuthToken: authToken, url: url, connectionTimeout: connectionTimeout, @@ -86,8 +82,12 @@ func NewSseStream( } } -func (s *SseStream) Connect( - messageCh chan StreamEvent, +func (s *sseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) eventSource) { + s.newESFactory = f +} + +func (s *sseStream) Connect( + messageCh chan streamEvent, errorCh chan error, ) error { s.lock.Lock() @@ -95,8 +95,8 @@ func (s *SseStream) Connect( return s.connectInternal(messageCh, errorCh) } -func (s *SseStream) connectInternal( - messageCh chan StreamEvent, +func (s *sseStream) connectInternal( + messageCh chan streamEvent, errorCh chan error, ) error { ctx, cancel := context.WithCancel(context.Background()) @@ -196,7 +196,7 @@ func (s *SseStream) connectInternal( // Possible write to closed channel // If channel closed, cancel. defer mutePanic(cancelWithLock) - messageCh <- StreamEvent{event.Data} + messageCh <- streamEvent{event.Data} case <-time.After(s.keepaliveTimeout): // Keep alive timeout. cancelWithLock() defer mutePanic(nil) @@ -220,7 +220,7 @@ func (s *SseStream) connectInternal( return nil } -func (s *SseStream) Cancel() { +func (s *sseStream) Cancel() { s.lock.Lock() defer s.lock.Unlock() if s.cancelClientContext != nil { diff --git a/pkg/experiment/local/stream_test.go b/pkg/experiment/local/stream_test.go index 13a8278..49bbe78 100644 --- a/pkg/experiment/local/stream_test.go +++ b/pkg/experiment/local/stream_test.go @@ -40,7 +40,7 @@ func (s *mockEventSource) SubscribeChanRawWithContext(ctx context.Context, ch ch return s.subscribeChanError } -func (s *mockEventSource) mockEventSourceFactory(httpClient *http.Client, url string, headers map[string]string) EventSource { +func (s *mockEventSource) mockEventSourceFactory(httpClient *http.Client, url string, headers map[string]string) eventSource { s.httpClient = httpClient s.url = url s.headers = headers @@ -49,9 +49,9 @@ func (s *mockEventSource) mockEventSourceFactory(httpClient *http.Client, url st func TestStream(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("authToken", "url", 2*time.Second, 4*time.Second, 6*time.Second, 1*time.Second) + client := newSseStream("authToken", "url", 2*time.Second, 4*time.Second, 6*time.Second, 1*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection. @@ -106,9 +106,9 @@ func TestStream(t *testing.T) { func TestStreamConnTimeout(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2*time.Second, 4*time.Second, 6*time.Second, 1*time.Second) + client := newSseStream("", "", 2*time.Second, 4*time.Second, 6*time.Second, 1*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection. @@ -123,9 +123,9 @@ func TestStreamConnTimeout(t *testing.T) { func TestStreamKeepAliveTimeout(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2*time.Second, 1*time.Second, 6*time.Second, 1*time.Second) + client := newSseStream("", "", 2*time.Second, 1*time.Second, 6*time.Second, 1*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection. @@ -158,9 +158,9 @@ func TestStreamKeepAliveTimeout(t *testing.T) { func TestStreamReconnectsTimeout(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2*time.Second, 3*time.Second, 2*time.Second, 0*time.Second) + client := newSseStream("", "", 2*time.Second, 3*time.Second, 2*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection. @@ -193,9 +193,9 @@ func TestStreamReconnectsTimeout(t *testing.T) { func TestStreamConnectAndCancelImmediately(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 2*time.Second, 3*time.Second, 2*time.Second, 0*time.Second) + client := newSseStream("", "", 2*time.Second, 3*time.Second, 2*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection and cancel immediately. @@ -214,9 +214,9 @@ func TestStreamConnectAndCancelImmediately(t *testing.T) { func TestStreamChannelCloseOk(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) + client := newSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Close channels. @@ -248,9 +248,9 @@ func TestStreamChannelCloseOk(t *testing.T) { func TestStreamDisconnectErrorPasses(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) + client := newSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection. @@ -274,9 +274,9 @@ func TestStreamDisconnectErrorPasses(t *testing.T) { func TestStreamConnectErrorPasses(t *testing.T) { var s = mockEventSource{chConnected: make(chan bool)} - client := NewSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) + client := newSseStream("", "", 1*time.Second, 1*time.Second, 1*time.Second, 0*time.Second) client.setNewESFactory(s.mockEventSourceFactory) - messageCh := make(chan StreamEvent) + messageCh := make(chan streamEvent) errorCh := make(chan error) // Make connection.