Skip to content

Commit

Permalink
add stream client test, fix stream concurrency, prioritize ctx check
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Oct 5, 2024
1 parent 77f57a1 commit 2e93085
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 80 deletions.
157 changes: 157 additions & 0 deletions pkg/experiment/local/client_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package local

import (
"log"
"os"
"testing"

"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
)

var streamClient *Client

func init() {
err := godotenv.Load()
if err != nil {
log.Printf("Error loading .env file: %v", err)
}
projectApiKey := os.Getenv("API_KEY")
secretKey := os.Getenv("SECRET_KEY")
cohortSyncConfig := CohortSyncConfig{
ApiKey: projectApiKey,
SecretKey: secretKey,
}
streamClient = Initialize("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz",
&Config{
StreamUpdates: true,
StreamServerUrl: "https://stream.lab.amplitude.com",
CohortSyncConfig: &cohortSyncConfig,
})
err = streamClient.Start()
if err != nil {
panic(err)
}
}

func TestMakeSureStreamEnabled(t *testing.T) {
assert.True(t, streamClient.config.StreamUpdates)
}

func TestStreamEvaluate(t *testing.T) {
user := &experiment.User{UserId: "test_user"}
result, err := streamClient.Evaluate(user, nil)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Payload != "payload" {
t.Fatalf("Unexpected variant %v", variant)
}
variant = result["sdk-ci-test"]
if variant.Key != "" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamEvaluateV2AllFlags(t *testing.T) {
user := &experiment.User{UserId: "test_user"}
result, err := streamClient.EvaluateV2(user, nil)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Payload != "payload" {
t.Fatalf("Unexpected variant %v", variant)
}
variant = result["sdk-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamFlagMetadataLocalFlagKey(t *testing.T) {
md := streamClient.FlagMetadata("sdk-local-evaluation-ci-test")
if md["evaluationMode"] != "local" {
t.Fatalf("Unexpected metadata %v", md)
}
}

func TestStreamEvaluateV2Cohort(t *testing.T) {
targetedUser := &experiment.User{UserId: "12345"}
nonTargetedUser := &experiment.User{UserId: "not_targeted"}
flagKeys := []string{"sdk-local-evaluation-user-cohort-ci-test"}
result, err := streamClient.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-user-cohort-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = streamClient.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-user-cohort-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamEvaluateV2GroupCohort(t *testing.T) {
targetedUser := &experiment.User{
UserId: "12345",
DeviceId: "device_id",
Groups: map[string][]string{
"org id": {"1"},
}}
nonTargetedUser := &experiment.User{
UserId: "12345",
DeviceId: "device_id",
Groups: map[string][]string{
"org id": {"not_targeted"},
}}
flagKeys := []string{"sdk-local-evaluation-group-cohort-ci-test"}
result, err := streamClient.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-group-cohort-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = streamClient.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-group-cohort-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}
49 changes: 0 additions & 49 deletions pkg/experiment/local/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"log"
"os"
"testing"
"time"

"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
Expand Down Expand Up @@ -232,51 +231,3 @@ func TestEvaluateV2GroupCohort(t *testing.T) {
t.Fatalf("Unexpected variant %v", variant)
}
}


func TestMain(t *testing.T) {
client := Initialize("server-tUTqR62DZefq7c73zMpbIr1M5VDtwY8T", &Config{ServerUrl: "noserver", StreamUpdates: true, StreamServerUrl: "https://skylab-stream.stag2.amplitude.com"})
client.Start()
println(client.flagConfigStorage.getFlagConfigs(), len(client.flagConfigStorage.getFlagConfigs()))
time.Sleep(2000 * time.Millisecond)
println(client.flagConfigStorage.getFlagConfigs(), len(client.flagConfigStorage.getFlagConfigs()))

// connTimeout := 1500 * time.Millisecond
// api := NewFlagConfigStreamApiV2("server-tUTqR62DZefq7c73zMpbIr1M5VDtwY8T", "https://skylab-stream.stag2.amplitude.com", connTimeout)
// cohortStorage := newInMemoryCohortStorage()
// flagConfigStorage := newInMemoryFlagConfigStorage()
// dr := newDeploymentRunner(
// DefaultConfig,
// NewFlagConfigApiV2("server-tUTqR62DZefq7c73zMpbIr1M5VDtwY8T", "https://skylab-api.staging.amplitude.com", connTimeout),
// api,
// flagConfigStorage, cohortStorage, nil)
// println("inited")
// // time.Sleep(5000 * time.Millisecond)
// dr.start()

// for {
// fmt.Printf("%v+\n", time.Now())
// fmt.Println(flagConfigStorage.GetFlagConfigs())
// time.Sleep(5000 * time.Millisecond)
// fmt.Println(flagConfigStorage.GetFlagConfigs())
// }

// if len(os.Args) < 2 {
// fmt.Printf("error: command required\n")
// fmt.Printf("Available commands:\n" +
// " fetch\n" +
// " flags\n" +
// " evaluate\n")
// return
// }
// switch os.Args[1] {
// case "fetch":
// fetch()
// case "flags":
// flags()
// case "evaluate":
// evaluate()
// default:
// fmt.Printf("error: unknown sub-command '%v'", os.Args[1])
// }
}
4 changes: 2 additions & 2 deletions pkg/experiment/local/deployment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func newDeploymentRunner(
cohortStorage cohortStorage,
cohortLoader *cohortLoader,
) *deploymentRunner {
flagConfigUpdater := NewFlagConfigFallbackRetryWrapper(NewFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter)
flagConfigUpdater := NewFlagConfigFallbackRetryWrapper(NewFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug)
if (flagConfigStreamApi != nil) {
flagConfigUpdater = NewFlagConfigFallbackRetryWrapper(NewFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter)
flagConfigUpdater = NewFlagConfigFallbackRetryWrapper(NewFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug)
}
dr := &deploymentRunner{
config: config,
Expand Down
14 changes: 9 additions & 5 deletions pkg/experiment/local/flag_config_updater.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package local

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -251,6 +250,7 @@ func (p *flagConfigPoller) Stop() {
// A wrapper around flag config updaters to retry and fallback.
// If the main updater fails, it will fallback to the fallback updater and main updater enters retry loop.
type FlagConfigFallbackRetryWrapper struct {
log *logger.Log
mainUpdater flagConfigUpdater
fallbackUpdater flagConfigUpdater
retryDelay time.Duration
Expand All @@ -263,8 +263,10 @@ func NewFlagConfigFallbackRetryWrapper(
fallbackUpdater flagConfigUpdater,
retryDelay time.Duration,
maxJitter time.Duration,
debug bool,
) flagConfigUpdater {
return &FlagConfigFallbackRetryWrapper{
log: logger.New(debug),
mainUpdater: mainUpdater,
fallbackUpdater: fallbackUpdater,
retryDelay: retryDelay,
Expand All @@ -291,7 +293,8 @@ func (w *FlagConfigFallbackRetryWrapper) Start(onError func (error)) error {
w.retryTimer = nil
}

err := w.mainUpdater.Start(func (error) {
err := w.mainUpdater.Start(func (err error) {
w.log.Error("main updater updating err, starting fallback if available. error: ", err)
go func() {w.scheduleRetry()}() // Don't care if poller start error or not, always retry.
if (w.fallbackUpdater != nil) {
w.fallbackUpdater.Start(nil)
Expand All @@ -304,14 +307,14 @@ func (w *FlagConfigFallbackRetryWrapper) Start(onError func (error)) error {
}
return nil
}
fmt.Println("main start err", err)
// Logger.e("Primary flag configs start failed, start fallback. Error: ", t)
w.log.Debug("main updater start err, starting fallback. error: ", err)
if (w.fallbackUpdater == nil) {
// No fallback, main start failed is wrapper start fail
return err
}
err = w.fallbackUpdater.Start(nil)
if (err != nil) {
w.log.Debug("fallback updater start failed. error: ", err)
return err
}

Expand Down Expand Up @@ -349,6 +352,7 @@ func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() {
w.retryTimer = nil
}

w.log.Debug("main updater retry start")
err := w.mainUpdater.Start(func (error) {
go func() {w.scheduleRetry()}() // Don't care if poller start error or not, always retry.
if (w.fallbackUpdater != nil) {
Expand All @@ -357,12 +361,12 @@ func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() {
})
if (err == nil) {
// Main start success, stop fallback.
w.log.Debug("main updater retry start success")
if (w.fallbackUpdater != nil) {
w.fallbackUpdater.Stop()
}
return
}
fmt.Println("retrying failed", err)

go func() {w.scheduleRetry()}()
})
Expand Down
10 changes: 5 additions & 5 deletions pkg/experiment/local/flag_config_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestFlagConfigFallbackRetryWrapper(t *testing.T) {
}
fallback.stopFunc = func () {
}
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0)
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true)
err := w.Start(nil)
assert.Nil(t, err)
assert.NotNil(t, mainOnError)
Expand All @@ -292,7 +292,7 @@ func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) {
}
fallback.stopFunc = func () {
}
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0)
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true)
err := w.Start(nil)
assert.Equal(t, errors.New("fallback start error"), err)
assert.NotNil(t, mainOnError)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T
fallback.stopFunc = func () {
go func() {fallbackStopCh <- true} ()
}
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0)
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true)
err := w.Start(nil)
assert.Nil(t, err)
assert.NotNil(t, mainOnError)
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) {
return nil
}
fallback.stopFunc = func () {}
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0)
w := NewFlagConfigFallbackRetryWrapper(&main, &fallback, 1 * time.Second, 0, true)
// Start success
err := w.Start(nil)
assert.Nil(t, err)
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) {
main.stopFunc = func () {
mainOnError = nil
}
w := NewFlagConfigFallbackRetryWrapper(&main, nil, 1 * time.Second, 0)
w := NewFlagConfigFallbackRetryWrapper(&main, nil, 1 * time.Second, 0, true)
err := w.Start(nil)
assert.Nil(t, err)
assert.NotNil(t, mainOnError)
Expand Down
Loading

0 comments on commit 2e93085

Please sign in to comment.