Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close consumer group #3815

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (c *consumer) subscribe(ctx context.Context) {
for {
select {
case <-ctx.Done():
c.group.Close()
return
default:
}
Expand Down
85 changes: 85 additions & 0 deletions backend/runner/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
package pubsub

import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"connectrpc.com/connect"
"github.com/IBM/sarama"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/exec"
in "github.com/block/ftl/internal/integration"
"github.com/block/ftl/internal/model"
)
Expand Down Expand Up @@ -84,6 +88,68 @@ func TestExternalPublishRuntimeCheck(t *testing.T) {
)
}

// TestConsumerGroupMembership tests that when a runner ends, the consumer group is properly exited.
func TestConsumerGroupMembership(t *testing.T) {
var deploymentKilledTime *time.Time
in.Run(t,
in.WithLanguages("go"),
in.WithPubSub(),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// consumer group must now have a member for each partition
checkGroupMembership("subscriber", "consumeSlow", 1),

// publish events that will take a long time to process on the first subscriber deployment
// to test that rebalancing doesnt cause consumption to fail and skip events
in.Repeat(100, in.Call("publisher", "publishSlow", in.Obj{}, func(t testing.TB, resp in.Obj) {})),

// Upgrade deployment
func(t testing.TB, ic in.TestContext) {
in.Infof("Modifying code")
path := filepath.Join(ic.WorkingDir(), "subscriber", "subscriber.go")

bytes, err := os.ReadFile(path)
assert.NoError(t, err)
output := strings.ReplaceAll(string(bytes), "This deployment is TheFirstDeployment", "This deployment is TheSecondDeployment")
assert.NoError(t, os.WriteFile(path, []byte(output), 0644))
},
in.Deploy("subscriber"),

// Currently old deployment runs for a little bit longer.
// During this time we expect the consumer group to have 2 members (old deployment and new deployment).
// This will probably change when we have proper draining of the old deployment.
checkGroupMembership("subscriber", "consumeSlow", 2),
func(t testing.TB, ic in.TestContext) {
in.Infof("Waiting for old deployment to be killed")
start := time.Now()
for {
assert.True(t, time.Since(start) < 15*time.Second)
ps, err := exec.Capture(ic.Context, ".", "ftl", "ps")
assert.NoError(t, err)
if strings.Count(string(ps), "dpl-subscriber-") == 1 {
// original deployment has ended
now := time.Now()
deploymentKilledTime = &now
return
}
}
},
// Once old deployment has ended, the consumer group should only have 1 member per partition (the new deployment)
// This should happen fairly quickly. If it takes a while it could be because the previous deployment did not close
// the group properly.
checkGroupMembership("subscriber", "consumeSlow", 1),
func(t testing.TB, ic in.TestContext) {
assert.True(t, time.Since(*deploymentKilledTime) < 3*time.Second, "make sure old deployment was removed from consumer group fast enough")
},

// confirm that each message was consumed successfully
checkConsumed("subscriber", "consumeSlow", true, 100, optional.None[string]()),
)
}

func publishToTestAndLocalTopics(calls int) in.Action {
// do this in parallel because we want to test race conditions
return func(t testing.TB, ic in.TestContext) {
Expand Down Expand Up @@ -161,3 +227,22 @@ func checkConsumed(module, verb string, success bool, count int, needle optional
}
}
}
func checkGroupMembership(module, subscription string, expectedCount int) in.Action {
return func(t testing.TB, ic in.TestContext) {
consumerGroup := module + "." + subscription
in.Infof("Checking group membership for %v", consumerGroup)

client, err := sarama.NewClient(in.RedPandaBrokers, sarama.NewConfig())
assert.NoError(t, err)
defer client.Close()

clusterAdmin, err := sarama.NewClusterAdminFromClient(client)
assert.NoError(t, err)
defer clusterAdmin.Close()

groups, err := clusterAdmin.DescribeConsumerGroups([]string{consumerGroup})
assert.NoError(t, err)
assert.Equal(t, len(groups), 1)
assert.Equal(t, len(groups[0].Members), expectedCount, "expected consumer group %v to have %v members", consumerGroup, expectedCount)
}
}
21 changes: 17 additions & 4 deletions backend/runner/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
// Import the FTL SDK.
)

type PubSubEvent struct {
Time time.Time
Haystack string
}

type PartitionMapper struct{}

var _ ftl.TopicPartitionMap[PubSubEvent] = PartitionMapper{}
Expand All @@ -21,10 +26,8 @@ type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type LocalTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type PubSubEvent struct {
Time time.Time
Haystack string
}
//ftl:export
type SlowTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

//ftl:verb
func PublishTen(ctx context.Context, topic TestTopic) error {
Expand Down Expand Up @@ -87,3 +90,13 @@ func Local(ctx context.Context, event PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time)
return nil
}

//ftl:verb
func PublishSlow(ctx context.Context, topic SlowTopic) error {
logger := ftl.LoggerFromContext(ctx)
t := time.Now()
logger.Infof("Publishing to slowTopic: %v", t)
return topic.Publish(ctx, PubSubEvent{
Time: t,
})
}
26 changes: 16 additions & 10 deletions backend/runner/pubsub/testdata/go/publisher/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions backend/runner/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package subscriber
import (
"context"
"fmt"
"strings"
"time"

"ftl/publisher"
Expand Down Expand Up @@ -38,3 +39,16 @@ func PublishToExternalModule(ctx context.Context) error {
externalTopic := ftl.TopicHandle[publisher.PubSubEvent, ftl.SinglePartitionMap[publisher.PubSubEvent]]{Ref: reflection.Ref{Module: "publisher", Name: "testTopic"}.ToSchema()}
return externalTopic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
}

//ftl:verb
//ftl:subscribe publisher.slowTopic from=beginning
func ConsumeSlow(ctx context.Context, req publisher.PubSubEvent) error {
versionDescription := "This deployment is TheFirstDeployment"
if strings.Contains(versionDescription, "TheFirstDeployment") {
ftl.LoggerFromContext(ctx).Infof("ConsumeSlow first deployment (will sleep 5s): %v", req.Time)
time.Sleep(5 * time.Second)
return nil
}
ftl.LoggerFromContext(ctx).Infof("ConsumeSlow second deployment (immediate): %v", req.Time)
return nil
}
5 changes: 5 additions & 0 deletions backend/runner/pubsub/testdata/go/subscriber/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (

const dumpPath = "/tmp/ftl-kube-report"

var redPandaBrokers = []string{"127.0.0.1:19092"}
var RedPandaBrokers = []string{"127.0.0.1:19092"}

func (i TestContext) integrationTestTimeout() time.Duration {
timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s")
Expand Down Expand Up @@ -415,7 +415,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
err = exec.CommandWithEnv(ctx, log.Debug, rootDir, envars, "docker", "compose", "-f", "internal/dev/docker-compose.redpanda.yml", "-p", "ftl", "up", "-d", "--wait").RunBuffered(ctx)
assert.NoError(t, err)

client, err := sarama.NewClient(redPandaBrokers, sarama.NewConfig())
client, err := sarama.NewClient(RedPandaBrokers, sarama.NewConfig())
assert.NoError(t, err)
defer client.Close()

Expand Down
Loading