Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Oct 5, 2024
1 parent fc4230b commit a0b51e1
Show file tree
Hide file tree
Showing 13 changed files with 339 additions and 332 deletions.
4 changes: 2 additions & 2 deletions pkg/experiment/local/assignment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestToEvent(t *testing.T) {
},
},
"flag-key-2": {
Key: "control",
Key: "control",
Metadata: map[string]interface{}{
"default": true,
"default": true,
"segmentName": "All Other Users",
"flagVersion": float64(12),
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func Initialize(apiKey string, config *Config) *Client {
flagStreamApi = NewFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout)
}
deploymentRunner = newDeploymentRunner(
config,
NewFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout),
config,
NewFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout),
flagStreamApi, flagConfigStorage, cohortStorage, cohortLoader)
client = &Client{
log: log,
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/client_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func init() {
}
streamClient = Initialize("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz",
&Config{
StreamUpdates: true,
StreamServerUrl: "https://stream.lab.amplitude.com",
StreamUpdates: true,
StreamServerUrl: "https://stream.lab.amplitude.com",
CohortSyncConfig: &cohortSyncConfig,
})
err = streamClient.Start()
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/cohort_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type cohortLoader struct {
log *logger.Log
log *logger.Log
cohortDownloadApi cohortDownloadApi
cohortStorage cohortStorage
jobs sync.Map
Expand Down Expand Up @@ -122,4 +122,4 @@ func (cl *cohortLoader) downloadCohorts(cohortIDs map[string]struct{}) {
if len(errorMessages) > 0 {
cl.log.Error("One or more cohorts failed to download:\n%s", strings.Join(errorMessages, "\n"))
}
}
}
12 changes: 6 additions & 6 deletions pkg/experiment/local/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ type Config struct {
ServerZone ServerZone
FlagConfigPollerInterval time.Duration
FlagConfigPollerRequestTimeout time.Duration
StreamUpdates bool
StreamServerUrl string
StreamFlagConnTimeout time.Duration
StreamUpdates bool
StreamServerUrl string
StreamFlagConnTimeout time.Duration
AssignmentConfig *AssignmentConfig
CohortSyncConfig *CohortSyncConfig
}
Expand All @@ -50,9 +50,9 @@ var DefaultConfig = &Config{
ServerZone: USServerZone,
FlagConfigPollerInterval: 30 * time.Second,
FlagConfigPollerRequestTimeout: 10 * time.Second,
StreamUpdates: false,
StreamServerUrl: "https://stream.lab.amplitude.com",
StreamFlagConnTimeout: 1500 * time.Millisecond,
StreamUpdates: false,
StreamServerUrl: "https://stream.lab.amplitude.com",
StreamFlagConnTimeout: 1500 * time.Millisecond,
}

var DefaultAssignmentConfig = &AssignmentConfig{
Expand Down
6 changes: 3 additions & 3 deletions pkg/experiment/local/deployment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func newDeploymentRunner(
cohortLoader *cohortLoader,
) *deploymentRunner {
flagConfigUpdater := NewFlagConfigFallbackRetryWrapper(NewFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug)
if (flagConfigStreamApi != nil) {
if flagConfigStreamApi != nil {
flagConfigUpdater = NewFlagConfigFallbackRetryWrapper(NewFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug)
}
dr := &deploymentRunner{
config: config,
flagConfigStorage: flagConfigStorage,
cohortLoader: cohortLoader,
flagConfigUpdater: flagConfigUpdater,
poller: newPoller(),
poller: newPoller(),
log: logger.New(config.Debug),
}
return dr
Expand All @@ -47,7 +47,7 @@ func (dr *deploymentRunner) start() error {
dr.lock.Lock()
defer dr.lock.Unlock()
err := dr.flagConfigUpdater.Start(nil)
if (err != nil) {
if err != nil {
return err
}

Expand Down
74 changes: 37 additions & 37 deletions pkg/experiment/local/flag_config_stream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ const streamApiReconnInterval = 15 * time.Minute

type flagConfigStreamApi interface {
Connect(
onInitUpdate func (map[string]*evaluation.Flag) error,
onUpdate func (map[string]*evaluation.Flag) error,
onError func (error),
onInitUpdate func(map[string]*evaluation.Flag) error,
onUpdate func(map[string]*evaluation.Flag) error,
onError func(error),
) error
Close()
}

type flagConfigStreamApiV2 struct {
DeploymentKey string
ServerURL string
connectionTimeout time.Duration
stopCh chan bool
lock sync.Mutex
newSseStreamFactory func (
authToken,
DeploymentKey string
ServerURL string
connectionTimeout time.Duration
stopCh chan bool
lock sync.Mutex
newSseStreamFactory func(
authToken,
url string,
connectionTimeout time.Duration,
keepaliveTimeout time.Duration,
Expand All @@ -40,24 +40,24 @@ type flagConfigStreamApiV2 struct {
}

func NewFlagConfigStreamApiV2(
deploymentKey string,
serverURL string,
connectionTimeout time.Duration,
deploymentKey string,
serverURL string,
connectionTimeout time.Duration,
) *flagConfigStreamApiV2 {
return &flagConfigStreamApiV2{
DeploymentKey: deploymentKey,
ServerURL: serverURL,
connectionTimeout: connectionTimeout,
stopCh: nil,
lock: sync.Mutex{},
DeploymentKey: deploymentKey,
ServerURL: serverURL,
connectionTimeout: connectionTimeout,
stopCh: nil,
lock: sync.Mutex{},
newSseStreamFactory: NewSseStream,
}
}

func (api *flagConfigStreamApiV2) Connect(
onInitUpdate func (map[string]*evaluation.Flag) error,
onUpdate func (map[string]*evaluation.Flag) error,
onError func (error),
onInitUpdate func(map[string]*evaluation.Flag) error,
onUpdate func(map[string]*evaluation.Flag) error,
onError func(error),
) error {
api.lock.Lock()
defer api.lock.Unlock()
Expand All @@ -72,39 +72,39 @@ func (api *flagConfigStreamApiV2) Connect(
endpoint.Path = "sdk/stream/v1/flags"

// Create Stream.
stream := api.newSseStreamFactory("Api-Key " + api.DeploymentKey, endpoint.String(), api.connectionTimeout, streamApiKeepaliveTimeout, streamApiReconnInterval, streamApiMaxJitter)
stream := api.newSseStreamFactory("Api-Key "+api.DeploymentKey, endpoint.String(), api.connectionTimeout, streamApiKeepaliveTimeout, streamApiReconnInterval, streamApiMaxJitter)

streamMsgCh := make(chan StreamEvent)
streamErrCh := make(chan error)

closeStream := func () {
closeStream := func() {
stream.Cancel()
close(streamMsgCh)
close(streamErrCh)
}

// Connect.
err = stream.Connect(streamMsgCh, streamErrCh)
if (err != nil) {
if err != nil {
return err
}

// Retrieve first flag configs and parse it.
// If any error here means init error.
select{
select {
case msg := <-streamMsgCh:
// Parse message and verify data correct.
flags, err := parseData(msg.data)
if (err != nil) {
if err != nil {
closeStream()
return errors.New("flag config stream api corrupt data, cause: " + err.Error())
}
if (onInitUpdate != nil) {
if onInitUpdate != nil {
err = onInitUpdate(flags)
} else if (onUpdate != nil) {
} else if onUpdate != nil {
err = onUpdate(flags)
}
if (err != nil) {
if err != nil {
closeStream()
return err
}
Expand All @@ -126,33 +126,33 @@ func (api *flagConfigStreamApiV2) Connect(
api.lock.Lock()
defer api.lock.Unlock()
closeStream()
if (api.stopCh == stopCh) {
if api.stopCh == stopCh {
api.stopCh = nil
}
close(stopCh)
if (onError != nil) {
if onError != nil {
onError(err)
}
}

// Retrieve and pass on message forever until stopCh closes.
go func() {
for {
select{
select {
case <-stopCh: // Channel returns immediately when closed. Note the local channel is referred here, so it's guaranteed to not be nil.
closeStream()
return
case msg := <-streamMsgCh:
// Parse message and verify data correct.
flags, err := parseData(msg.data)
if (err != nil) {
if err != nil {
// Error, close everything.
closeAllAndNotify(errors.New("stream corrupt data, cause: " + err.Error()))
return
}
if (onUpdate != nil) {
if onUpdate != nil {
// Deliver async. Don't care about any errors.
go func() {onUpdate(flags)}()
go func() { onUpdate(flags) }()
}
case err := <-streamErrCh:
// Error, close everything.
Expand Down Expand Up @@ -181,7 +181,7 @@ func parseData(data []byte) (map[string]*evaluation.Flag, error) {
}

func (api *flagConfigStreamApiV2) closeInternal() {
if (api.stopCh != nil) {
if api.stopCh != nil {
close(api.stopCh)
api.stopCh = nil
}
Expand All @@ -191,4 +191,4 @@ func (api *flagConfigStreamApiV2) Close() {
defer api.lock.Unlock()

api.closeInternal()
}
}
Loading

0 comments on commit a0b51e1

Please sign in to comment.