Skip to content

Commit

Permalink
feat: stream events to timeline service in batches (#3683)
Browse files Browse the repository at this point in the history
- Each timeline client now batches events to publish to timeline service
- Timeline service no longer guarantees that events are in time order.
- Steaming fixes:
    - Streaming now uses event id for paging
- Streaming now works when order is set to descending (each batch is
identical to ascending now, but in reverse order)
- Added integration test to ensure streaming works with the paging logic

closes [Rate limit and batch new events when publishing to timeline
#3671](#3671)

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
matt2e and github-actions[bot] authored Dec 9, 2024
1 parent f5faf04 commit 4c88c60
Show file tree
Hide file tree
Showing 36 changed files with 958 additions and 588 deletions.
8 changes: 4 additions & 4 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/buildengine"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/slices"
)
Expand Down Expand Up @@ -549,7 +549,7 @@ func buildGraph(sch *schema.Schema, module *schema.Module, out map[string][]stri
}

func (c *ConsoleService) GetTimeline(ctx context.Context, req *connect.Request[timelinepb.GetTimelineRequest]) (*connect.Response[timelinepb.GetTimelineResponse], error) {
client := rpc.ClientFromContext[timelinev1connect.TimelineServiceClient](ctx)
client := timeline.ClientFromContext(ctx)
resp, err := client.GetTimeline(ctx, connect.NewRequest(req.Msg))
if err != nil {
return nil, fmt.Errorf("failed to get timeline from service: %w", err)
Expand All @@ -558,7 +558,7 @@ func (c *ConsoleService) GetTimeline(ctx context.Context, req *connect.Request[t
}

func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Request[timelinepb.StreamTimelineRequest], out *connect.ServerStream[timelinepb.StreamTimelineResponse]) error {
client := rpc.ClientFromContext[timelinev1connect.TimelineServiceClient](ctx)
client := timeline.ClientFromContext(ctx)
stream, err := client.StreamTimeline(ctx, req)
if err != nil {
return fmt.Errorf("failed to stream timeline from service: %w", err)
Expand All @@ -576,7 +576,7 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
}
return nil
}
func (c *ConsoleService) CreateEvent(ctx context.Context, req *connect.Request[timelinepb.CreateEventRequest]) (*connect.Response[timelinepb.CreateEventResponse], error) {
func (c *ConsoleService) CreateEvents(ctx context.Context, req *connect.Request[timelinepb.CreateEventsRequest]) (*connect.Response[timelinepb.CreateEventsResponse], error) {
return nil, fmt.Errorf("not implemented")
}

Expand Down
14 changes: 7 additions & 7 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

timeline.Publish(ctx, timeline.Log{
timeline.ClientFromContext(ctx).Publish(ctx, timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Time: msg.TimeStamp.AsTime(),
Expand Down Expand Up @@ -828,7 +828,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl
routes := s.routeTable.Current()
route, ok := routes.GetDeployment(module).Get()
if ok {
timeline.Publish(ctx, timeline.PubSubPublish{
timeline.ClientFromContext(ctx).Publish(ctx, timeline.PubSubPublish{
DeploymentKey: route,
RequestKey: requestKey,
Time: now,
Expand Down Expand Up @@ -942,15 +942,15 @@ func (s *Service) callWithRequest(
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.Publish(ctx, callEvent)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
}

err = validateCallBody(req.Msg.Body, verb, sch)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body"))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.Publish(ctx, callEvent)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
return nil, err
}

Expand All @@ -975,7 +975,7 @@ func (s *Service) callWithRequest(
logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module)
}

timeline.Publish(ctx, callEvent)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
return resp, err
}

Expand Down Expand Up @@ -1187,7 +1187,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if e, ok := err.Get(); ok {
errStr = optional.Some(e.Error())
}
timeline.Publish(ctx, timeline.AsyncExecute{
timeline.ClientFromContext(ctx).Publish(ctx, timeline.AsyncExecute{
DeploymentKey: deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Expand Down Expand Up @@ -1609,7 +1609,7 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
return time.Hour, nil
}

client := rpc.ClientFromContext[timelinev1connect.TimelineServiceClient](ctx)
client := timeline.ClientFromContext(ctx)
resp, err := client.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{
EventType: timelinepb.EventType_EVENT_TYPE_CALL,
AgeSeconds: int64(s.config.EventLogRetention.Seconds()),
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return libdal.TranslatePGError(err)
}
}
timeline.Publish(ctx, timeline.DeploymentUpdated{
timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentUpdated{
DeploymentKey: key,
MinReplicas: minReplicas,
PrevMinReplicas: int(deployment.MinReplicas),
Expand Down Expand Up @@ -272,7 +272,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

timeline.Publish(ctx, timeline.DeploymentCreated{
timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentCreated{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
Expand Down
11 changes: 5 additions & 6 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package dal
import (
"bytes"
"context"
"net/http"
"net/url"
"sync"
"testing"
"time"
Expand All @@ -13,23 +13,23 @@ import (
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
"github.com/TBD54566975/ftl/backend/timeline"

dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/sha256"
)

func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
timelineClient := timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080")
ctx = rpc.ContextWithClient(ctx, timelineClient)
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)
ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint))
conn := sqltest.OpenForTesting(ctx, t)

pubSub := pubsub.New(ctx, conn, optional.None[pubsub.AsyncCallListener]())
Expand All @@ -38,7 +38,6 @@ func TestDAL(t *testing.T) {
var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
var testSHA = sha256.Sum(testContent)

var err error
deploymentChangesCh := dal.DeploymentChanges.Subscribe(nil)
deploymentChanges := []DeploymentNotification{}
wg := errgroup.Group{}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
errorStr = optional.Some(entry.Error.Error())
}

timeline.Publish(ctx, &timeline.Log{
timeline.ClientFromContext(ctx).Publish(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/pubsub/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
for _, subscription := range subs {
now := time.Now().UTC()
enqueueTimelineEvent := func(destVerb optional.Option[schema.RefKey], err optional.Option[string]) {
timeline.Publish(ctx, &timeline.PubSubConsume{
timeline.ClientFromContext(ctx).Publish(ctx, &timeline.PubSubConsume{
DeploymentKey: subscription.DeploymentKey,
RequestKey: subscription.RequestKey,
Time: now,
Expand Down
2 changes: 1 addition & 1 deletion backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func scheduleNext(ctx context.Context, cronQueue []cronJob) (time.Duration, bool
if len(cronQueue) == 0 {
return 0, false
}
timeline.Publish(ctx, timeline.CronScheduled{
timeline.ClientFromContext(ctx).Publish(ctx, timeline.CronScheduled{
DeploymentKey: model.NewDeploymentKey(cronQueue[0].module),
Verb: schema.Ref{Module: cronQueue[0].module, Name: cronQueue[0].verb.Name},
ScheduledAt: cronQueue[0].next,
Expand Down
11 changes: 6 additions & 5 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cron

import (
"context"
"net/http"
"net/url"
"os"
"sort"
"testing"
Expand All @@ -15,12 +15,11 @@ import (
"github.com/alecthomas/types/optional"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)
Expand Down Expand Up @@ -65,7 +64,9 @@ func TestCron(t *testing.T) {
})

ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
ctx = rpc.ContextWithClient(ctx, timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080"))
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)
ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint))
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
t.Cleanup(cancel)

Expand Down Expand Up @@ -112,6 +113,6 @@ done:
},
}, requests, assert.Exclude[*schemapb.Position]())

err := wg.Wait()
err = wg.Wait()
assert.IsError(t, err, context.Canceled)
}
4 changes: 2 additions & 2 deletions backend/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
_, err = w.Write(responseBody)
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
timeline.Publish(r.Context(), ingressEvent)
timeline.ClientFromContext(r.Context()).Publish(r.Context(), ingressEvent)
} else {
logger.Errorf(err, "could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
Expand All @@ -166,7 +166,7 @@ func recordIngressErrorEvent(
) {
ingressEvent.ResponseStatus = statusCode
ingressEvent.Error = optional.Some(errorMsg)
timeline.Publish(ctx, ingressEvent)
timeline.ClientFromContext(ctx).Publish(ctx, ingressEvent)
}

// Copied from the Apache-licensed connect-go source.
Expand Down
7 changes: 4 additions & 3 deletions backend/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ import (
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/go-runtime/encoding"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
)

Expand Down Expand Up @@ -68,7 +67,9 @@ func TestIngress(t *testing.T) {
}

ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx = rpc.ContextWithClient(ctx, timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080"))
timelineEndpoint, err := url.Parse("http://localhost:8080")
assert.NoError(t, err)
ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint))
assert.NoError(t, err)

for _, test := range []struct {
Expand Down
Loading

0 comments on commit 4c88c60

Please sign in to comment.