Skip to content

Commit

Permalink
feat: log name of the calling verb to PublishEvent in otel metric att…
Browse files Browse the repository at this point in the history
…ributes (#2219)

Add `ftl.pubsub.publish.caller.verb.name` attribute to
`ftl.pubsub.published` metric. Only the verb name, not module, is needed
because the module will be the same as the topic's module, which is
already logged (`ftl.module.name`)

```
Metric #0
Descriptor:
     -> Name: ftl.pubsub.published
     -> Description: the number of times that an event is published to a topic
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.module.name: Str(echo)
     -> ftl.pubsub.publish.caller.verb.name: Str(Echo)
     -> ftl.pubsub.topic.name: Str(echotopic)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-31 17:28:47.451585 +0000 UTC
Timestamp: 2024-07-31 17:29:12.44603 +0000 UTC
Value: 1
```

Issue: #2194
  • Loading branch information
deniseli authored Jul 31, 2024
1 parent 22f31f4 commit 8c48e11
Show file tree
Hide file tree
Showing 8 changed files with 658 additions and 625 deletions.
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
// Publish the event.
err := s.dal.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Body)
err := s.dal.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err))
}
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error {
func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error {
err := d.db.PublishEventForTopic(ctx, sql.PublishEventForTopicParams{
Key: model.NewTopicEventKey(module, topic),
Module: module,
Topic: topic,
Payload: payload,
})
observability.PubSub.Published(ctx, module, topic, err)
observability.PubSub.Published(ctx, module, topic, caller, err)
if err != nil {
return dalerrs.TranslatePGError(err)
}
Expand Down
4 changes: 3 additions & 1 deletion backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
const (
pubsubMeterName = "ftl.pubsub"
pubsubTopicNameAttr = "ftl.pubsub.topic.name"
pubsubCallerVerbNameAttr = "ftl.pubsub.publish.caller.verb.name"
pubsubSubscriptionRefAttr = "ftl.pubsub.subscription.ref"
pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name"
pubsubSinkRefAttr = "ftl.pubsub.sink.ref"
Expand Down Expand Up @@ -75,10 +76,11 @@ func handleInitCounterError(errs error, err error, counterName string) error {
return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
}

func (m *PubSubMetrics) Published(ctx context.Context, module, topic string, maybeErr error) {
func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) {
attrs := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicNameAttr, topic),
attribute.String(pubsubCallerVerbNameAttr, caller),
attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
}

Expand Down
1,241 changes: 626 additions & 615 deletions backend/protos/xyz/block/ftl/v1/ftl.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions backend/protos/xyz/block/ftl/v1/ftl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ message SendFSMEventResponse {}
message PublishEventRequest {
schema.Ref topic = 1;
bytes body = 2;
// Only verb name is included because this verb will be in the same module as topic
string caller = 3;
}

message PublishEventResponse {}
Expand Down
8 changes: 8 additions & 0 deletions frontend/src/protos/xyz/block/ftl/v1/ftl_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions go-runtime/ftl/reflection/reflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,32 @@ import (

// Module returns the FTL module currently being executed.
func Module() string {
return CallingVerb().Module
}

func CallingVerb() schema.RefKey {
// Look through the stack for the outermost FTL module.
pcs := make([]uintptr, 1024)
pcs = pcs[:runtime.Callers(1, pcs)]
var module string
var verb string
for _, pc := range pcs {
pkg := strings.Split(runtime.FuncForPC(pc).Name(), ".")[0]
splitName := strings.Split(runtime.FuncForPC(pc).Name(), ".")
pkg := splitName[0]
fnName := splitName[1]
if strings.HasPrefix(pkg, "ftl/") {
module = strings.Split(pkg, "/")[1]
verb = fnName
}
}
if module == "" {
debug.PrintStack()
panic("must be called from an FTL module")
}
if strings.HasSuffix(module, "_test") {
return module[:len(module)-len("_test")]
return schema.RefKey{Module: module[:len(module)-len("_test")], Name: verb}
}
return module
return schema.RefKey{Module: module, Name: verb}
}

// TypeRef returns the Ref for a Go type.
Expand Down
8 changes: 5 additions & 3 deletions go-runtime/internal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any)
}

func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error {
if topic.Module != reflection.Module() {
caller := reflection.CallingVerb()
if topic.Module != caller.Module {
return fmt.Errorf("can not publish to another module's topic: %s", topic)
}
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
Expand All @@ -78,8 +79,9 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any
return fmt.Errorf("failed to marshal event: %w", err)
}
_, err = client.PublishEvent(ctx, connect.NewRequest(&ftlv1.PublishEventRequest{
Topic: topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert
Body: body,
Topic: topic.ToProto().(*schemapb.Ref), //nolint: forcetypeassert
Caller: caller.Name,
Body: body,
}))
if err != nil {
return fmt.Errorf("failed to publish event: %w", err)
Expand Down

0 comments on commit 8c48e11

Please sign in to comment.