Skip to content

Commit

Permalink
try to repro
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 17, 2024
1 parent 5e49706 commit 36feacd
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 9 deletions.
41 changes: 41 additions & 0 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ func (c *consumer) Begin(ctx context.Context) error {
groupID := kafkaConsumerGroupID(c.moduleName, c.verb)
log.FromContext(ctx).Infof("Subscribing to topic %s for %s with offset %v", c.kafkaTopicID(), groupID, config.Consumer.Offsets.Initial)

c.checkOffsets(ctx)
// if groupID == "subscriber.consume" {
// return nil
// }
group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config)
if err != nil {
return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err)
}
c.checkOffsets(ctx)

go c.watchErrors(ctx, group)
go c.subscribe(ctx, group)
return nil
Expand Down Expand Up @@ -225,3 +231,38 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int)
observability.Calls.Request(ctx, req.Verb, start, optional.None[string]())
return nil
}

func (c *consumer) checkOffsets(ctx context.Context) {
groupId := kafkaConsumerGroupID(c.moduleName, c.verb)

Check warning on line 236 in backend/runner/pubsub/consumer.go

View workflow job for this annotation

GitHub Actions / Lint

var-naming: var groupId should be groupID (revive)
logger := log.FromContext(ctx).Scope(groupId + ".offsets")
client, err := sarama.NewClient(c.verb.Runtime.Subscription.KafkaBrokers, sarama.NewConfig())
if err != nil {
logger.Errorf(err, "failed to create client")
}
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
logger.Errorf(err, "failed to create cluster admin")
return
}
groups, err := admin.DescribeConsumerGroups([]string{groupId})
if err != nil {
logger.Errorf(err, "failed to describe consumer groups")
return
}
if len(groups) == 0 {
logger.Infof("no consumer group found")
return
}
resp, err := admin.ListConsumerGroupOffsets(groupId, nil)
if err != nil {
logger.Errorf(err, "failed to list consumer group offsets")
return
}
outStrs := []string{}
for topic, partitions := range resp.Blocks {
for partition, block := range partitions {
outStrs = append(outStrs, fmt.Sprintf("%s:%d:%d", topic, partition, block.Offset))
}
}
logger.Infof("offsets:\n%s", strings.Join(outStrs, "\n"))
}
5 changes: 3 additions & 2 deletions backend/runner/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
)

func TestPubSub(t *testing.T) {
t.Skip("Skipping flaky test")
// t.Skip("Skipping flaky test")
calls := 20
events := calls * 10
in.Run(t,
in.WithLanguages("java", "go", "kotlin"),
// in.WithLanguages("java", "go", "kotlin"),
in.WithLanguages("java"),
in.WithPubSub(),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
Expand Down
3 changes: 3 additions & 0 deletions backend/runner/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/rpc"
)
Expand Down Expand Up @@ -76,10 +77,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
})
if err != nil {
timelineEvent.Error = optional.Some(err.Error())
log.FromContext(ctx).Errorf(err, "Failed to publish message")
return fmt.Errorf("failed to publish message: %w", err)
}
timelineEvent.Partition = int(partition)
timelineEvent.Offset = int(offset)
p.timelineClient.Publish(ctx, timelineEvent)
log.FromContext(ctx).Infof("Published successfully (%v:%v:%v)", p.topic.Runtime.TopicID, partition, offset)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ public class Subscriber {

@Subscription(topic = TestTopicTopic.class, from = FromOffset.BEGINNING)
void consume(PubSubEvent event) throws Exception {
Log.infof("Subscriber is consuming %s", event.getTime());
Log.infof("consume: %s", event.getTime());
}

@Subscription(topic = TestTopicTopic.class, from = FromOffset.LATEST)
void consumeFromLatest(PubSubEvent event) throws Exception {
Log.infof("consumeFromLatest: %s", event.getTime());
}

@Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ class Subscriber {
throw RuntimeException("always error: event " + event.time)
}

@Subscription(topic = Topic2Topic::class, from = FromOffset.BEGINNING)
@Retry(count = 1, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catchAny")
fun consumeButFailAndCatchAny(event: PubSubEvent) {
throw RuntimeException("always error: event " + event.time)
}

@Verb
@VerbName("catch")
fun catchVerb(req: CatchRequest<PubSubEvent?>) {
Expand Down
5 changes: 5 additions & 0 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
logger.Errorf(err, "could not create FTL dev Config")
}
}
err = rpc.Wait(ctx, backoff.Backoff{}, time.Second*10, client)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to ping dev endpoint: %w", err)
}
} else {
err := download.ArtefactsFromOCI(ctx, s.controllerClient, key, deploymentDir, s.storage)
if err != nil {
Expand Down

0 comments on commit 36feacd

Please sign in to comment.