Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: publish to old pubsub impl and kafka #3609

Merged
merged 4 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestPubSub(t *testing.T) {
calls := 20
events := calls * 10
in.Run(t,
in.WithLanguages("go"),
in.WithLanguages("java", "go", "kotlin"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
Expand All @@ -45,7 +45,7 @@ func TestPubSub(t *testing.T) {

func TestConsumptionDelay(t *testing.T) {
in.Run(t,
in.WithLanguages("go"),
in.WithLanguages("go", "java"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestConsumptionDelay(t *testing.T) {
func TestRetry(t *testing.T) {
retriesPerCall := 2
in.Run(t,
in.WithLanguages("go"),
in.WithLanguages("java", "go"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ import io.quarkus.logging.Log
import xyz.block.ftl.*
import java.time.ZonedDateTime

class PartitionMapper : TopicPartitionMapper<PubSubEvent> {
override fun getPartitionKey(event: PubSubEvent): String {
return event.time.toString()
}
}

class Publisher {
@Export
@Topic("testTopic")
interface TestTopic : WriteableTopic<PubSubEvent?>
interface TestTopic : WriteableTopic<PubSubEvent, PartitionMapper>

@Topic("localTopic")
interface LocalTopic : WriteableTopic<PubSubEvent?>
interface LocalTopic : WriteableTopic<PubSubEvent, PartitionMapper>

@Export
@Topic("topic2")
interface Topic2 : WriteableTopic<PubSubEvent?>
interface Topic2 : WriteableTopic<PubSubEvent, PartitionMapper>

@Verb
@Throws(Exception::class)
Expand Down
31 changes: 29 additions & 2 deletions backend/runner/pubsub/publisher.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package pubsub

import (
"context"
"fmt"

"connectrpc.com/connect"
"github.com/IBM/sarama"

deploymentpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
ftldeploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
)

type publisher struct {
module string
topic *schema.Topic
producer sarama.SyncProducer
}

func newPublisher(t *schema.Topic) (*publisher, error) {
func newPublisher(module string, t *schema.Topic) (*publisher, error) {
if t.Runtime == nil {
return nil, fmt.Errorf("topic %s has no runtime", t.Name)
}
Expand All @@ -31,12 +38,16 @@ func newPublisher(t *schema.Topic) (*publisher, error) {
return nil, fmt.Errorf("failed to create producer for topic %s: %w", t.Name, err)
}
return &publisher{
module: module,
topic: t,
producer: producer,
}, nil
}

func (p *publisher) publish(data []byte, key string) error {
func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.RefKey) error {
if err := p.publishToController(ctx, data, caller); err != nil {
return err
}
_, _, err := p.producer.SendMessage(&sarama.ProducerMessage{
Topic: p.topic.Runtime.TopicID,
Value: sarama.ByteEncoder(data),
Expand All @@ -47,3 +58,19 @@ func (p *publisher) publish(data []byte, key string) error {
}
return nil
}

// publishToController publishes the data to the controller (old pubsub implementation)
//
// This is to keep pubsub working while we transition fully to Kafka for pubsub.
func (p *publisher) publishToController(ctx context.Context, data []byte, caller schema.RefKey) error {
client := rpc.ClientFromContext[ftldeploymentconnect.DeploymentServiceClient](ctx)
_, err := client.PublishEvent(ctx, connect.NewRequest(&deploymentpb.PublishEventRequest{
Topic: &schemapb.Ref{Module: p.module, Name: p.topic.Name},
Caller: caller.Name,
Body: data,
}))
if err != nil {
return fmt.Errorf("failed to publish event to controller: %w", err)
}
return nil
}
8 changes: 6 additions & 2 deletions backend/runner/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ pbconnect.PublishServiceHandler = (*Service)(nil)
func New(module *schema.Module) (*Service, error) {
publishers := map[string]*publisher{}
for t := range sl.FilterVariants[*schema.Topic](module.Decls) {
publisher, err := newPublisher(t)
publisher, err := newPublisher(module.Name, t)
if err != nil {
return nil, err
}
Expand All @@ -42,7 +42,11 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[pb.Publ
if !ok {
return nil, fmt.Errorf("topic %s not found", req.Msg.Topic.Name)
}
err := publisher.publish(req.Msg.Body, req.Msg.Key)
caller, err := schema.ParseRef(req.Msg.Caller)
if err != nil {
return nil, fmt.Errorf("could not parse caller: %w", err)
}
err = publisher.publish(ctx, req.Msg.Body, req.Msg.Key, caller.ToRefKey())
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
s.pubSub = pubSub

deploymentServiceClient := rpc.Dial(ftldeploymentconnect.NewDeploymentServiceClient, s.config.ControllerEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, deploymentServiceClient)

leaseServiceClient := rpc.Dial(ftlleaseconnect.NewLeaseServiceClient, s.config.ControllerEndpoint.String(), log.Error)

s.proxy = proxy.New(deploymentServiceClient, leaseServiceClient)
Expand Down
28 changes: 0 additions & 28 deletions go-runtime/internal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"connectrpc.com/connect"
"github.com/puzpuzpuz/xsync/v3"

ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
deploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
pubpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/publish/v1"
pubconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/publish/v1/publishpbconnect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
Expand Down Expand Up @@ -57,32 +55,6 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any
if topic.Module != caller.Module {
return fmt.Errorf("can not publish to another module's topic: %s", topic)
}
if err := publishToFTL(ctx, topic, event, caller); err != nil {
return err
}
return publishToModule(ctx, topic, event, key, caller)
}

func publishToFTL(ctx context.Context, topic *schema.Ref, event any, caller schema.RefKey) error {
// TODO: remove this once we have other pubsub moved over to kafka
// For now we are publishing to both systems.
client := rpc.ClientFromContext[deploymentconnect.DeploymentServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event to controller: %w", err)
}
_, err = client.PublishEvent(ctx, connect.NewRequest(&ftldeployment.PublishEventRequest{
Topic: topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert
Caller: caller.Name,
Body: body,
}))
if err != nil {
return fmt.Errorf("failed to publish event to controller: %w", err)
}
return nil
}

func publishToModule(ctx context.Context, topic *schema.Ref, event any, key string, caller schema.RefKey) error {
client := rpc.ClientFromContext[pubconnect.PublishServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
Expand Down
Loading