Skip to content

Commit

Permalink
feat: propagate otel spans across pubsub (#2287)
Browse files Browse the repository at this point in the history
This adds an `otel_context` column to both `topic_events` and
`async_calls`.
- on publish, the current otel context is stored in `topic_events`
- `otel_context` is propagated to the `async_calls` row on dequeue and
retries
- on execution of a row in `async_calls`, the current otel trace is
replaced with `otel_context`

Pros:
- we can see related calls all in the same trace
- pattern can be reused for all async calls (i.e. fsm)

Cons:
- storing otel specific context in the db, feels a bit gross
- there is no root span tying the retries / subscriptions together,
since the root would be the controller lifecycle. screenshot below.

An alternative would be to persist the `requestKey` in the tables
instead of `otel_context` and attach that to subscribers / retries, and
use that as a filter in DD, etc.

<img width="774" alt="Screenshot 2024-08-07 at 12 59 43 PM"
src="https://github.com/user-attachments/assets/2cd61392-0ff8-4032-8ca4-20e4aec033de">

Closes #2258
  • Loading branch information
safeer authored Aug 8, 2024
1 parent f1ec7df commit fa2636f
Show file tree
Hide file tree
Showing 21 changed files with 387 additions and 171 deletions.
38 changes: 20 additions & 18 deletions backend/controller/call_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
)

type Call struct {
deploymentKey model.DeploymentKey
requestKey model.RequestKey
startTime time.Time
destVerb *schema.Ref
callers []*schema.Ref
request *ftlv1.CallRequest
response optional.Option[*ftlv1.CallResponse]
callError optional.Option[error]
deploymentKey model.DeploymentKey
requestKey model.RequestKey
parentRequestKey optional.Option[model.RequestKey]
startTime time.Time
destVerb *schema.Ref
callers []*schema.Ref
request *ftlv1.CallRequest
response optional.Option[*ftlv1.CallResponse]
callError optional.Option[error]
}

func (s *Service) recordCall(ctx context.Context, call *Call) {
Expand All @@ -46,16 +47,17 @@ func (s *Service) recordCall(ctx context.Context, call *Call) {
}

err := s.dal.InsertCallEvent(ctx, &dal.CallEvent{
Time: call.startTime,
DeploymentKey: call.deploymentKey,
RequestKey: optional.Some(call.requestKey),
Duration: time.Since(call.startTime),
SourceVerb: sourceVerb,
DestVerb: *call.destVerb,
Request: call.request.GetBody(),
Response: responseBody,
Error: errorStr,
Stack: stack,
Time: call.startTime,
DeploymentKey: call.deploymentKey,
RequestKey: optional.Some(call.requestKey),
ParentRequestKey: call.parentRequestKey,
Duration: time.Since(call.startTime),
SourceVerb: sourceVerb,
DestVerb: *call.destVerb,
Request: call.request.GetBody(),
Response: responseBody,
Error: errorStr,
Stack: stack,
})
if err != nil {
logger.Errorf(err, "failed to record call")
Expand Down
44 changes: 33 additions & 11 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[f
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "")
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), optional.None[model.RequestKey](), "")
}

func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (resp *connect.Response[ftlv1.SendFSMEventResponse], err error) {
Expand Down Expand Up @@ -962,6 +962,7 @@ func (s *Service) callWithRequest(
ctx context.Context,
req *connect.Request[ftlv1.CallRequest],
key optional.Option[model.RequestKey],
parentKey optional.Option[model.RequestKey],
sourceAddress string,
) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()
Expand Down Expand Up @@ -1049,6 +1050,9 @@ func (s *Service) callWithRequest(
}
}

if pk, ok := parentKey.Get(); ok {
ctx = rpc.WithParentRequestKey(ctx, pk)
}
ctx = rpc.WithRequestKey(ctx, requestKey)
ctx = rpc.WithVerbs(ctx, append(callers, verbRef))
headers.AddCaller(req.Header(), schema.RefFromProto(req.Msg.Verb))
Expand All @@ -1064,14 +1068,15 @@ func (s *Service) callWithRequest(
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
}
s.recordCall(ctx, &Call{
deploymentKey: route.Deployment,
requestKey: requestKey,
startTime: start,
destVerb: verbRef,
callers: callers,
callError: optional.Nil(err),
request: req.Msg,
response: maybeResponse,
deploymentKey: route.Deployment,
requestKey: requestKey,
parentRequestKey: parentKey,
startTime: start,
destVerb: verbRef,
callers: callers,
callError: optional.Nil(err),
request: req.Msg,
response: maybeResponse,
})
return resp, err
}
Expand Down Expand Up @@ -1384,6 +1389,23 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
return 0, err
}

// Extract the otel context from the call
ctx, err = observability.ExtractTraceContextToContext(ctx, call.TraceContext)
if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
return 0, fmt.Errorf("failed to extract trace context: %w", err)
}

// Extract the request key from the call and attach it as the parent request key
parentRequestKey := optional.None[model.RequestKey]()
if prk, ok := call.ParentRequestKey.Get(); ok {
if rk, err := model.ParseRequestKey(prk); err == nil {
parentRequestKey = optional.Some(rk)
} else {
logger.Tracef("Ignoring invalid request key: %s", prk)
}
}

observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, nil)

defer func() {
Expand Down Expand Up @@ -1417,7 +1439,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
Verb: call.Verb.ToProto(),
Body: call.Request,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), parentRequestKey, s.config.Advertise.String())
var callResult either.Either[[]byte, string]
if err != nil {
logger.Warnf("Async call could not be called: %v", err)
Expand Down Expand Up @@ -1475,7 +1497,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
Verb: catchVerb.ToProto(),
Body: body,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), optional.None[model.RequestKey](), s.config.Advertise.String())
var catchResult either.Either[[]byte, string]
if err != nil {
// Could not call catch verb
Expand Down
5 changes: 2 additions & 3 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Scheduler interface {
Parallel(retry backoff.Backoff, job scheduledtask.Job)
}

type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error)
type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error)

type Service struct {
config Config
Expand Down Expand Up @@ -216,9 +216,8 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()

observability.Cron.JobStarted(ctx, job)
_, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource)
_, err = s.call(callCtx, req, optional.Some(requestKey), optional.None[model.RequestKey](), s.requestSource)

// Record execution success/failure metric now and leave post job-execution-action observability to logging
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestHashRing(t *testing.T) {
err = mockDal.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], p optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
return &connect.Response[ftlv1.CallResponse]{}, nil
})

Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func testServiceWithDal(ctx context.Context, t *testing.T, dal DAL, parentDAL Pa
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

_ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
_ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], p optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(r.Msg.Verb)

verbCallCountLock.Lock()
Expand Down
37 changes: 21 additions & 16 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ func ParseAsyncOrigin(origin string) (AsyncOrigin, error) {
}

type AsyncCall struct {
*Lease // May be nil
ID int64
Origin AsyncOrigin
Verb schema.RefKey
CatchVerb optional.Option[schema.RefKey]
Request json.RawMessage
ScheduledAt time.Time
QueueDepth int64
*Lease // May be nil
ID int64
Origin AsyncOrigin
Verb schema.RefKey
CatchVerb optional.Option[schema.RefKey]
Request json.RawMessage
ScheduledAt time.Time
QueueDepth int64
ParentRequestKey optional.Option[string]
TraceContext []byte

Error optional.Option[string]

Expand Down Expand Up @@ -128,6 +130,8 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
Lease: lease,
ScheduledAt: row.ScheduledAt,
QueueDepth: row.QueueDepth,
ParentRequestKey: row.ParentRequestKey,
TraceContext: row.TraceContext,
RemainingAttempts: row.RemainingAttempts,
Error: row.Error,
Backoff: row.Backoff,
Expand Down
21 changes: 13 additions & 8 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,10 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if rn, ok := call.RequestKey.Get(); ok {
requestKey = optional.Some(rn.String())
}
var parentRequestKey optional.Option[string]
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
Expand All @@ -1153,14 +1157,15 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
return fmt.Errorf("failed to encrypt call payload: %w", err)
}
return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{
DeploymentKey: call.DeploymentKey,
RequestKey: requestKey,
TimeStamp: call.Time,
SourceModule: sourceModule,
SourceVerb: sourceVerb,
DestModule: call.DestVerb.Module,
DestVerb: call.DestVerb.Name,
Payload: payload,
DeploymentKey: call.DeploymentKey,
RequestKey: requestKey,
ParentRequestKey: parentRequestKey,
TimeStamp: call.Time,
SourceModule: sourceModule,
SourceVerb: sourceVerb,
DestModule: call.DestVerb.Module,
DestVerb: call.DestVerb.Name,
Payload: payload,
}))
}

Expand Down
23 changes: 12 additions & 11 deletions backend/controller/dal/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ func (e *LogEvent) GetID() int64 { return e.ID }
func (e *LogEvent) event() {}

type CallEvent struct {
ID int64
DeploymentKey model.DeploymentKey
RequestKey optional.Option[model.RequestKey]
Time time.Time
SourceVerb optional.Option[schema.Ref]
DestVerb schema.Ref
Duration time.Duration
Request json.RawMessage
Response json.RawMessage
Error optional.Option[string]
Stack optional.Option[string]
ID int64
DeploymentKey model.DeploymentKey
RequestKey optional.Option[model.RequestKey]
ParentRequestKey optional.Option[model.RequestKey]
Time time.Time
SourceVerb optional.Option[schema.Ref]
DestVerb schema.Ref
Duration time.Duration
Request json.RawMessage
Response json.RawMessage
Error optional.Option[string]
Stack optional.Option[string]
}

func (e *CallEvent) GetID() int64 { return e.ID }
Expand Down
33 changes: 28 additions & 5 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/slices"
)

Expand All @@ -23,12 +24,32 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller st
if err != nil {
return fmt.Errorf("failed to encrypt payload: %w", err)
}

// Store the current otel context with the event
jsonOc, err := observability.RetrieveTraceContextFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve trace context: %w", err)
}

// Store the request key that initiated this publish, this will eventually
// become the parent request key of the subscriber call
requestKey := ""
if rk, err := rpc.RequestKeyFromContext(ctx); err == nil {
if rk, ok := rk.Get(); ok {
requestKey = rk.String()
}
} else {
return fmt.Errorf("failed to get request key: %w", err)
}

err = d.db.PublishEventForTopic(ctx, sql.PublishEventForTopicParams{
Key: model.NewTopicEventKey(module, topic),
Module: module,
Topic: topic,
Caller: caller,
Payload: encryptedPayload,
Key: model.NewTopicEventKey(module, topic),
Module: module,
Topic: topic,
Caller: caller,
Payload: encryptedPayload,
RequestKey: requestKey,
TraceContext: jsonOc,
})
observability.PubSub.Published(ctx, module, topic, caller, err)
if err != nil {
Expand Down Expand Up @@ -111,6 +132,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
RemainingAttempts: subscriber.RetryAttempts,
Backoff: subscriber.Backoff,
MaxBackoff: subscriber.MaxBackoff,
ParentRequestKey: nextCursor.RequestKey,
TraceContext: nextCursor.TraceContext,
CatchVerb: subscriber.CatchVerb,
})
observability.AsyncCalls.Created(ctx, subscriber.Sink, subscriber.CatchVerb, origin.String(), int64(subscriber.RetryAttempts), err)
Expand Down
Loading

0 comments on commit fa2636f

Please sign in to comment.