Skip to content

Commit

Permalink
fix: inject timeline client instead of using ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Dec 11, 2024
1 parent b05fa0a commit a373bbf
Show file tree
Hide file tree
Showing 22 changed files with 113 additions and 106 deletions.
6 changes: 2 additions & 4 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,17 +564,15 @@ func buildGraph(sch *schema.Schema, module *schema.Module, out map[string][]stri
}

func (s *service) GetTimeline(ctx context.Context, req *connect.Request[timelinepb.GetTimelineRequest]) (*connect.Response[timelinepb.GetTimelineResponse], error) {
client := timeline.ClientFromContext(ctx)
resp, err := client.GetTimeline(ctx, connect.NewRequest(req.Msg))
resp, err := s.timelineClient.GetTimeline(ctx, connect.NewRequest(req.Msg))
if err != nil {
return nil, fmt.Errorf("failed to get timeline from service: %w", err)
}
return connect.NewResponse(resp.Msg), nil
}

func (s *service) StreamTimeline(ctx context.Context, req *connect.Request[timelinepb.StreamTimelineRequest], out *connect.ServerStream[timelinepb.StreamTimelineResponse]) error {
client := timeline.ClientFromContext(ctx)
stream, err := client.StreamTimeline(ctx, req)
stream, err := s.timelineClient.StreamTimeline(ctx, connect.NewRequest(req.Msg))
if err != nil {
return fmt.Errorf("failed to stream timeline from service: %w", err)
}
Expand Down
30 changes: 17 additions & 13 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func Start(
storage *artefacts.OCIArtefactService,
cm *cf.Manager[configuration.Configuration],
sm *cf.Manager[configuration.Secrets],
timelineClient *timeline.Client,
conn *sql.DB,
devel bool,
) error {
Expand All @@ -115,7 +116,7 @@ func Start(
logger := log.FromContext(ctx)
logger.Debugf("Starting FTL controller")

svc, err := New(ctx, conn, cm, sm, storage, config, devel)
svc, err := New(ctx, conn, cm, sm, timelineClient, storage, config, devel)
if err != nil {
return err
}
Expand Down Expand Up @@ -156,9 +157,10 @@ type Service struct {
cm *cf.Manager[configuration.Configuration]
sm *cf.Manager[configuration.Secrets]

tasks *scheduledtask.Scheduler
pubSub *pubsub.Service
storage *artefacts.OCIArtefactService
tasks *scheduledtask.Scheduler
pubSub *pubsub.Service
timelineClient *timeline.Client
storage *artefacts.OCIArtefactService

// Map from runnerKey.String() to client.
clients *ttlcache.Cache[string, clients]
Expand All @@ -175,6 +177,7 @@ func New(
conn *sql.DB,
cm *cf.Manager[configuration.Configuration],
sm *cf.Manager[configuration.Secrets],
timelineClient *timeline.Client,
storage *artefacts.OCIArtefactService,
config Config,
devel bool,
Expand All @@ -200,6 +203,7 @@ func New(
cm: cm,
sm: sm,
tasks: scheduler,
timelineClient: timelineClient,
leaser: ldb,
key: key,
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
Expand All @@ -209,10 +213,10 @@ func New(
controllerState: state.NewInMemoryState(),
}

pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState)
pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState, timelineClient)
svc.pubSub = pubSub

svc.deploymentLogsSink = newDeploymentLogsSink(ctx)
svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineClient)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that it its next
Expand Down Expand Up @@ -359,7 +363,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

timeline.ClientFromContext(ctx).Publish(ctx, timeline.Log{
s.timelineClient.Publish(ctx, timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Time: msg.TimeStamp.AsTime(),
Expand Down Expand Up @@ -459,7 +463,7 @@ func (s *Service) setDeploymentReplicas(ctx context.Context, key model.Deploymen
return fmt.Errorf("could not activate deployment: %w", err)
}
}
timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentUpdated{
s.timelineClient.Publish(ctx, timeline.DeploymentUpdated{
DeploymentKey: key,
MinReplicas: minReplicas,
PrevMinReplicas: deployment.MinReplicas,
Expand Down Expand Up @@ -519,7 +523,7 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
}
}

timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentCreated{
s.timelineClient.Publish(ctx, timeline.DeploymentCreated{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.Module,
Expand Down Expand Up @@ -901,23 +905,23 @@ func (s *Service) callWithRequest(
err = fmt.Errorf("no routes for module %q", module)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
s.timelineClient.Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodeNotFound, err)
}

if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
s.timelineClient.Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
}

err = validateCallBody(req.Msg.Body, verb, sch)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body"))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
s.timelineClient.Publish(ctx, callEvent)
return nil, err
}

Expand All @@ -942,7 +946,7 @@ func (s *Service) callWithRequest(
logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module)
}

timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
s.timelineClient.Publish(ctx, callEvent)
return resp, err
}

Expand Down
8 changes: 4 additions & 4 deletions backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (

var _ log.Sink = (*deploymentLogsSink)(nil)

func newDeploymentLogsSink(ctx context.Context) *deploymentLogsSink {
func newDeploymentLogsSink(ctx context.Context, timelineClient *timeline.Client) *deploymentLogsSink {
sink := &deploymentLogsSink{
logQueue: make(chan log.Entry, 10000),
}

// Process logs in background
go sink.processLogs(ctx)
go sink.processLogs(ctx, timelineClient)

return sink
}
Expand All @@ -40,7 +40,7 @@ func (d *deploymentLogsSink) Log(entry log.Entry) error {
return nil
}

func (d *deploymentLogsSink) processLogs(ctx context.Context) {
func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *timeline.Client) {
for {
select {
case entry := <-d.logQueue:
Expand Down Expand Up @@ -69,7 +69,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
errorStr = optional.Some(entry.Error.Error())
}

timeline.ClientFromContext(ctx).Publish(ctx, &timeline.Log{
timelineClient.Publish(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand Down
11 changes: 9 additions & 2 deletions backend/controller/pubsub/internal/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package dal

import (
"context"
"net/url"
"testing"

"github.com/alecthomas/assert/v2"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
Expand All @@ -18,9 +20,14 @@ func TestNoCallToAcquire(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)

dal := New(conn)
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)

_, _, err := dal.AcquireAsyncCall(ctx)
timelineClient := timeline.NewClient(ctx, timelineEndpoint)

dal := New(conn, timelineClient)

_, _, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, libdal.ErrNotFound)
assert.EqualError(t, err, "no pending async calls: not found")
}
Expand Down
10 changes: 6 additions & 4 deletions backend/controller/pubsub/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ import (

type DAL struct {
*libdal.Handle[DAL]
db dalsql.Querier
db dalsql.Querier
timelineClient *timeline.Client
}

func New(conn libdal.Connection) *DAL {
func New(conn libdal.Connection, timelineClient *timeline.Client) *DAL {
return &DAL{
Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL {
return &DAL{
Handle: h,
db: dalsql.New(h.Connection),
}
}),
db: dalsql.New(conn),
db: dalsql.New(conn),
timelineClient: timelineClient,
}
}

Expand Down Expand Up @@ -108,7 +110,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
for _, subscription := range subs {
now := time.Now().UTC()
enqueueTimelineEvent := func(destVerb optional.Option[schema.RefKey], err optional.Option[string]) {
timeline.ClientFromContext(ctx).Publish(ctx, &timeline.PubSubConsume{
d.timelineClient.Publish(ctx, &timeline.PubSubConsume{
DeploymentKey: subscription.DeploymentKey,
RequestKey: subscription.RequestKey,
Time: now,
Expand Down
12 changes: 7 additions & 5 deletions backend/controller/pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ type Service struct {
verbRouting *routing.VerbCallRouter
asyncCallsLock sync.Mutex
controllerState state.ControllerState
timelineClient *timeline.Client
}

func New(ctx context.Context, conn libdal.Connection, rt *routing.RouteTable, controllerState state.ControllerState) *Service {
func New(ctx context.Context, conn libdal.Connection, rt *routing.RouteTable, controllerState state.ControllerState, timelineClient *timeline.Client) *Service {
m := &Service{
dal: dal.New(conn),
dal: dal.New(conn, timelineClient),
eventPublished: make(chan struct{}),
routeTable: rt,
verbRouting: routing.NewVerbRouterFromTable(ctx, rt),
verbRouting: routing.NewVerbRouterFromTable(ctx, rt, timelineClient),
controllerState: controllerState,
timelineClient: timelineClient,
}
go m.watchEventStream(ctx)
go m.poll(ctx)
Expand Down Expand Up @@ -196,7 +198,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlpubs
routes := s.routeTable.Current()
route, ok := routes.GetDeployment(module).Get()
if ok {
timeline.ClientFromContext(ctx).Publish(ctx, timeline.PubSubPublish{
s.timelineClient.Publish(ctx, timeline.PubSubPublish{
DeploymentKey: route,
RequestKey: requestKey,
Time: now,
Expand Down Expand Up @@ -266,7 +268,7 @@ func (s *Service) ExecuteAsyncCalls(ctx context.Context) (interval time.Duration
if e, ok := err.Get(); ok {
errStr = optional.Some(e.Error())
}
timeline.ClientFromContext(ctx).Publish(ctx, timeline.AsyncExecute{
s.timelineClient.Publish(ctx, timeline.AsyncExecute{
DeploymentKey: deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Expand Down
8 changes: 4 additions & 4 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (c cronJob) String() string {
}

// Start the cron service. Blocks until the context is cancelled.
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient) error {
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timeline.Client) error {
logger := log.FromContext(ctx).Scope("cron")
ctx = log.ContextWithLogger(ctx, logger)
// Map of cron jobs for each module.
Expand All @@ -55,7 +55,7 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, clien
logger.Debugf("Starting cron service")

for {
next, ok := scheduleNext(ctx, cronQueue)
next, ok := scheduleNext(ctx, cronQueue, timelineClient)
var nextCh <-chan time.Time
if ok {
logger.Debugf("Next cron job scheduled in %s", next)
Expand Down Expand Up @@ -128,11 +128,11 @@ func callCronJob(ctx context.Context, verbClient routing.CallClient, cronJob cro
}
}

func scheduleNext(ctx context.Context, cronQueue []cronJob) (time.Duration, bool) {
func scheduleNext(ctx context.Context, cronQueue []cronJob, timelineClient *timeline.Client) (time.Duration, bool) {
if len(cronQueue) == 0 {
return 0, false
}
timeline.ClientFromContext(ctx).Publish(ctx, timeline.CronScheduled{
timelineClient.Publish(ctx, timeline.CronScheduled{
DeploymentKey: model.NewDeploymentKey(cronQueue[0].module),
Verb: schema.Ref{Module: cronQueue[0].module, Name: cronQueue[0].verb.Name},
ScheduledAt: cronQueue[0].next,
Expand Down
5 changes: 3 additions & 2 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func TestCron(t *testing.T) {
ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)
ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint))

timelineClient := timeline.NewClient(ctx, timelineEndpoint)
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
t.Cleanup(cancel)

Expand All @@ -77,7 +78,7 @@ func TestCron(t *testing.T) {
requests: requestsch,
}

wg.Go(func() error { return Start(ctx, eventSource, client) })
wg.Go(func() error { return Start(ctx, eventSource, client, timelineClient) })

requests := make([]*ftlv1.CallRequest, 0, 2)

Expand Down
Loading

0 comments on commit a373bbf

Please sign in to comment.