diff --git a/backend/runner/pubsub/integration_test.go b/backend/runner/pubsub/integration_test.go index 8fe16db851..b15049c36e 100644 --- a/backend/runner/pubsub/integration_test.go +++ b/backend/runner/pubsub/integration_test.go @@ -3,11 +3,13 @@ package pubsub import ( + "encoding/json" "strings" "testing" "time" "connectrpc.com/connect" + "github.com/IBM/sarama" timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1" "github.com/TBD54566975/ftl/common/slices" in "github.com/TBD54566975/ftl/internal/integration" @@ -41,6 +43,7 @@ func TestRetry(t *testing.T) { retriesPerCall := 2 in.Run(t, in.WithLanguages("java", "go"), + in.WithPubSub(), in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), @@ -72,6 +75,46 @@ func TestExternalPublishRuntimeCheck(t *testing.T) { ) } +// TestConsumerGroupMembership tests that when a runner ends, the consumer group is properly exited. +func TestConsumerGroupMembership(t *testing.T) { + in.Run(t, + in.WithLanguages("java", "go"), + in.WithPubSub(), + in.CopyModule("publisher"), + in.CopyModule("subscriber"), + in.Deploy("publisher"), + in.Deploy("subscriber"), + + // consumer group must now have a member + checkGroupMembership("subscriber", "consume", true), + + // Stop subscriber deployment + func(t testing.TB, ic in.TestContext) { + in.ExecWithOutput("ftl", []string{"ps", "--json"}, func(jsonStr string) { + // parse newline delimted json + decoder := json.NewDecoder(strings.NewReader(jsonStr)) + for decoder.More() { + var deployment map[string]any + if err := decoder.Decode(&deployment); err != nil { + assert.NoError(t, err) + } + depName, ok := deployment["deployment"].(string) + assert.True(t, ok) + if strings.Contains(depName, "subscriber") { + in.Exec("ftl", "kill", depName)(t, ic) + return + } + } + assert.True(t, false, "subscriber deployment not found") + })(t, ic) + }, + + in.Sleep(time.Second*2), + + in.WithoutRetries(checkGroupMembership("subscriber", "consume", false)), + ) +} + func checkConsumed(module, verb string, success bool, count int, needle optional.Option[string]) in.Action { return func(t testing.TB, ic in.TestContext) { if needle, ok := needle.Get(); ok { @@ -130,3 +173,27 @@ func checkConsumed(module, verb string, success bool, count int, needle optional } } } + +func checkGroupMembership(module, subscription string, expected bool) in.Action { + return func(t testing.TB, ic in.TestContext) { + consumerGroup := module + "." + subscription + in.Infof("Checking group membership for %v", consumerGroup) + + client, err := sarama.NewClient(in.RedPandaBrokers, sarama.NewConfig()) + assert.NoError(t, err) + defer client.Close() + + clusterAdmin, err := sarama.NewClusterAdminFromClient(client) + assert.NoError(t, err) + defer clusterAdmin.Close() + + groups, err := clusterAdmin.DescribeConsumerGroups([]string{consumerGroup}) + assert.NoError(t, err) + assert.Equal(t, len(groups), 1) + if expected { + assert.True(t, len(groups[0].Members) > 0, "expected consumer group %v to have members", consumerGroup) + } else { + assert.False(t, len(groups[0].Members) > 0, "expected consumer group %v to have no members", consumerGroup) + } + } +} diff --git a/internal/integration/actions.go b/internal/integration/actions.go index 1808be05f3..20fd649955 100644 --- a/internal/integration/actions.go +++ b/internal/integration/actions.go @@ -24,6 +24,7 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" // SQL driver "github.com/kballard/go-shellquote" "github.com/otiai10/copy" + "github.com/puzpuzpuz/xsync/v3" "k8s.io/client-go/kubernetes" "github.com/block/scaffolder" @@ -307,6 +308,19 @@ func Sleep(duration time.Duration) Action { } } +func WithoutRetries(action Action) Action { + attempted := xsync.NewCounter() + return func(t testing.TB, ic TestContext) { + attempted.Add(1) + if attempted.Value() > 1 { + // t.Fatal("action does not support retries") + panic("action does not support retries") + } + // assert.Equal(t, attempted.Value(), 1, "action does not support retries; see original attempt's failure") + action(t, ic) + } +} + // Assert that a file exists in the working directory. func FileExists(path string) Action { return func(t testing.TB, ic TestContext) { diff --git a/internal/integration/harness.go b/internal/integration/harness.go index 46fd6c55ed..6ec88d7104 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -45,7 +45,7 @@ import ( const dumpPath = "/tmp/ftl-kube-report" -var redPandaBrokers = []string{"127.0.0.1:19092"} +var RedPandaBrokers = []string{"127.0.0.1:19092"} func (i TestContext) integrationTestTimeout() time.Duration { timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s") @@ -415,7 +415,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { err = exec.CommandWithEnv(ctx, log.Debug, rootDir, envars, "docker", "compose", "-f", "internal/dev/docker-compose.redpanda.yml", "-p", "ftl", "up", "-d", "--wait").RunBuffered(ctx) assert.NoError(t, err) - client, err := sarama.NewClient(redPandaBrokers, sarama.NewConfig()) + client, err := sarama.NewClient(RedPandaBrokers, sarama.NewConfig()) assert.NoError(t, err) defer client.Close()