Skip to content

Commit

Permalink
fix: some events weren’t being published to new timeline service (#3650)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e authored Dec 6, 2024
1 parent 72efa87 commit c5f8097
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 44 deletions.
28 changes: 15 additions & 13 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/alecthomas/kong"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
"github.com/jackc/pgx/v5"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
Expand All @@ -43,7 +44,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/timeline"
oldtimeline "github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
Expand All @@ -55,6 +56,7 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
frontend "github.com/TBD54566975/ftl/frontend/console"
"github.com/TBD54566975/ftl/internal/configuration"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
Expand Down Expand Up @@ -207,7 +209,7 @@ type Service struct {
tasks *scheduledtask.Scheduler
pubSub *pubsub.Service
registry artefacts.Service
timeline *timeline.Service
timeline *oldtimeline.Service
controllerListListeners []ControllerListListener

// Map from runnerKey.String() to client.
Expand Down Expand Up @@ -273,7 +275,7 @@ func New(
}
svc.registry = storage

timelineSvc := timeline.New(ctx, conn, encryption)
timelineSvc := oldtimeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub
Expand Down Expand Up @@ -463,7 +465,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

s.timeline.EnqueueEvent(ctx, &timeline.Log{
timeline.Publish(ctx, &timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Time: msg.TimeStamp.AsTime(),
Expand Down Expand Up @@ -901,7 +903,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl
routes := s.routeTable.Current()
route, ok := routes.GetDeployment(module).Get()
if ok {
s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{
timeline.Publish(ctx, &timeline.PubSubPublish{
DeploymentKey: route,
RequestKey: requestKey,
Time: now,
Expand Down Expand Up @@ -1018,16 +1020,16 @@ func (s *Service) callWithRequest(
if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
s.timeline.EnqueueEvent(ctx, callEvent)
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
}

err = validateCallBody(req.Msg.Body, verb, sch)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body"))
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
s.timeline.EnqueueEvent(ctx, callEvent)
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.Publish(ctx, callEvent)
return nil, err
}

Expand All @@ -1044,15 +1046,15 @@ func (s *Service) callWithRequest(
var resp *connect.Response[ftlv1.CallResponse]
if err == nil {
resp = connect.NewResponse(response.Msg)
callEvent.Response = either.LeftOf[error](resp.Msg)
callEvent.Response = result.Ok(resp.Msg)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
} else {
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module)
}

s.timeline.EnqueueEvent(ctx, callEvent)
timeline.Publish(ctx, callEvent)
return resp, err
}

Expand Down Expand Up @@ -1259,7 +1261,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if e, ok := err.Get(); ok {
errStr = optional.Some(e.Error())
}
s.timeline.EnqueueEvent(ctx, &timeline.AsyncExecute{
timeline.Publish(ctx, &timeline.AsyncExecute{
DeploymentKey: deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Expand Down
35 changes: 4 additions & 31 deletions backend/controller/timeline/timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@ package timeline
import (
"context"
stdsql "database/sql"
"time"

"github.com/alecthomas/atomic"

"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/internal/log"
)

type EventType = sql.EventType
Expand Down Expand Up @@ -41,39 +37,16 @@ type InEvent interface {
}

type Service struct {
ctx context.Context
conn *stdsql.DB
encryption *encryption.Service
events chan Event
lastDroppedError atomic.Value[time.Time]
lastFailedError atomic.Value[time.Time]
ctx context.Context
conn *stdsql.DB
encryption *encryption.Service
}

func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service {
var s *Service
events := make(chan Event, 1000)
s = &Service{
s := &Service{
ctx: ctx,
conn: conn,
encryption: encryption,
events: events,
}
return s
}

// EnqueueEvent asynchronously enqueues an event for insertion into the timeline.
func (s *Service) EnqueueEvent(ctx context.Context, inEvent InEvent) {
event, err := inEvent.toEvent()
if err != nil {
log.FromContext(ctx).Warnf("Failed to convert event to event: %v", err)
return
}
select {
case s.events <- event:
default:
if time.Since(s.lastDroppedError.Load()) > 10*time.Second {
log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event)
s.lastDroppedError.Store(time.Now())
}
}
}

0 comments on commit c5f8097

Please sign in to comment.