fix: flaky pubsub test #3780

Merged 1 commit on Dec 18, 2024
28 changes: 19 additions & 9 deletions backend/runner/pubsub/consumer.go
@@ -73,22 +73,31 @@ func (c *consumer) kafkaTopicID() string {

func (c *consumer) Begin(ctx context.Context) error {
// set up config
logger := log.FromContext(ctx).Scope("subscription:" + c.verb.Name)
ctx = log.ContextWithLogger(ctx, logger)

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.AutoCommit.Enable = true

var fromOffsetStr string
switch c.subscriber.FromOffset {
case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
config.Consumer.Offsets.Initial = sarama.OffsetOldest
fromOffsetStr = "beginning"
case schema.FromOffsetLatest:
config.Consumer.Offsets.Initial = sarama.OffsetNewest
fromOffsetStr = "latest"
}

groupID := kafkaConsumerGroupID(c.moduleName, c.verb)
log.FromContext(ctx).Infof("Subscribing to topic %s for %s with offset %v", c.kafkaTopicID(), groupID, config.Consumer.Offsets.Initial)
logger.Debugf("Subscribing to %s from %s", c.kafkaTopicID(), fromOffsetStr)

group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config)
if err != nil {
return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err)
}

go c.watchErrors(ctx, group)
go c.subscribe(ctx, group)
return nil
@@ -101,7 +110,7 @@ func (c *consumer) watchErrors(ctx context.Context, group sarama.ConsumerGroup)
case <-ctx.Done():
return
case err := <-group.Errors():
logger.Errorf(err, "error in consumer group %s", c.verb.Name)
logger.Errorf(err, "Consumer group error")
}
}
}
@@ -122,9 +131,9 @@ func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {

err := group.Consume(ctx, []string{c.kafkaTopicID()}, c)
if err != nil {
logger.Errorf(err, "consume session failed for %s", c.verb.Name)
logger.Errorf(err, "Session failed for %s", c.verb.Name)
} else {
logger.Debugf("Ending consume session for subscription %s", c.verb.Name)
logger.Debugf("Ending session")
}
}
}
@@ -133,8 +142,8 @@ func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
logger := log.FromContext(session.Context())

partitions := session.Claims()[kafkaConsumerGroupID(c.moduleName, c.verb)]
logger.Debugf("Starting consume session for subscription %s with partitions: [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))
partitions := session.Claims()[c.kafkaTopicID()]
logger.Debugf("Starting session with partitions [%v]", strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))

return nil
}
@@ -151,19 +160,20 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
ctx := session.Context()
logger := log.FromContext(ctx)
for msg := range claim.Messages() {
logger.Debugf("%s: consuming message %v:%v", c.verb.Name, msg.Partition, msg.Offset)
logger.Debugf("Consuming message with partition %v and offset %v)", msg.Partition, msg.Offset)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
if err == nil {
logger.Errorf(err, "Error consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
break
}
if remainingRetries == 0 {
logger.Errorf(err, "%s: failed to consume message %v:%v", c.verb.Name, msg.Partition, msg.Offset)
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
break
}
logger.Errorf(err, "%s: failed to consume message %v:%v: retrying in %vs", c.verb.Name, msg.Partition, msg.Offset, int(backoff.Seconds()))
logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
time.Sleep(backoff)
remainingRetries--
backoff *= 2
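The Setup change above is worth calling out: sarama's session.Claims() returns a map keyed by topic name, so the old lookup by consumer group ID always produced an empty partition list and the session log was useless for seeing which partitions a group had actually claimed. Below is a minimal sketch of the corrected lookup, assuming the IBM/sarama client; logClaims is a hypothetical helper, not part of the PR.

package pubsubsketch

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/IBM/sarama" // assumed import path for the sarama client
)

// logClaims prints the partitions claimed in the current rebalance
// session. Claims() maps topic name -> claimed partitions, so the
// lookup must use the topic ID, not the consumer group ID.
func logClaims(session sarama.ConsumerGroupSession, topicID string) {
	partitions := session.Claims()[topicID]
	strs := make([]string, 0, len(partitions))
	for _, p := range partitions {
		strs = append(strs, strconv.Itoa(int(p)))
	}
	fmt.Printf("claimed partitions for %s: [%s]\n", topicID, strings.Join(strs, ","))
}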
31 changes: 28 additions & 3 deletions backend/runner/pubsub/integration_test.go
@@ -4,6 +4,7 @@ package pubsub

import (
"strings"
"sync"
"testing"
"time"

@@ -17,7 +18,6 @@ import (
)

func TestPubSub(t *testing.T) {
t.Skip("Skipping flaky test")
calls := 20
events := calls * 10
in.Run(t,
@@ -27,17 +27,23 @@
in.CopyModule("subscriber"),
in.Deploy("publisher"),

// After a deployment is "ready" it can take a second before a consumer group claims partitions.
// "publisher.local" subscribes with "from=latest", so that group must be ready before we start
// publishing; otherwise it will start from the latest offset after claiming partitions and miss
// the earlier events.
in.Sleep(time.Second*1),

// publish half the events before subscriber is deployed
in.Repeat(calls/2, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),
publishToTestAndLocalTopics(calls/2),

in.Deploy("subscriber"),

// publish the other half of the events after subscriber is deployed
in.Repeat(calls/2, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),
publishToTestAndLocalTopics(calls/2),

in.Sleep(time.Second*4),

// check that the right number of events were consumed, depending on each subscription's "from" offset option
checkConsumed("publisher", "local", true, events, optional.None[string]()),
checkConsumed("subscriber", "consume", true, events, optional.None[string]()),
checkConsumed("subscriber", "consumeFromLatest", true, events/2, optional.None[string]()),
)
@@ -78,6 +84,25 @@ func TestExternalPublishRuntimeCheck(t *testing.T) {
)
}

func publishToTestAndLocalTopics(calls int) in.Action {
// do this in parallel because we want to test race conditions
return func(t testing.TB, ic in.TestContext) {
actions := []in.Action{
in.Repeat(calls, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),
in.Repeat(calls, in.Call("publisher", "publishTenLocal", in.Obj{}, func(t testing.TB, resp in.Obj) {})),
}
wg := &sync.WaitGroup{}
for _, action := range actions {
wg.Add(1)
go func() {
action(t, ic)
wg.Done()
}()
}
wg.Wait()
}
}

func checkConsumed(module, verb string, success bool, count int, needle optional.Option[string]) in.Action {
return func(t testing.TB, ic in.TestContext) {
if needle, ok := needle.Get(); ok {
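The counts asserted in TestPubSub above follow from the test constants and each subscription's "from" offset. A quick sketch of the arithmetic; the names mirror the test, and the comments are my reading of its setup.

package main

import "fmt"

func main() {
	calls := 20
	events := calls * 10 // each topic receives 200 events across both publish phases

	// Phase 1 publishes (calls/2)*10 = 100 events to each topic before
	// "subscriber" deploys; phase 2 publishes the remaining 100 after.
	secondPhase := events / 2

	// publisher.local is from=latest, but its consumer group claims
	// partitions during the initial one-second sleep, before any
	// publishing, so it still sees every event.
	fmt.Println("publisher.local:", events) // 200
	// subscriber.consume reads from the beginning of the topic.
	fmt.Println("subscriber.consume:", events) // 200
	// subscriber.consumeFromLatest deploys after phase 1, so it only
	// sees events published afterwards: events/2.
	fmt.Println("subscriber.consumeFromLatest:", secondPhase) // 100
}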
4 changes: 4 additions & 0 deletions backend/runner/pubsub/publisher.go
@@ -10,6 +10,7 @@ import (

"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/rpc"
)
@@ -51,6 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey
}

func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error {
logger := log.FromContext(ctx).Scope("topic:" + p.topic.Name)
requestKey, err := rpc.RequestKeyFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to get request key: %w", err)
@@ -76,10 +78,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
})
if err != nil {
timelineEvent.Error = optional.Some(err.Error())
logger.Errorf(err, "Failed to publish message")
return fmt.Errorf("failed to publish message: %w", err)
}
timelineEvent.Partition = int(partition)
timelineEvent.Offset = int(offset)
p.timelineClient.Publish(ctx, timelineEvent)
logger.Debugf("Published to partition %v with offset %v)", partition, offset)
return nil
}
25 changes: 24 additions & 1 deletion backend/runner/pubsub/testdata/go/publisher/publisher.go
@@ -19,6 +19,8 @@ func (PartitionMapper) PartitionKey(event PubSubEvent) string {
//ftl:export
type TestTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type LocalTopic = ftl.TopicHandle[PubSubEvent, PartitionMapper]

type PubSubEvent struct {
Time time.Time
Haystack string
@@ -29,7 +31,7 @@ func PublishTen(ctx context.Context, topic TestTopic) error {
logger := ftl.LoggerFromContext(ctx)
for i := 0; i < 10; i++ {
t := time.Now()
logger.Infof("Publishing %v", t)
logger.Infof("Publishing to testTopic: %v", t)
err := topic.Publish(ctx, PubSubEvent{Time: t})
if err != nil {
return err
@@ -46,6 +48,20 @@ func PublishOne(ctx context.Context, topic TestTopic) error {
return topic.Publish(ctx, PubSubEvent{Time: t})
}

//ftl:verb
func PublishTenLocal(ctx context.Context, topic LocalTopic) error {
logger := ftl.LoggerFromContext(ctx)
for i := 0; i < 10; i++ {
t := time.Now()
logger.Infof("Publishing to localTopic: %v", t)
err := topic.Publish(ctx, PubSubEvent{Time: t})
if err != nil {
return err
}
}
return nil
}

//ftl:export
type Topic2 = ftl.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]

@@ -64,3 +80,10 @@ func PublishOneToTopic2(ctx context.Context, req PublishOneToTopic2Request, topi
Haystack: req.Haystack,
})
}

//ftl:verb
//ftl:subscribe testTopic from=latest
func Local(ctx context.Context, event PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time)
return nil
}
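The Local verb above is the from=latest consumer that the integration test's sleep protects. Per the consumer.go hunk earlier, from=latest maps to sarama.OffsetNewest, and a group with no committed offset that starts at the newest offset never receives events published before it first claims partitions, which is exactly why the test sleeps before publishing. A minimal sketch of that configuration, with the same assumed sarama import path as above:

package pubsubsketch

import "github.com/IBM/sarama" // assumed import path

// newLatestConfig mirrors the from=latest branch of consumer.Begin:
// with no committed offset, consumption starts at the newest message,
// so anything published before the group claims partitions is skipped.
func newLatestConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest // from=latest
	return config
}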
17 changes: 14 additions & 3 deletions backend/runner/pubsub/testdata/go/publisher/types.ftl.go

Generated file not rendered by default.

2 changes: 0 additions & 2 deletions backend/runner/pubsub/testdata/go/slow/ftl.toml

This file was deleted.

67 changes: 0 additions & 67 deletions backend/runner/pubsub/testdata/go/slow/go.mod

This file was deleted.
