diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index ea9d8638a..51f14a00b 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -126,6 +126,7 @@ func (c *consumer) subscribe(ctx context.Context) { for { select { case <-ctx.Done(): + c.group.Close() return default: } diff --git a/backend/runner/pubsub/integration_test.go b/backend/runner/pubsub/integration_test.go index c628799df..20e3d23d7 100644 --- a/backend/runner/pubsub/integration_test.go +++ b/backend/runner/pubsub/integration_test.go @@ -3,16 +3,20 @@ package pubsub import ( + "os" + "path/filepath" "strings" "sync" "testing" "time" "connectrpc.com/connect" + "github.com/IBM/sarama" "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/optional" timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1" "github.com/block/ftl/common/slices" + "github.com/block/ftl/internal/exec" in "github.com/block/ftl/internal/integration" "github.com/block/ftl/internal/model" ) @@ -84,6 +88,68 @@ func TestExternalPublishRuntimeCheck(t *testing.T) { ) } +// TestConsumerGroupMembership tests that when a runner ends, the consumer group is properly exited. +func TestConsumerGroupMembership(t *testing.T) { + var deploymentKilledTime *time.Time + in.Run(t, + in.WithLanguages("go"), + in.WithPubSub(), + in.CopyModule("publisher"), + in.CopyModule("subscriber"), + in.Deploy("publisher"), + in.Deploy("subscriber"), + + // consumer group must now have a member for each partition + checkGroupMembership("subscriber", "consumeSlow", 1), + + // publish events that will take a long time to process on the first subscriber deployment + // to test that rebalancing doesnt cause consumption to fail and skip events + in.Repeat(100, in.Call("publisher", "publishSlow", in.Obj{}, func(t testing.TB, resp in.Obj) {})), + + // Upgrade deployment + func(t testing.TB, ic in.TestContext) { + in.Infof("Modifying code") + path := filepath.Join(ic.WorkingDir(), "subscriber", "subscriber.go") + + bytes, err := os.ReadFile(path) + assert.NoError(t, err) + output := strings.ReplaceAll(string(bytes), "This deployment is TheFirstDeployment", "This deployment is TheSecondDeployment") + assert.NoError(t, os.WriteFile(path, []byte(output), 0644)) + }, + in.Deploy("subscriber"), + + // Currently old deployment runs for a little bit longer. + // During this time we expect the consumer group to have 2 members (old deployment and new deployment). + // This will probably change when we have proper draining of the old deployment. + checkGroupMembership("subscriber", "consumeSlow", 2), + func(t testing.TB, ic in.TestContext) { + in.Infof("Waiting for old deployment to be killed") + start := time.Now() + for { + assert.True(t, time.Since(start) < 15*time.Second) + ps, err := exec.Capture(ic.Context, ".", "ftl", "ps") + assert.NoError(t, err) + if strings.Count(string(ps), "dpl-subscriber-") == 1 { + // original deployment has ended + now := time.Now() + deploymentKilledTime = &now + return + } + } + }, + // Once old deployment has ended, the consumer group should only have 1 member per partition (the new deployment) + // This should happen fairly quickly. If it takes a while it could be because the previous deployment did not close + // the group properly. + checkGroupMembership("subscriber", "consumeSlow", 1), + func(t testing.TB, ic in.TestContext) { + assert.True(t, time.Since(*deploymentKilledTime) < 3*time.Second, "make sure old deployment was removed from consumer group fast enough") + }, + + // confirm that each message was consumed successfully + checkConsumed("subscriber", "consumeSlow", true, 100, optional.None[string]()), + ) +} + func publishToTestAndLocalTopics(calls int) in.Action { // do this in parallel because we want to test race conditions return func(t testing.TB, ic in.TestContext) { @@ -161,3 +227,22 @@ func checkConsumed(module, verb string, success bool, count int, needle optional } } } +func checkGroupMembership(module, subscription string, expectedCount int) in.Action { + return func(t testing.TB, ic in.TestContext) { + consumerGroup := module + "." + subscription + in.Infof("Checking group membership for %v", consumerGroup) + + client, err := sarama.NewClient(in.RedPandaBrokers, sarama.NewConfig()) + assert.NoError(t, err) + defer client.Close() + + clusterAdmin, err := sarama.NewClusterAdminFromClient(client) + assert.NoError(t, err) + defer clusterAdmin.Close() + + groups, err := clusterAdmin.DescribeConsumerGroups([]string{consumerGroup}) + assert.NoError(t, err) + assert.Equal(t, len(groups), 1) + assert.Equal(t, len(groups[0].Members), expectedCount, "expected consumer group %v to have %v members", consumerGroup, expectedCount) + } +} diff --git a/backend/runner/pubsub/testdata/go/publisher/publisher.go b/backend/runner/pubsub/testdata/go/publisher/publisher.go index 8ec3213b5..6316561f5 100644 --- a/backend/runner/pubsub/testdata/go/publisher/publisher.go +++ b/backend/runner/pubsub/testdata/go/publisher/publisher.go @@ -8,6 +8,11 @@ import ( // Import the FTL SDK. ) +type PubSubEvent struct { + Time time.Time + Haystack string +} + type PartitionMapper struct{} var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{} @@ -21,10 +26,8 @@ type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper] type LocalTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper] -type PubSubEvent struct { - Time time.Time - Haystack string -} +//ftl:export +type SlowTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper] //ftl:verb func PublishTen(ctx context.Context, topic TestTopic) error { @@ -87,3 +90,13 @@ func Local(ctx context.Context, event PubSubEvent) error { ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time) return nil } + +//ftl:verb +func PublishSlow(ctx context.Context, topic SlowTopic) error { + logger := ftl.LoggerFromContext(ctx) + t := time.Now() + logger.Infof("Publishing to slowTopic: %v", t) + return topic.Publish(ctx, PubSubEvent{ + Time: t, + }) +} diff --git a/backend/runner/pubsub/testdata/go/publisher/types.ftl.go b/backend/runner/pubsub/testdata/go/publisher/types.ftl.go index d5ce050b7..e38f0d244 100644 --- a/backend/runner/pubsub/testdata/go/publisher/types.ftl.go +++ b/backend/runner/pubsub/testdata/go/publisher/types.ftl.go @@ -2,10 +2,10 @@ package publisher import ( - "context" - "github.com/block/ftl/common/reflection" - "github.com/block/ftl/go-runtime/ftl" - "github.com/block/ftl/go-runtime/server" + "context" + "github.com/block/ftl/common/reflection" + "github.com/block/ftl/go-runtime/ftl" + "github.com/block/ftl/go-runtime/server" ) type LocalClient func(context.Context, PubSubEvent) error @@ -14,6 +14,8 @@ type PublishOneClient func(context.Context) error type PublishOneToTopic2Client func(context.Context, PublishOneToTopic2Request) error +type PublishSlowClient func(context.Context) error + type PublishTenClient func(context.Context) error type PublishTenLocalClient func(context.Context) error @@ -21,23 +23,27 @@ type PublishTenLocalClient func(context.Context) error func init() { reflection.Register( reflection.ProvideResourcesForVerb( - Local, + Local, ), reflection.ProvideResourcesForVerb( - PublishOne, + PublishOne, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "testTopic"), ), reflection.ProvideResourcesForVerb( - PublishOneToTopic2, + PublishOneToTopic2, server.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]("publisher", "topic2"), ), reflection.ProvideResourcesForVerb( - PublishTen, + PublishSlow, + server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "slowTopic"), + ), + reflection.ProvideResourcesForVerb( + PublishTen, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "testTopic"), ), reflection.ProvideResourcesForVerb( - PublishTenLocal, + PublishTenLocal, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "localTopic"), ), ) -} +} \ No newline at end of file diff --git a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go index 208acf47d..8a6dbe5d6 100644 --- a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go +++ b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go @@ -3,6 +3,7 @@ package subscriber import ( "context" "fmt" + "strings" "time" "ftl/publisher" @@ -38,3 +39,16 @@ func PublishToExternalModule(ctx context.Context) error { externalTopic := ftl.TopicHandle[publisher.PubSubEvent, ftl.SinglePartitionMap[publisher.PubSubEvent]]{Ref: reflection.Ref{Module: "publisher", Name: "testTopic"}.ToSchema()} return externalTopic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()}) } + +//ftl:verb +//ftl:subscribe publisher.slowTopic from=beginning +func ConsumeSlow(ctx context.Context, req publisher.PubSubEvent) error { + versionDescription := "This deployment is TheFirstDeployment" + if strings.Contains(versionDescription, "TheFirstDeployment") { + ftl.LoggerFromContext(ctx).Infof("ConsumeSlow first deployment (will sleep 5s): %v", req.Time) + time.Sleep(5 * time.Second) + return nil + } + ftl.LoggerFromContext(ctx).Infof("ConsumeSlow second deployment (immediate): %v", req.Time) + return nil +} diff --git a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go index 47a3957bb..708227d14 100644 --- a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go +++ b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go @@ -13,6 +13,8 @@ type ConsumeButFailAndRetryClient func(context.Context, ftlpublisher.PubSubEvent type ConsumeFromLatestClient func(context.Context, ftlpublisher.PubSubEvent) error +type ConsumeSlowClient func(context.Context, ftlpublisher.PubSubEvent) error + type PublishToExternalModuleClient func(context.Context) error func init() { @@ -26,6 +28,9 @@ func init() { reflection.ProvideResourcesForVerb( ConsumeFromLatest, ), + reflection.ProvideResourcesForVerb( + ConsumeSlow, + ), reflection.ProvideResourcesForVerb( PublishToExternalModule, ), diff --git a/internal/integration/harness.go b/internal/integration/harness.go index c123de52c..327f9c89d 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -45,7 +45,7 @@ import ( const dumpPath = "/tmp/ftl-kube-report" -var redPandaBrokers = []string{"127.0.0.1:19092"} +var RedPandaBrokers = []string{"127.0.0.1:19092"} func (i TestContext) integrationTestTimeout() time.Duration { timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s") @@ -415,7 +415,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { err = exec.CommandWithEnv(ctx, log.Debug, rootDir, envars, "docker", "compose", "-f", "internal/dev/docker-compose.redpanda.yml", "-p", "ftl", "up", "-d", "--wait").RunBuffered(ctx) assert.NoError(t, err) - client, err := sarama.NewClient(redPandaBrokers, sarama.NewConfig()) + client, err := sarama.NewClient(RedPandaBrokers, sarama.NewConfig()) assert.NoError(t, err) defer client.Close()