Skip to content

Commit

Permalink
make types and func non public
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Oct 5, 2024
1 parent a0b51e1 commit adc14cd
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 93 deletions.
4 changes: 2 additions & 2 deletions pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func Initialize(apiKey string, config *Config) *Client {
}
var flagStreamApi *flagConfigStreamApiV2
if config.StreamUpdates {
flagStreamApi = NewFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout)
flagStreamApi = newFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout)
}
deploymentRunner = newDeploymentRunner(
config,
NewFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout),
newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout),
flagStreamApi, flagConfigStorage, cohortStorage, cohortLoader)
client = &Client{
log: log,
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/deployment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func newDeploymentRunner(
cohortStorage cohortStorage,
cohortLoader *cohortLoader,
) *deploymentRunner {
flagConfigUpdater := NewFlagConfigFallbackRetryWrapper(NewFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug)
flagConfigUpdater := newflagConfigFallbackRetryWrapper(newFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug)
if flagConfigStreamApi != nil {
flagConfigUpdater = NewFlagConfigFallbackRetryWrapper(NewFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug)
flagConfigUpdater = newflagConfigFallbackRetryWrapper(newFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug)
}
dr := &deploymentRunner{
config: config,
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/local/flag_config_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type flagConfigApiV2 struct {
FlagConfigPollerRequestTimeoutMillis time.Duration
}

func NewFlagConfigApiV2(deploymentKey, serverURL string, flagConfigPollerRequestTimeoutMillis time.Duration) *flagConfigApiV2 {
func newFlagConfigApiV2(deploymentKey, serverURL string, flagConfigPollerRequestTimeoutMillis time.Duration) *flagConfigApiV2 {
return &flagConfigApiV2{
DeploymentKey: deploymentKey,
ServerURL: serverURL,
Expand Down
8 changes: 4 additions & 4 deletions pkg/experiment/local/flag_config_stream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type flagConfigStreamApiV2 struct {
keepaliveTimeout time.Duration,
reconnInterval time.Duration,
maxJitter time.Duration,
) Stream
) stream
}

func NewFlagConfigStreamApiV2(
func newFlagConfigStreamApiV2(
deploymentKey string,
serverURL string,
connectionTimeout time.Duration,
Expand All @@ -50,7 +50,7 @@ func NewFlagConfigStreamApiV2(
connectionTimeout: connectionTimeout,
stopCh: nil,
lock: sync.Mutex{},
newSseStreamFactory: NewSseStream,
newSseStreamFactory: newSseStream,
}
}

Expand All @@ -74,7 +74,7 @@ func (api *flagConfigStreamApiV2) Connect(
// Create Stream.
stream := api.newSseStreamFactory("Api-Key "+api.DeploymentKey, endpoint.String(), api.connectionTimeout, streamApiKeepaliveTimeout, streamApiReconnInterval, streamApiMaxJitter)

streamMsgCh := make(chan StreamEvent)
streamMsgCh := make(chan streamEvent)
streamErrCh := make(chan error)

closeStream := func() {
Expand Down
40 changes: 20 additions & 20 deletions pkg/experiment/local/flag_config_stream_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ type mockSseStream struct {
maxJitter time.Duration

// Channels to emit messages to simulate new events received through stream.
messageCh chan (StreamEvent)
messageCh chan (streamEvent)
errorCh chan (error)

// Channel to tell there's a connection call.
chConnected chan bool
}

func (s *mockSseStream) Connect(messageCh chan (StreamEvent), errorCh chan (error)) error {
func (s *mockSseStream) Connect(messageCh chan (streamEvent), errorCh chan (error)) error {
s.messageCh = messageCh
s.errorCh = errorCh

Expand All @@ -39,7 +39,7 @@ func (s *mockSseStream) Connect(messageCh chan (StreamEvent), errorCh chan (erro
func (s *mockSseStream) Cancel() {
}

func (s *mockSseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) EventSource) {
func (s *mockSseStream) setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) eventSource) {
}

func (s *mockSseStream) newSseStreamFactory(
Expand All @@ -49,7 +49,7 @@ func (s *mockSseStream) newSseStreamFactory(
keepaliveTimeout time.Duration,
reconnInterval time.Duration,
maxJitter time.Duration,
) Stream {
) stream {
s.authToken = authToken
s.url = url
s.connectionTimeout = connectionTimeout
Expand All @@ -64,15 +64,15 @@ var FLAG_1, _ = parseData(FLAG_1_STR)

func TestFlagConfigStreamApi(t *testing.T) {
sse := mockSseStream{chConnected: make(chan bool)}
api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api.newSseStreamFactory = sse.newSseStreamFactory
receivedMsgCh := make(chan map[string]*evaluation.Flag)
receivedErrCh := make(chan error)

go func() {
// On connect.
<-sse.chConnected
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
assert.Equal(t, FLAG_1, <-receivedMsgCh)
}()
err := api.Connect(
Expand All @@ -88,17 +88,17 @@ func TestFlagConfigStreamApi(t *testing.T) {
)
assert.Nil(t, err)

go func() { sse.messageCh <- StreamEvent{data: FLAG_1_STR} }()
go func() { sse.messageCh <- streamEvent{data: FLAG_1_STR} }()
assert.Equal(t, FLAG_1, <-receivedMsgCh)
go func() { sse.messageCh <- StreamEvent{data: FLAG_1_STR} }()
go func() { sse.messageCh <- streamEvent{data: FLAG_1_STR} }()
assert.Equal(t, FLAG_1, <-receivedMsgCh)

api.Close()
}

func TestFlagConfigStreamApiErrorNoInitialFlags(t *testing.T) {
sse := mockSseStream{chConnected: make(chan bool)}
api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api.newSseStreamFactory = sse.newSseStreamFactory

go func() {
Expand All @@ -111,15 +111,15 @@ func TestFlagConfigStreamApiErrorNoInitialFlags(t *testing.T) {

func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) {
sse := mockSseStream{chConnected: make(chan bool)}
api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api.newSseStreamFactory = sse.newSseStreamFactory
receivedMsgCh := make(chan map[string]*evaluation.Flag)
receivedErrCh := make(chan error)

go func() {
// On connect.
<-sse.chConnected
sse.messageCh <- StreamEvent{data: []byte("bad data")}
sse.messageCh <- streamEvent{data: []byte("bad data")}
<-receivedMsgCh // Should hang as no good data was received.
assert.Fail(t, "Bad message went through")
}()
Expand All @@ -133,15 +133,15 @@ func TestFlagConfigStreamApiErrorCorruptInitialFlags(t *testing.T) {

func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) {
sse := mockSseStream{chConnected: make(chan bool)}
api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api.newSseStreamFactory = sse.newSseStreamFactory
receivedMsgCh := make(chan map[string]*evaluation.Flag)
receivedErrCh := make(chan error)

go func() {
// On connect.
<-sse.chConnected
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
<-receivedMsgCh // Should hang as no updates was received.
assert.Fail(t, "Bad message went through")
}()
Expand All @@ -155,15 +155,15 @@ func TestFlagConfigStreamApiErrorInitialFlagsUpdateFailStopsApi(t *testing.T) {

func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *testing.T) {
sse := mockSseStream{chConnected: make(chan bool)}
api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api.newSseStreamFactory = sse.newSseStreamFactory
receivedMsgCh := make(chan map[string]*evaluation.Flag)
receivedErrCh := make(chan error)

go func() {
// On connect.
<-sse.chConnected
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
assert.Equal(t, FLAG_1, <-receivedMsgCh) // Should hang as no updates was received.
}()
err := api.Connect(
Expand All @@ -173,22 +173,22 @@ func TestFlagConfigStreamApiErrorInitialFlagsFutureUpdateFailDoesntStopApi(t *te
)
assert.Nil(t, err)
// Send an update, this should call onUpdate cb which fails.
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
// Make sure channel is not closed.
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
}

func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) {
sse := mockSseStream{chConnected: make(chan bool)}
api := NewFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api := newFlagConfigStreamApiV2("deploymentkey", "serverurl", 1*time.Second)
api.newSseStreamFactory = sse.newSseStreamFactory
receivedMsgCh := make(chan map[string]*evaluation.Flag)
receivedErrCh := make(chan error)

go func() {
// On connect.
<-sse.chConnected
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
assert.Equal(t, FLAG_1, <-receivedMsgCh)
}()
err := api.Connect(
Expand All @@ -203,6 +203,6 @@ func TestFlagConfigStreamApiErrorDuringStreaming(t *testing.T) {

// The message channel should be closed.
defer mutePanic(nil)
sse.messageCh <- StreamEvent{data: FLAG_1_STR}
sse.messageCh <- streamEvent{data: FLAG_1_STR}
assert.Fail(t, "Unexpected message after error")
}
20 changes: 10 additions & 10 deletions pkg/experiment/local/flag_config_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type flagConfigStreamer struct {
lock sync.Mutex
}

func NewFlagConfigStreamer(
func newFlagConfigStreamer(
flagConfigStreamApi flagConfigStreamApi,
config *Config,
flagConfigStorage flagConfigStorage,
Expand Down Expand Up @@ -175,7 +175,7 @@ type flagConfigPoller struct {
lock sync.Mutex
}

func NewFlagConfigPoller(
func newFlagConfigPoller(
flagConfigApi flagConfigApi,
config *Config,
flagConfigStorage flagConfigStorage,
Expand Down Expand Up @@ -249,7 +249,7 @@ func (p *flagConfigPoller) Stop() {

// A wrapper around flag config updaters to retry and fallback.
// If the main updater fails, it will fallback to the fallback updater and main updater enters retry loop.
type FlagConfigFallbackRetryWrapper struct {
type flagConfigFallbackRetryWrapper struct {
log *logger.Log
mainUpdater flagConfigUpdater
fallbackUpdater flagConfigUpdater
Expand All @@ -259,14 +259,14 @@ type FlagConfigFallbackRetryWrapper struct {
lock sync.Mutex
}

func NewFlagConfigFallbackRetryWrapper(
func newflagConfigFallbackRetryWrapper(
mainUpdater flagConfigUpdater,
fallbackUpdater flagConfigUpdater,
retryDelay time.Duration,
maxJitter time.Duration,
debug bool,
) flagConfigUpdater {
return &FlagConfigFallbackRetryWrapper{
return &flagConfigFallbackRetryWrapper{
log: logger.New(debug),
mainUpdater: mainUpdater,
fallbackUpdater: fallbackUpdater,
Expand All @@ -283,9 +283,9 @@ func NewFlagConfigFallbackRetryWrapper(
//
// Since the wrapper retries, so there will never be error case.
// Thus, onError will never be called.
func (w *FlagConfigFallbackRetryWrapper) Start(onError func(error)) error {
// if (mainUpdater is FlagConfigFallbackRetryWrapper) {
// throw Error("Do not use FlagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.")
func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error {
// if (mainUpdater is flagConfigFallbackRetryWrapper) {
// throw Error("Do not use flagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.")
// }

w.lock.Lock()
Expand Down Expand Up @@ -325,7 +325,7 @@ func (w *FlagConfigFallbackRetryWrapper) Start(onError func(error)) error {
return nil
}

func (w *FlagConfigFallbackRetryWrapper) Stop() {
func (w *flagConfigFallbackRetryWrapper) Stop() {
w.lock.Lock()
defer w.lock.Unlock()

Expand All @@ -339,7 +339,7 @@ func (w *FlagConfigFallbackRetryWrapper) Stop() {
}
}

func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() {
func (w *flagConfigFallbackRetryWrapper) scheduleRetry() {
w.lock.Lock()
defer w.lock.Unlock()

Expand Down
Loading

0 comments on commit adc14cd

Please sign in to comment.