Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 5, 2024
1 parent db7ca30 commit 27436cb
Show file tree
Hide file tree
Showing 49 changed files with 1,531 additions and 3,605 deletions.
396 changes: 94 additions & 302 deletions backend/controller/console/console.go

Large diffs are not rendered by default.

45 changes: 25 additions & 20 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/alecthomas/kong"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
"github.com/jackc/pgx/v5"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
Expand All @@ -44,16 +45,18 @@ import (
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
deploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1"
leaseconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
frontend "github.com/TBD54566975/ftl/frontend/console"
"github.com/TBD54566975/ftl/internal/configuration"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
Expand Down Expand Up @@ -155,7 +158,7 @@ func Start(
logger.Debugf("Advertising as %s", config.Advertise)

admin := admin.NewAdminService(cm, sm, svc.dal)
console := console.NewService(svc.dal, svc.timeline, admin)
console := console.NewService(svc.dal, admin)

g, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -205,7 +208,6 @@ type Service struct {
tasks *scheduledtask.Scheduler
pubSub *pubsub.Service
registry artefacts.Service
timeline *timeline.Service
controllerListListeners []ControllerListListener

// Map from runnerKey.String() to client.
Expand Down Expand Up @@ -270,13 +272,11 @@ func New(
}
svc.registry = storage

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc), timelineSvc)
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub
svc.dal = dal.New(ctx, conn, encryption, pubSub, svc.registry)

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)
svc.deploymentLogsSink = newDeploymentLogsSink(ctx)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that it its next
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

s.timeline.EnqueueEvent(ctx, &timeline.Log{
timeline.Publish(ctx, timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Time: msg.TimeStamp.AsTime(),
Expand Down Expand Up @@ -844,7 +844,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl
module := req.Msg.Topic.Module
route, ok := sstate.routes[module]
if ok {
s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{
timeline.Publish(ctx, timeline.PubSubPublish{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
Time: now,
Expand Down Expand Up @@ -956,16 +956,16 @@ func (s *Service) callWithRequest(
if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
s.timeline.EnqueueEvent(ctx, callEvent)
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
}

err = validateCallBody(req.Msg.Body, verb, sch)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body"))
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
s.timeline.EnqueueEvent(ctx, callEvent)
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.Publish(ctx, callEvent)
return nil, err
}

Expand All @@ -982,15 +982,15 @@ func (s *Service) callWithRequest(
var resp *connect.Response[ftlv1.CallResponse]
if err == nil {
resp = connect.NewResponse(response.Msg)
callEvent.Response = either.LeftOf[error](resp.Msg)
callEvent.Response = result.Ok(resp.Msg)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
} else {
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
logger.Errorf(err, "Call failed to verb %s for deployment %s", verbRef.String(), route.Deployment)
}

s.timeline.EnqueueEvent(ctx, callEvent)
timeline.Publish(ctx, callEvent)
return resp, err
}

Expand Down Expand Up @@ -1197,7 +1197,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if e, ok := err.Get(); ok {
errStr = optional.Some(e.Error())
}
s.timeline.EnqueueEvent(ctx, &timeline.AsyncExecute{
timeline.Publish(ctx, timeline.AsyncExecute{
DeploymentKey: route.Deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Expand Down Expand Up @@ -1718,12 +1718,17 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
return time.Hour, nil
}

removed, err := s.timeline.DeleteOldEvents(ctx, timeline.EventTypeCall, *s.config.EventLogRetention)
// TODO: move this to timeline service completely?
client := rpc.ClientFromContext[timelinev1connect.TimelineServiceClient](ctx)
resp, err := client.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{
EventType: timelinepb.EventType_EVENT_TYPE_CALL,
AgeSeconds: int64(s.config.EventLogRetention.Seconds()),
}))
if err != nil {
return 0, fmt.Errorf("failed to prune call events: %w", err)
}
if removed > 0 {
logger.Debugf("Pruned %d call events older than %s", removed, s.config.EventLogRetention)
if resp.Msg.DeletedCount > 0 {
logger.Debugf("Pruned %d call events older than %s", resp.Msg.DeletedCount, s.config.EventLogRetention)
}

// Prune every 5% of the retention period.
Expand Down
4 changes: 1 addition & 3 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -24,8 +23,7 @@ func TestNoCallToAcquire(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSvc := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSvc)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
Expand Down
41 changes: 11 additions & 30 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/leases/dbleaser"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -373,21 +373,11 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return libdal.TranslatePGError(err)
}
}
var payload api.EncryptedTimelineColumn
err = d.encryption.EncryptJSON(map[string]interface{}{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
}, &payload)
if err != nil {
return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err)
}
err = tx.db.InsertTimelineDeploymentUpdatedEvent(ctx, dalsql.InsertTimelineDeploymentUpdatedEventParams{
DeploymentKey: key,
Payload: payload,
timeline.Publish(ctx, timeline.DeploymentUpdated{
DeploymentKey: key,
MinReplicas: minReplicas,
PrevMinReplicas: int(deployment.MinReplicas),
})
if err != nil {
return libdal.TranslatePGError(err)
}

return nil
}
Expand Down Expand Up @@ -443,25 +433,16 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

var payload api.EncryptedTimelineColumn
err = d.encryption.EncryptJSON(map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
}, &payload)
if err != nil {
return fmt.Errorf("replace deployment failed to encrypt payload: %w", err)
}

err = tx.db.InsertTimelineDeploymentCreatedEvent(ctx, dalsql.InsertTimelineDeploymentCreatedEventParams{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
Payload: payload,
timeline.Publish(ctx, timeline.DeploymentCreated{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
MinReplicas: minReplicas,
ReplacedDeployment: replacedDeploymentKey,
})
if err != nil {
return fmt.Errorf("replace deployment failed to create event: %w", libdal.TranslatePGError(err))
}

return nil
}

Expand Down
7 changes: 2 additions & 5 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/timeline"

dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
Expand All @@ -31,8 +30,7 @@ func TestDAL(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, artefacts.NewForTesting())

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down Expand Up @@ -191,8 +189,7 @@ func TestCreateArtefactConflict(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

dal := New(ctx, conn, encryption, pubSub, artefacts.NewForTesting())

Expand Down
89 changes: 0 additions & 89 deletions backend/controller/dal/internal/sql/deployment_queries.sql.go

This file was deleted.

2 changes: 0 additions & 2 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 27436cb

Please sign in to comment.