From 60fcd33ecd4dabac8bf6e5a69c5dbff20465317f Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 6 Dec 2024 08:49:34 +1100 Subject: [PATCH 1/4] fix: publish to old pubsub impl and kafka # Conflicts: # backend/runner/runner.go # go-runtime/internal/impl.go --- backend/controller/pubsub/integration_test.go | 6 ++-- backend/runner/pubsub/publisher.go | 27 +++++++++++++++++- backend/runner/pubsub/pubsub.go | 6 +++- go-runtime/internal/impl.go | 28 ------------------- 4 files changed, 34 insertions(+), 33 deletions(-) diff --git a/backend/controller/pubsub/integration_test.go b/backend/controller/pubsub/integration_test.go index bc257dcc03..52e951c390 100644 --- a/backend/controller/pubsub/integration_test.go +++ b/backend/controller/pubsub/integration_test.go @@ -19,7 +19,7 @@ func TestPubSub(t *testing.T) { calls := 20 events := calls * 10 in.Run(t, - in.WithLanguages("go"), + in.WithLanguages("java", "go", "kotlin"), in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), @@ -45,7 +45,7 @@ func TestPubSub(t *testing.T) { func TestConsumptionDelay(t *testing.T) { in.Run(t, - in.WithLanguages("go"), + in.WithLanguages("go", "java"), in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), @@ -89,7 +89,7 @@ func TestConsumptionDelay(t *testing.T) { func TestRetry(t *testing.T) { retriesPerCall := 2 in.Run(t, - in.WithLanguages("go"), + in.WithLanguages("java", "go"), in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index 2b1e58be1a..8adc53dc64 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -1,10 +1,16 @@ package pubsub import ( + "context" "fmt" + "connectrpc.com/connect" "github.com/IBM/sarama" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/schema" ) @@ -36,7 +42,10 @@ func newPublisher(t *schema.Topic) (*publisher, error) { }, nil } -func (p *publisher) publish(data []byte, key string) error { +func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.RefKey) error { + if err := p.publishToController(ctx, data, caller); err != nil { + return err + } _, _, err := p.producer.SendMessage(&sarama.ProducerMessage{ Topic: p.topic.Runtime.TopicID, Value: sarama.ByteEncoder(data), @@ -47,3 +56,19 @@ func (p *publisher) publish(data []byte, key string) error { } return nil } + +// publishToController publishes the data to the controller (old pubsub implementation) +// +// This is to keep pubsub working while we transition fully to Kafka for pubsub. +func (p *publisher) publishToController(ctx context.Context, data []byte, caller schema.RefKey) error { + client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx) + _, err := client.PublishEvent(ctx, connect.NewRequest(&ftlv1.PublishEventRequest{ + Topic: p.topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert + Caller: caller.Name, + Body: data, + })) + if err != nil { + return fmt.Errorf("failed to publish event to controller: %w", err) + } + return nil +} diff --git a/backend/runner/pubsub/pubsub.go b/backend/runner/pubsub/pubsub.go index 455977424c..52c1b59ab4 100644 --- a/backend/runner/pubsub/pubsub.go +++ b/backend/runner/pubsub/pubsub.go @@ -42,7 +42,11 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[pb.Publ if !ok { return nil, fmt.Errorf("topic %s not found", req.Msg.Topic.Name) } - err := publisher.publish(req.Msg.Body, req.Msg.Key) + caller, err := schema.ParseRef(req.Msg.Caller) + if err != nil { + return nil, fmt.Errorf("could not parse caller: %w", err) + } + err = publisher.publish(ctx, req.Msg.Body, req.Msg.Key, caller.ToRefKey()) if err != nil { return nil, err } diff --git a/go-runtime/internal/impl.go b/go-runtime/internal/impl.go index 9577b416df..9575e4705d 100644 --- a/go-runtime/internal/impl.go +++ b/go-runtime/internal/impl.go @@ -10,8 +10,6 @@ import ( "connectrpc.com/connect" "github.com/puzpuzpuz/xsync/v3" - ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" - deploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect" pubpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/publish/v1" pubconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/publish/v1/publishpbconnect" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" @@ -57,32 +55,6 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any if topic.Module != caller.Module { return fmt.Errorf("can not publish to another module's topic: %s", topic) } - if err := publishToFTL(ctx, topic, event, caller); err != nil { - return err - } - return publishToModule(ctx, topic, event, key, caller) -} - -func publishToFTL(ctx context.Context, topic *schema.Ref, event any, caller schema.RefKey) error { - // TODO: remove this once we have other pubsub moved over to kafka - // For now we are publishing to both systems. - client := rpc.ClientFromContext[deploymentconnect.DeploymentServiceClient](ctx) - body, err := encoding.Marshal(event) - if err != nil { - return fmt.Errorf("failed to marshal event to controller: %w", err) - } - _, err = client.PublishEvent(ctx, connect.NewRequest(&ftldeployment.PublishEventRequest{ - Topic: topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert - Caller: caller.Name, - Body: body, - })) - if err != nil { - return fmt.Errorf("failed to publish event to controller: %w", err) - } - return nil -} - -func publishToModule(ctx context.Context, topic *schema.Ref, event any, key string, caller schema.RefKey) error { client := rpc.ClientFromContext[pubconnect.PublishServiceClient](ctx) body, err := encoding.Marshal(event) if err != nil { From c50327fc40cda695362059fb7a45e13ed51cda8c Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Wed, 4 Dec 2024 10:03:23 +1100 Subject: [PATCH 2/4] fix ref --- backend/runner/pubsub/publisher.go | 6 ++++-- backend/runner/pubsub/pubsub.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index 8adc53dc64..a45bd5d1a6 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -15,11 +15,12 @@ import ( ) type publisher struct { + module string topic *schema.Topic producer sarama.SyncProducer } -func newPublisher(t *schema.Topic) (*publisher, error) { +func newPublisher(module string, t *schema.Topic) (*publisher, error) { if t.Runtime == nil { return nil, fmt.Errorf("topic %s has no runtime", t.Name) } @@ -37,6 +38,7 @@ func newPublisher(t *schema.Topic) (*publisher, error) { return nil, fmt.Errorf("failed to create producer for topic %s: %w", t.Name, err) } return &publisher{ + module: module, topic: t, producer: producer, }, nil @@ -63,7 +65,7 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller func (p *publisher) publishToController(ctx context.Context, data []byte, caller schema.RefKey) error { client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx) _, err := client.PublishEvent(ctx, connect.NewRequest(&ftlv1.PublishEventRequest{ - Topic: p.topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert + Topic: &schemapb.Ref{Module: p.module, Name: p.topic.Name}, Caller: caller.Name, Body: data, })) diff --git a/backend/runner/pubsub/pubsub.go b/backend/runner/pubsub/pubsub.go index 52c1b59ab4..443c42b299 100644 --- a/backend/runner/pubsub/pubsub.go +++ b/backend/runner/pubsub/pubsub.go @@ -22,7 +22,7 @@ var _ pbconnect.PublishServiceHandler = (*Service)(nil) func New(module *schema.Module) (*Service, error) { publishers := map[string]*publisher{} for t := range sl.FilterVariants[*schema.Topic](module.Decls) { - publisher, err := newPublisher(t) + publisher, err := newPublisher(module.Name, t) if err != nil { return nil, err } From 371ff21633ec9cdf92d2394b976df98922b62d39 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 6 Dec 2024 08:55:46 +1100 Subject: [PATCH 3/4] call new deployment client --- backend/runner/pubsub/publisher.go | 8 ++++---- backend/runner/runner.go | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index a45bd5d1a6..0b75066afd 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -7,9 +7,9 @@ import ( "connectrpc.com/connect" "github.com/IBM/sarama" + deploymentpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" + ftldeploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/schema" ) @@ -63,8 +63,8 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller // // This is to keep pubsub working while we transition fully to Kafka for pubsub. func (p *publisher) publishToController(ctx context.Context, data []byte, caller schema.RefKey) error { - client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx) - _, err := client.PublishEvent(ctx, connect.NewRequest(&ftlv1.PublishEventRequest{ + client := rpc.ClientFromContext[ftldeploymentconnect.DeploymentServiceClient](ctx) + _, err := client.PublishEvent(ctx, connect.NewRequest(&deploymentpb.PublishEventRequest{ Topic: &schemapb.Ref{Module: p.module, Name: p.topic.Name}, Caller: caller.Name, Body: data, diff --git a/backend/runner/runner.go b/backend/runner/runner.go index 4a6503079a..054573e4c8 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -362,6 +362,8 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s s.pubSub = pubSub deploymentServiceClient := rpc.Dial(ftldeploymentconnect.NewDeploymentServiceClient, s.config.ControllerEndpoint.String(), log.Error) + ctx = rpc.ContextWithClient(ctx, deploymentServiceClient) + leaseServiceClient := rpc.Dial(ftlleaseconnect.NewLeaseServiceClient, s.config.ControllerEndpoint.String(), log.Error) s.proxy = proxy.New(deploymentServiceClient, leaseServiceClient) From c7e2bacc0732426e567bc283b91d7098a3debb79 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 6 Dec 2024 16:23:22 +1100 Subject: [PATCH 4/4] fix kotlin example --- .../xyz/block/ftl/java/test/publisher/Publisher.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/backend/controller/pubsub/testdata/kotlin/publisher/src/main/kotlin/xyz/block/ftl/java/test/publisher/Publisher.kt b/backend/controller/pubsub/testdata/kotlin/publisher/src/main/kotlin/xyz/block/ftl/java/test/publisher/Publisher.kt index 0549c44787..b9c3312424 100644 --- a/backend/controller/pubsub/testdata/kotlin/publisher/src/main/kotlin/xyz/block/ftl/java/test/publisher/Publisher.kt +++ b/backend/controller/pubsub/testdata/kotlin/publisher/src/main/kotlin/xyz/block/ftl/java/test/publisher/Publisher.kt @@ -4,17 +4,23 @@ import io.quarkus.logging.Log import xyz.block.ftl.* import java.time.ZonedDateTime +class PartitionMapper : TopicPartitionMapper { + override fun getPartitionKey(event: PubSubEvent): String { + return event.time.toString() + } +} + class Publisher { @Export @Topic("testTopic") - interface TestTopic : WriteableTopic + interface TestTopic : WriteableTopic @Topic("localTopic") - interface LocalTopic : WriteableTopic + interface LocalTopic : WriteableTopic @Export @Topic("topic2") - interface Topic2 : WriteableTopic + interface Topic2 : WriteableTopic @Verb @Throws(Exception::class)