diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index 8adc53dc64..a45bd5d1a6 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -15,11 +15,12 @@ import ( ) type publisher struct { + module string topic *schema.Topic producer sarama.SyncProducer } -func newPublisher(t *schema.Topic) (*publisher, error) { +func newPublisher(module string, t *schema.Topic) (*publisher, error) { if t.Runtime == nil { return nil, fmt.Errorf("topic %s has no runtime", t.Name) } @@ -37,6 +38,7 @@ func newPublisher(t *schema.Topic) (*publisher, error) { return nil, fmt.Errorf("failed to create producer for topic %s: %w", t.Name, err) } return &publisher{ + module: module, topic: t, producer: producer, }, nil @@ -63,7 +65,7 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller func (p *publisher) publishToController(ctx context.Context, data []byte, caller schema.RefKey) error { client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx) _, err := client.PublishEvent(ctx, connect.NewRequest(&ftlv1.PublishEventRequest{ - Topic: p.topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert + Topic: &schemapb.Ref{Module: p.module, Name: p.topic.Name}, Caller: caller.Name, Body: data, })) diff --git a/backend/runner/pubsub/pubsub.go b/backend/runner/pubsub/pubsub.go index 52c1b59ab4..443c42b299 100644 --- a/backend/runner/pubsub/pubsub.go +++ b/backend/runner/pubsub/pubsub.go @@ -22,7 +22,7 @@ var _ pbconnect.PublishServiceHandler = (*Service)(nil) func New(module *schema.Module) (*Service, error) { publishers := map[string]*publisher{} for t := range sl.FilterVariants[*schema.Topic](module.Decls) { - publisher, err := newPublisher(t) + publisher, err := newPublisher(module.Name, t) if err != nil { return nil, err }