Skip to content

Commit

Permalink
lint/small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
worstell committed Dec 5, 2024
1 parent 7bf6535 commit 998e046
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 7 deletions.
5 changes: 5 additions & 0 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dal
import (
"bytes"
"context"
"net/http"
"sync"
"testing"
"time"
Expand All @@ -12,6 +13,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"

dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
Expand All @@ -20,12 +22,15 @@ import (
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/sha256"
)

func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
timelineClient := timelinev1connect.NewTimelineServiceClient(http.DefaultClient, "http://localhost:8080")
ctx = rpc.ContextWithClient(ctx, timelineClient)
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion backend/timeline/events_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (a AsyncExecute) ToReq() (*timelinepb.CreateEventRequest, error) {
Timestamp: timestamppb.New(a.Time),
Error: a.Error.Ptr(),
Duration: durationpb.New(time.Since(a.Time)),
VerbRef: (&a.Verb).ToProto().(*schemapb.Ref), //nolint:forceassert
VerbRef: (&a.Verb).ToProto().(*schemapb.Ref), //nolint:forcetypeassert
AsyncEventType: asyncExecuteEventTypeToProto(a.EventType),
},
},
Expand Down
4 changes: 2 additions & 2 deletions backend/timeline/events_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c Call) ToReq() (*timelinepb.CreateEventRequest, error) {
}
var sourceVerb *schemapb.Ref
if len(c.Callers) > 0 {
sourceVerb = c.Callers[0].ToProto().(*schemapb.Ref) //nolint:forceassert
sourceVerb = c.Callers[0].ToProto().(*schemapb.Ref) //nolint:forcetypeassert
}

return &timelinepb.CreateEventRequest{
Expand All @@ -58,7 +58,7 @@ func (c Call) ToReq() (*timelinepb.CreateEventRequest, error) {
Response: string(responseBody),
Error: respError,
SourceVerbRef: sourceVerb,
DestinationVerbRef: c.DestVerb.ToProto().(*schemapb.Ref), //nolint:forceassert
DestinationVerbRef: c.DestVerb.ToProto().(*schemapb.Ref), //nolint:forcetypeassert
Duration: durationpb.New(time.Since(c.StartTime)),
Request: string(c.Request.GetBody()),
Stack: stack,
Expand Down
2 changes: 1 addition & 1 deletion backend/timeline/events_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (i Ingress) ToReq() (*timelinepb.CreateEventRequest, error) {
DeploymentKey: i.DeploymentKey.String(),
RequestKey: &requestKey,
Timestamp: timestamppb.New(i.StartTime),
VerbRef: i.Verb.ToProto().(*schemapb.Ref), //nolint:forceassert
VerbRef: i.Verb.ToProto().(*schemapb.Ref), //nolint:forcetypeassert
Method: i.RequestMethod,
Path: i.RequestPath,
StatusCode: int32(i.ResponseStatus),
Expand Down
2 changes: 1 addition & 1 deletion backend/timeline/events_pubsub_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (p PubSubPublish) ToReq() (*timelinepb.CreateEventRequest, error) {
PubsubPublish: &timelinepb.PubSubPublishEvent{
DeploymentKey: p.DeploymentKey.String(),
RequestKey: p.RequestKey.Ptr(),
VerbRef: (&p.SourceVerb).ToProto().(*schemapb.Ref), //nolint:forceassert
VerbRef: (&p.SourceVerb).ToProto().(*schemapb.Ref), //nolint:forcetypeassert
Timestamp: timestamppb.New(p.Time),
Duration: durationpb.New(time.Since(p.Time)),
Topic: p.Topic,
Expand Down
7 changes: 5 additions & 2 deletions backend/timeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func Start(ctx context.Context, config Config, schemaEventSource schemaeventsour
config.SetDefaults()

logger := log.FromContext(ctx).Scope("timeline")
svc := &service{}
svc := &service{
events: make([]*timelinepb.Event, 0),
nextID: 0,
}

logger.Debugf("Timeline service listening on: %s", config.Bind)
err := rpc.Serve(ctx, config.Bind,
Expand Down Expand Up @@ -134,7 +137,7 @@ func (s *service) GetTimeline(ctx context.Context, req *connect.Request[timeline
_, didNotMatchAFilter := slices.Find(filters, func(filter TimelineFilter) bool {
return !filter(event)
})
for didNotMatchAFilter {
if didNotMatchAFilter {
continue
}
results = append(results, s.events[i])
Expand Down
15 changes: 15 additions & 0 deletions internal/integration/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/TBD54566975/ftl/internal/dsn"
ftlexec "github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/log"
Expand Down Expand Up @@ -473,6 +474,20 @@ func VerifySchemaVerb(module string, verb string, check func(ctx context.Context
}
}

// VerifyTimeline lets you test the current timeline
func VerifyTimeline(filters []*timelinepb.GetTimelineRequest_Filter, check func(ctx context.Context, t testing.TB, events []*timelinepb.Event)) Action {
return func(t testing.TB, ic TestContext) {
resp, err := ic.Timeline.GetTimeline(ic, connect.NewRequest(&timelinepb.GetTimelineRequest{
Filters: filters,
}))
if err != nil {
t.Errorf("failed to get timeline: %v", err)
return
}
check(ic.Context, t, resp.Msg.Events)
}
}

// Fail expects the next action to Fail.
func Fail(next Action, msg string, args ...any) Action {
return func(t testing.TB, ic TestContext) {
Expand Down
17 changes: 17 additions & 0 deletions internal/integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
provisionerconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/provisioner/v1beta1/provisionerpbconnect"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/provisioner/scaling/k8sscaling"
Expand Down Expand Up @@ -301,6 +302,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
var console pbconsoleconnect.ConsoleServiceClient
var provisioner provisionerconnect.ProvisionerServiceClient
var schema ftlv1connect.SchemaServiceClient
var timeline timelinev1connect.TimelineServiceClient
if opts.startController {
Infof("Starting ftl cluster")

Expand All @@ -324,6 +326,9 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
}
ctx = startProcess(ctx, t, tmpDir, opts.devMode, args...)
}
if opts.startTimeline {
timeline = rpc.Dial(timelinev1connect.NewTimelineServiceClient, "http://localhost:8894", log.Debug)
}
if opts.startController || opts.kube {
controller = rpc.Dial(ftlv1connect.NewControllerServiceClient, "http://localhost:8892", log.Debug)
console = rpc.Dial(pbconsoleconnect.NewConsoleServiceClient, "http://localhost:8892", log.Debug)
Expand Down Expand Up @@ -353,10 +358,21 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
}
defer dumpKubePods(ctx, ic.kubeClient, ic.kubeNamespace)

if opts.startTimeline {
ic.Timeline = timeline

Infof("Waiting for timeline to be ready")
ic.AssertWithRetry(t, func(t testing.TB, ic TestContext) {
_, err := ic.Timeline.Ping(ic, connect.NewRequest(&ftlv1.PingRequest{}))
assert.NoError(t, err)
})
}

if opts.startController || opts.kube {
ic.Controller = controller
ic.Schema = schema
ic.Console = console
ic.Timeline = timeline

Infof("Waiting for controller to be ready")
ic.AssertWithRetry(t, func(t testing.TB, ic TestContext) {
Expand Down Expand Up @@ -436,6 +452,7 @@ type TestContext struct {
Schema ftlv1connect.SchemaServiceClient
Console pbconsoleconnect.ConsoleServiceClient
Verbs ftlv1connect.VerbServiceClient
Timeline timelinev1connect.TimelineServiceClient

realT *testing.T
}
Expand Down

0 comments on commit 998e046

Please sign in to comment.