Skip to content

Commit

Permalink
chore: publish timeline events from cron + integration tests (#3667)
Browse files Browse the repository at this point in the history
fixes #3634
fixes #3654
  • Loading branch information
worstell authored Dec 8, 2024
1 parent 0b5f9cb commit 19278b5
Show file tree
Hide file tree
Showing 42 changed files with 2,497 additions and 6 deletions.
12 changes: 10 additions & 2 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/TBD54566975/ftl/backend/cron/observability"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -31,6 +32,7 @@ type cronJob struct {

type Config struct {
SchemaServiceEndpoint *url.URL `name:"ftl-endpoint" help:"Schema Service endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"`
}

func (c cronJob) String() string {
Expand All @@ -53,7 +55,7 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, clien
logger.Debugf("Starting cron service")

for {
next, ok := scheduleNext(cronQueue)
next, ok := scheduleNext(ctx, cronQueue)
var nextCh <-chan time.Time
if ok {
logger.Debugf("Next cron job scheduled in %s", next)
Expand Down Expand Up @@ -126,10 +128,16 @@ func callCronJob(ctx context.Context, verbClient routing.CallClient, cronJob cro
}
}

func scheduleNext(cronQueue []cronJob) (time.Duration, bool) {
func scheduleNext(ctx context.Context, cronQueue []cronJob) (time.Duration, bool) {
if len(cronQueue) == 0 {
return 0, false
}
timeline.Publish(ctx, timeline.CronScheduled{
DeploymentKey: model.NewDeploymentKey(cronQueue[0].module),
Verb: schema.Ref{Module: cronQueue[0].module, Name: cronQueue[0].verb.Name},
ScheduledAt: cronQueue[0].next,
Schedule: cronQueue[0].pattern.String(),
})
return time.Until(cronQueue[0].next), true
}

Expand Down
4 changes: 4 additions & 0 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cron

import (
"context"
"net/http"
"os"
"sort"
"testing"
Expand All @@ -14,10 +15,12 @@ import (
"github.com/alecthomas/types/optional"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)
Expand Down Expand Up @@ -62,6 +65,7 @@ func TestCron(t *testing.T) {
})

ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
ctx = rpc.ContextWithClient(ctx, timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080"))
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
t.Cleanup(cancel)

Expand Down
1 change: 1 addition & 0 deletions backend/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
_, err = w.Write(responseBody)
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
timeline.Publish(r.Context(), ingressEvent)
} else {
logger.Errorf(err, "could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
Expand Down
3 changes: 3 additions & 0 deletions backend/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/go-runtime/encoding"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
)

Expand Down Expand Up @@ -66,6 +68,7 @@ func TestIngress(t *testing.T) {
}

ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx = rpc.ContextWithClient(ctx, timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080"))
assert.NoError(t, err)

for _, test := range []struct {
Expand Down
42 changes: 42 additions & 0 deletions backend/timeline/events_cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package timeline

import (
"time"

"github.com/alecthomas/types/optional"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
)

type CronScheduled struct {
DeploymentKey model.DeploymentKey
Verb schema.Ref
Time time.Time
ScheduledAt time.Time
Schedule string
Error optional.Option[string]
}

var _ Event = CronScheduled{}

func (e CronScheduled) clientEvent() {}
func (e CronScheduled) ToReq() (*timelinepb.CreateEventRequest, error) {
return &timelinepb.CreateEventRequest{
Entry: &timelinepb.CreateEventRequest_CronScheduled{
CronScheduled: &timelinepb.CronScheduledEvent{
DeploymentKey: e.DeploymentKey.String(),
VerbRef: (&e.Verb).ToProto().(*schemapb.Ref), //nolint:forcetypeassert
Timestamp: timestamppb.New(e.Time),
ScheduledAt: timestamppb.New(e.ScheduledAt),
Schedule: e.Schedule,
Error: e.Error.Ptr(),
Duration: durationpb.New(time.Since(e.Time)),
},
},
}, nil
}
3 changes: 3 additions & 0 deletions backend/timeline/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func filtersFromRequest(req *timelinepb.GetTimelineRequest) (outFilters []Timeli
for _, filter := range req.Filters {
reqFiltersByType[reflect.TypeOf(filter.Filter)] = append(reqFiltersByType[reflect.TypeOf(filter.Filter)], filter)
}
if len(reqFiltersByType) == 0 {
return outFilters, ascending
}
for _, filters := range reqFiltersByType {
switch filters[0].Filter.(type) {
case *timelinepb.GetTimelineRequest_Filter_LogLevel:
Expand Down
142 changes: 142 additions & 0 deletions backend/timeline/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//go:build integration

package timeline

import (
"context"
"net/http"
"testing"

"github.com/alecthomas/assert/v2"

timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
in "github.com/TBD54566975/ftl/internal/integration"
)

func TestTimeline(t *testing.T) {
in.Run(t,
in.WithLanguages("go"),
in.CopyModule("cron"),
in.CopyModule("time"),
in.CopyModule("echo"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.CopyModule("ingress"),
in.Deploy("cron"),
in.Deploy("time"),
in.Deploy("echo"),
in.Deploy("publisher"),
in.Deploy("subscriber"),
in.Deploy("ingress"),

// Trigger events
in.HttpCall(http.MethodGet, "/users/123/posts/456", nil, nil, func(t testing.TB, resp *in.HTTPResponse) {}),
in.Call("echo", "echo", in.Obj{}, func(t testing.TB, response in.Obj) {}),
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),

in.SubTests(
in.SubTest{Name: "Limit", Action: in.VerifyTimeline(1, []*timelinepb.GetTimelineRequest_Filter{}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.Equal(t, 1, len(events))
})},
in.SubTest{Name: "IngressEvent", Action: in.VerifyTimeline(1000, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_INGRESS},
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.Equal(t, 1, len(events))
ingress, ok := events[0].Entry.(*timelinepb.Event_Ingress)
assert.True(t, ok, "expected ingress event")

assert.Equal(t, ingress.Ingress.VerbRef.Module, "ingress")
assert.Equal(t, ingress.Ingress.VerbRef.Name, "get")
})},
in.SubTest{Name: "CallEvent", Action: in.VerifyTimeline(1000, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_CALL},
},
},
},
{
Filter: &timelinepb.GetTimelineRequest_Filter_Call{
Call: &timelinepb.GetTimelineRequest_CallFilter{
DestModule: "echo",
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.Equal(t, 1, len(events))
call, ok := events[0].Entry.(*timelinepb.Event_Call)
assert.True(t, ok, "expected call event")

assert.Equal(t, call.Call.DestinationVerbRef.Module, "echo")
assert.Equal(t, call.Call.DestinationVerbRef.Name, "echo")
assert.Contains(t, call.Call.Response, "Hello, world!!!")
})},
in.SubTest{Name: "CronEvent", Action: in.VerifyTimeline(1, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_CRON_SCHEDULED},
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
scheduled, ok := events[0].Entry.(*timelinepb.Event_CronScheduled)
assert.True(t, ok, "expected scheduled event")

assert.Equal(t, scheduled.CronScheduled.VerbRef.Module, "cron")
assert.Equal(t, scheduled.CronScheduled.VerbRef.Name, "job")
})},
in.SubTest{Name: "PublishEvent", Action: in.VerifyTimeline(1000, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_PUBSUB_PUBLISH},
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.Equal(t, 1, len(events))
publish, ok := events[0].Entry.(*timelinepb.Event_PubsubPublish)
assert.True(t, ok, "expected publish event")

assert.Equal(t, publish.PubsubPublish.Topic, "testTopic")
assert.Equal(t, publish.PubsubPublish.VerbRef.Module, "publisher")
assert.Equal(t, publish.PubsubPublish.VerbRef.Name, "publish")
})},
in.SubTest{Name: "ConsumeEvent", Action: in.VerifyTimeline(1000, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_PUBSUB_CONSUME},
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.Equal(t, 1, len(events))
consume, ok := events[0].Entry.(*timelinepb.Event_PubsubConsume)
assert.True(t, ok, "expected consume event")

assert.Equal(t, *consume.PubsubConsume.DestVerbModule, "subscriber")
assert.Equal(t, *consume.PubsubConsume.DestVerbName, "consume")
})},
in.SubTest{Name: "DeleteOldEvents", Action: in.DeleteOldTimelineEvents(1, timelinepb.EventType_EVENT_TYPE_INGRESS,
func(ctx context.Context, t testing.TB, expectDeleted int, events []*timelinepb.Event) {
assert.Equal(t, expectDeleted, 1)
for _, event := range events {
_, ok := event.Entry.(*timelinepb.Event_Ingress)
if ok {
t.Errorf("expected no ingress events, got %v", event.Entry)
}
}
}),
},
),
)
}
7 changes: 7 additions & 0 deletions backend/timeline/testdata/go/cron/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# FTL cron example

Cron jobs will be scheduled on deploy. Expect:
```
info:cron: Frequent cron job triggered.
info:cron: Hourly cron job triggered.
```
13 changes: 13 additions & 0 deletions backend/timeline/testdata/go/cron/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cron

import (
"context"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)

//ftl:cron 2s
func Job(ctx context.Context) error {
ftl.LoggerFromContext(ctx).Infof("Frequent cron job triggered.")
return nil
}
2 changes: 2 additions & 0 deletions backend/timeline/testdata/go/cron/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "cron"
language = "go"
50 changes: 50 additions & 0 deletions backend/timeline/testdata/go/cron/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module ftl/cron

go 1.23.0

replace github.com/TBD54566975/ftl => ../../../../..

require github.com/TBD54566975/ftl v0.0.0-00010101000000-000000000000

require (
al.essio.dev/pkg/shellescape v1.5.1 // indirect
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.17.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/danieljoos/wincred v1.2.2 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/swaggest/jsonschema-go v0.3.72 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.6 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/sync v0.9.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.20.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
)
Loading

0 comments on commit 19278b5

Please sign in to comment.