diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index 9c68d24b4..e2bab7837 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -85,10 +85,16 @@ func (c *consumer) Begin(ctx context.Context) error { groupID := kafkaConsumerGroupID(c.moduleName, c.verb) log.FromContext(ctx).Infof("Subscribing to topic %s for %s with offset %v", c.kafkaTopicID(), groupID, config.Consumer.Offsets.Initial) + c.checkOffsets(ctx) + // if groupID == "subscriber.consume" { + // return nil + // } group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config) if err != nil { return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err) } + c.checkOffsets(ctx) + go c.watchErrors(ctx, group) go c.subscribe(ctx, group) return nil @@ -225,3 +231,38 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) observability.Calls.Request(ctx, req.Verb, start, optional.None[string]()) return nil } + +func (c *consumer) checkOffsets(ctx context.Context) { + groupId := kafkaConsumerGroupID(c.moduleName, c.verb) + logger := log.FromContext(ctx).Scope(groupId + ".offsets") + client, err := sarama.NewClient(c.verb.Runtime.Subscription.KafkaBrokers, sarama.NewConfig()) + if err != nil { + logger.Errorf(err, "failed to create client") + } + admin, err := sarama.NewClusterAdminFromClient(client) + if err != nil { + logger.Errorf(err, "failed to create cluster admin") + return + } + groups, err := admin.DescribeConsumerGroups([]string{groupId}) + if err != nil { + logger.Errorf(err, "failed to describe consumer groups") + return + } + if len(groups) == 0 { + logger.Infof("no consumer group found") + return + } + resp, err := admin.ListConsumerGroupOffsets(groupId, nil) + if err != nil { + logger.Errorf(err, "failed to list consumer group offsets") + return + } + outStrs := []string{} + for topic, partitions := range resp.Blocks { + for partition, block := range partitions { + outStrs = append(outStrs, fmt.Sprintf("%s:%d:%d", topic, partition, block.Offset)) + } + } + logger.Infof("offsets:\n%s", strings.Join(outStrs, "\n")) +} diff --git a/backend/runner/pubsub/integration_test.go b/backend/runner/pubsub/integration_test.go index eeba87856..be100da76 100644 --- a/backend/runner/pubsub/integration_test.go +++ b/backend/runner/pubsub/integration_test.go @@ -17,11 +17,12 @@ import ( ) func TestPubSub(t *testing.T) { - t.Skip("Skipping flaky test") + // t.Skip("Skipping flaky test") calls := 20 events := calls * 10 in.Run(t, - in.WithLanguages("java", "go", "kotlin"), + // in.WithLanguages("java", "go", "kotlin"), + in.WithLanguages("java"), in.WithPubSub(), in.CopyModule("publisher"), in.CopyModule("subscriber"), diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index 105f51694..ecd8b7d26 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -10,6 +10,7 @@ import ( "github.com/block/ftl/backend/timeline" "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/model" "github.com/block/ftl/internal/rpc" ) @@ -76,10 +77,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller }) if err != nil { timelineEvent.Error = optional.Some(err.Error()) + log.FromContext(ctx).Errorf(err, "Failed to publish message") return fmt.Errorf("failed to publish message: %w", err) } timelineEvent.Partition = int(partition) timelineEvent.Offset = int(offset) p.timelineClient.Publish(ctx, timelineEvent) + log.FromContext(ctx).Infof("Published successfully (%v:%v:%v)", p.topic.Runtime.TopicID, partition, offset) return nil } diff --git a/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java b/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java index ca74499b6..baa2c89ee 100644 --- a/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java +++ b/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java @@ -12,11 +12,12 @@ public class Subscriber { @Subscription(topic = TestTopicTopic.class, from = FromOffset.BEGINNING) void consume(PubSubEvent event) throws Exception { - Log.infof("Subscriber is consuming %s", event.getTime()); + Log.infof("consume: %s", event.getTime()); } @Subscription(topic = TestTopicTopic.class, from = FromOffset.LATEST) void consumeFromLatest(PubSubEvent event) throws Exception { + Log.infof("consumeFromLatest: %s", event.getTime()); } @Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING) diff --git a/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt b/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt index b10f8fbae..6e30ddb99 100644 --- a/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt +++ b/backend/runner/pubsub/testdata/kotlin/subscriber/src/main/kotlin/xyz/block/ftl/java/test/subscriber/Subscriber.kt @@ -28,12 +28,6 @@ class Subscriber { throw RuntimeException("always error: event " + event.time) } - @Subscription(topic = Topic2Topic::class, from = FromOffset.BEGINNING) - @Retry(count = 1, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catchAny") - fun consumeButFailAndCatchAny(event: PubSubEvent) { - throw RuntimeException("always error: event " + event.time) - } - @Verb @VerbName("catch") fun catchVerb(req: CatchRequest) { diff --git a/backend/runner/runner.go b/backend/runner/runner.go index 3901abba6..20deb434a 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -397,6 +397,11 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s logger.Errorf(err, "could not create FTL dev Config") } } + err = rpc.Wait(ctx, backoff.Backoff{}, time.Second*10, client) + if err != nil { + observability.Deployment.Failure(ctx, optional.Some(key.String())) + return fmt.Errorf("failed to ping dev endpoint: %w", err) + } } else { err := download.ArtefactsFromOCI(ctx, s.controllerClient, key, deploymentDir, s.storage) if err != nil {