Skip to content

Commit

Permalink
feat: publish timeline events with new service (#3649)
Browse files Browse the repository at this point in the history
Timeline event publishing happens with convenience func:
`timeline.Publish(ctx, event)`, and all event creation is moved over to
it.

Timeline tests have been in this PR but [will come back
soon](#3634).

This PR is split from #3639.
Thanks @worstell for working on this one with me.

closes #3622
  • Loading branch information
matt2e authored Dec 6, 2024
1 parent 1b9345b commit 72efa87
Show file tree
Hide file tree
Showing 45 changed files with 938 additions and 2,185 deletions.
32 changes: 16 additions & 16 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,13 +638,13 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
sourceVerbRef = sourceVerb.ToProto().(*schemapb.Ref) //nolint:forcetypeassert
}
return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_Call{
Call: &pbtimeline.CallEvent{
RequestKey: requestKey,
DeploymentKey: event.DeploymentKey.String(),
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
SourceVerbRef: sourceVerbRef,
DestinationVerbRef: &schemapb.Ref{
Module: event.DestVerb.Module,
Expand All @@ -666,13 +666,13 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
requestKey = &rstr
}
return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_Log{
Log: &pbtimeline.LogEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
LogLevel: event.Level,
Attributes: event.Attributes,
Message: event.Message,
Expand All @@ -688,7 +688,7 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
replaced = &rstr
}
return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_DeploymentCreated{
DeploymentCreated: &pbtimeline.DeploymentCreatedEvent{
Expand All @@ -702,7 +702,7 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
}
case *timeline.DeploymentUpdatedEvent:
return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_DeploymentUpdated{
DeploymentUpdated: &pbtimeline.DeploymentUpdatedEvent{
Expand All @@ -721,7 +721,7 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
}

return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_Ingress{
Ingress: &pbtimeline.IngressEvent{
Expand All @@ -734,7 +734,7 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
Method: event.Method,
Path: event.Path,
StatusCode: int32(event.StatusCode),
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Request: string(event.Request),
RequestHeader: string(event.RequestHeader),
Expand All @@ -747,7 +747,7 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {

case *timeline.CronScheduledEvent:
return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_CronScheduled{
CronScheduled: &pbtimeline.CronScheduledEvent{
Expand All @@ -756,7 +756,7 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
Module: event.Verb.Module,
Name: event.Verb.Name,
},
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
ScheduledAt: timestamppb.New(event.ScheduledAt),
Schedule: event.Schedule,
Expand All @@ -782,13 +782,13 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
}

return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_AsyncExecute{
AsyncExecute: &pbtimeline.AsyncExecuteEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
AsyncEventType: asyncEventType,
VerbRef: &schemapb.Ref{
Module: event.Verb.Module,
Expand All @@ -807,14 +807,14 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
}

return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_PubsubPublish{
PubsubPublish: &pbtimeline.PubSubPublishEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
VerbRef: event.SourceVerb.ToProto().(*schemapb.Ref), //nolint:forcetypeassert
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Topic: event.Topic,
Request: string(event.Request),
Expand All @@ -837,15 +837,15 @@ func eventDALToProto(event timeline.Event) *pbtimeline.Event {
}

return &pbtimeline.Event{
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbtimeline.Event_PubsubConsume{
PubsubConsume: &pbtimeline.PubSubConsumeEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
DestVerbModule: &destVerbModule,
DestVerbName: &destVerbName,
TimeStamp: timestamppb.New(event.Time),
Timestamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Topic: event.Topic,
Error: event.Error.Ptr(),
Expand Down
16 changes: 11 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1"
leaseconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
frontend "github.com/TBD54566975/ftl/frontend/console"
Expand Down Expand Up @@ -273,11 +275,11 @@ func New(

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc), timelineSvc)
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub
svc.dal = dal.New(ctx, conn, encryption, pubSub, svc.registry)

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)
svc.deploymentLogsSink = newDeploymentLogsSink(ctx)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that it its next
Expand Down Expand Up @@ -1689,12 +1691,16 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
return time.Hour, nil
}

removed, err := s.timeline.DeleteOldEvents(ctx, timeline.EventTypeCall, *s.config.EventLogRetention)
client := rpc.ClientFromContext[timelinev1connect.TimelineServiceClient](ctx)
resp, err := client.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{
EventType: timelinepb.EventType_EVENT_TYPE_CALL,
AgeSeconds: int64(s.config.EventLogRetention.Seconds()),
}))
if err != nil {
return 0, fmt.Errorf("failed to prune call events: %w", err)
}
if removed > 0 {
logger.Debugf("Pruned %d call events older than %s", removed, s.config.EventLogRetention)
if resp.Msg.DeletedCount > 0 {
logger.Debugf("Pruned %d call events older than %s", resp.Msg.DeletedCount, s.config.EventLogRetention)
}

// Prune every 5% of the retention period.
Expand Down
4 changes: 1 addition & 3 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -24,8 +23,7 @@ func TestNoCallToAcquire(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSvc := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSvc)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
Expand Down
41 changes: 11 additions & 30 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/leases/dbleaser"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -350,21 +350,11 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return libdal.TranslatePGError(err)
}
}
var payload api.EncryptedTimelineColumn
err = d.encryption.EncryptJSON(map[string]interface{}{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
}, &payload)
if err != nil {
return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err)
}
err = tx.db.InsertTimelineDeploymentUpdatedEvent(ctx, dalsql.InsertTimelineDeploymentUpdatedEventParams{
DeploymentKey: key,
Payload: payload,
timeline.Publish(ctx, timeline.DeploymentUpdated{
DeploymentKey: key,
MinReplicas: minReplicas,
PrevMinReplicas: int(deployment.MinReplicas),
})
if err != nil {
return libdal.TranslatePGError(err)
}

return nil
}
Expand Down Expand Up @@ -420,25 +410,16 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

var payload api.EncryptedTimelineColumn
err = d.encryption.EncryptJSON(map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
}, &payload)
if err != nil {
return fmt.Errorf("replace deployment failed to encrypt payload: %w", err)
}

err = tx.db.InsertTimelineDeploymentCreatedEvent(ctx, dalsql.InsertTimelineDeploymentCreatedEventParams{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
Payload: payload,
timeline.Publish(ctx, timeline.DeploymentCreated{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
MinReplicas: minReplicas,
ReplacedDeployment: replacedDeploymentKey,
})
if err != nil {
return fmt.Errorf("replace deployment failed to create event: %w", libdal.TranslatePGError(err))
}

return nil
}

Expand Down
12 changes: 7 additions & 5 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dal
import (
"bytes"
"context"
"net/http"
"sync"
"testing"
"time"
Expand All @@ -12,7 +13,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"

dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
Expand All @@ -21,18 +22,20 @@ import (
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/sha256"
)

func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
timelineClient := timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080")
ctx = rpc.ContextWithClient(ctx, timelineClient)
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, artefacts.NewForTesting())

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down Expand Up @@ -191,8 +194,7 @@ func TestCreateArtefactConflict(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

dal := New(ctx, conn, encryption, pubSub, artefacts.NewForTesting())

Expand Down
Loading

0 comments on commit 72efa87

Please sign in to comment.