Skip to content

Commit

Permalink
fill out more details
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 5, 2024
1 parent 411a38d commit 6d039d1
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 596 deletions.
94 changes: 27 additions & 67 deletions backend/timeline/events_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,15 @@ package timeline
import (
"time"

"github.com/alecthomas/types/optional"
"google.golang.org/protobuf/types/known/timestamppb"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/alecthomas/types/optional"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// type AsyncExecuteEvent struct {
// ID int64
// Duration time.Duration
// AsyncExecute
// }

// func (e *AsyncExecuteEvent) GetID() int64 { return e.ID }
// func (e *AsyncExecuteEvent) event() {}

type AsyncExecuteEventType string

const (
Expand All @@ -28,6 +20,20 @@ const (
AsyncExecuteEventTypePubSub AsyncExecuteEventType = "pubsub"
)

func asyncExecuteEventTypeToProto(eventType AsyncExecuteEventType) timelinepb.AsyncExecuteEventType {
switch eventType {
case AsyncExecuteEventTypeCron:
return timelinepb.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_CRON
case AsyncExecuteEventTypePubSub:
return timelinepb.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_PUBSUB
case AsyncExecuteEventTypeUnkown:
return timelinepb.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_UNSPECIFIED

default:
panic("unknown async execute event type")
}
}

type AsyncExecute struct {
DeploymentKey model.DeploymentKey
RequestKey optional.Option[string]
Expand All @@ -40,64 +46,18 @@ type AsyncExecute struct {
var _ Event = AsyncExecute{}

func (AsyncExecute) clientEvent() {}
func (a AsyncExecute) ToReq() *timelinepb.CreateEventRequest {
func (a AsyncExecute) ToReq() (*timelinepb.CreateEventRequest, error) {
return &timelinepb.CreateEventRequest{
Entry: &timelinepb.CreateEventRequest_AsyncExecute{
AsyncExecute: &timelinepb.AsyncExecuteEvent{
DeploymentKey: a.DeploymentKey.String(),
RequestKey: a.RequestKey.Ptr(),
Timestamp: timestamppb.New(a.Time),
Error: a.Error.Ptr(),
// TODO: fill in
// VerbRef *v1.Ref `protobuf:"bytes,3,opt,name=verb_ref,json=verbRef,proto3" json:"verb_ref,omitempty"`
// Duration *durationpb.Duration `protobuf:"bytes,5,opt,name=duration,proto3" json:"duration,omitempty"`
// AsyncEventType AsyncExecuteEventType `protobuf:"varint,6,opt,name=async_event_type,json=asyncEventType,proto3,enum=xyz.block.ftl.timeline.v1.AsyncExecuteEventType" json:"async_event_type,omitempty"`
DeploymentKey: a.DeploymentKey.String(),
RequestKey: a.RequestKey.Ptr(),
Timestamp: timestamppb.New(a.Time),
Error: a.Error.Ptr(),
Duration: durationpb.New(time.Since(a.Time)),
VerbRef: (&a.Verb).ToProto().(*schemapb.Ref), //nolint:forceassert
AsyncEventType: asyncExecuteEventTypeToProto(a.EventType),
},
},
}
}, nil
}

// func (e *AsyncExecute) toEvent() (Event, error) { //nolint:unparam
// return &AsyncExecuteEvent{
// AsyncExecute: *e,
// Duration: time.Since(e.Time),
// }, nil
// }

// type eventAsyncExecuteJSON struct {
// DurationMS int64 `json:"duration_ms"`
// EventType AsyncExecuteEventType `json:"event_type"`
// Error optional.Option[string] `json:"error,omitempty"`
// }

// func (s *Service) insertAsyncExecuteEvent(ctx context.Context, querier sql.Querier, event *AsyncExecuteEvent) error {
// asyncJSON := eventAsyncExecuteJSON{
// DurationMS: event.Duration.Milliseconds(),
// EventType: event.EventType,
// Error: event.Error,
// }

// data, err := json.Marshal(asyncJSON)
// if err != nil {
// return fmt.Errorf("failed to marshal async execute event: %w", err)
// }

// var payload ftlencryption.EncryptedTimelineColumn
// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
// if err != nil {
// return fmt.Errorf("failed to encrypt cron JSON: %w", err)
// }

// err = libdal.TranslatePGError(querier.InsertTimelineAsyncExecuteEvent(ctx, sql.InsertTimelineAsyncExecuteEventParams{
// DeploymentKey: event.DeploymentKey,
// RequestKey: event.RequestKey,
// TimeStamp: event.Time,
// Module: event.Verb.Module,
// Verb: event.Verb.Name,
// Payload: payload,
// }))
// if err != nil {
// return fmt.Errorf("failed to insert async execute event: %w", err)
// }
// return err
// }
204 changes: 30 additions & 174 deletions backend/timeline/events_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,17 @@ package timeline
import (
"time"

"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
"google.golang.org/protobuf/types/known/timestamppb"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// type CallEvent struct {
// ID int64
// DeploymentKey model.DeploymentKey
// RequestKey optional.Option[model.RequestKey]
// ParentRequestKey optional.Option[model.RequestKey]
// Time time.Time
// SourceVerb optional.Option[schema.Ref]
// DestVerb schema.Ref
// Duration time.Duration
// Request json.RawMessage
// Response json.RawMessage
// Error optional.Option[string]
// Stack optional.Option[string]
// }

// func (e *CallEvent) GetID() int64 { return e.ID }
// func (e *CallEvent) event() {}

// type eventCallJSON struct {
// DurationMS int64 `json:"duration_ms"`
// Request json.RawMessage `json:"request"`
// Response json.RawMessage `json:"response"`
// Error optional.Option[string] `json:"error,omitempty"`
// Stack optional.Option[string] `json:"stack,omitempty"`
// }

type Call struct {
DeploymentKey model.DeploymentKey
RequestKey model.RequestKey
Expand All @@ -51,161 +26,42 @@ type Call struct {
}

func (Call) clientEvent() {}
func (c Call) ToReq() *timelinepb.CreateEventRequest {
func (c Call) ToReq() (*timelinepb.CreateEventRequest, error) {
requestKey := c.RequestKey.String()

var respError *string
var response string
_, err := c.Response.Result()
var responseBody []byte
var stack *string
resp, err := c.Response.Result()
if err != nil {
errStr := err.Error()
respError = &errStr
} else {
// TODO: fill in
responseBody = resp.GetBody()
if callError := resp.GetError(); callError != nil {
respError = optional.Some(callError.Message).Ptr()
stack = callError.Stack
}
}
var sourceVerb *schemapb.Ref
if len(c.Callers) > 0 {
sourceVerb = c.Callers[0].ToProto().(*schemapb.Ref) //nolint:forceassert
}

return &timelinepb.CreateEventRequest{
Entry: &timelinepb.CreateEventRequest_Call{
Call: &timelinepb.CallEvent{
RequestKey: &requestKey,
DeploymentKey: c.DeploymentKey.String(),
Timestamp: timestamppb.New(c.StartTime),
Response: response,
Error: respError,
// TODO: fill in
// SourceVerbRef *v1.Ref `protobuf:"bytes,11,opt,name=source_verb_ref,json=sourceVerbRef,proto3,oneof" json:"source_verb_ref,omitempty"`
// DestinationVerbRef *v1.Ref `protobuf:"bytes,12,opt,name=destination_verb_ref,json=destinationVerbRef,proto3" json:"destination_verb_ref,omitempty"`
// Duration *durationpb.Duration `protobuf:"bytes,6,opt,name=duration,proto3" json:"duration,omitempty"`
// Request string `protobuf:"bytes,7,opt,name=request,proto3" json:"request,omitempty"`
// Stack *string `protobuf:"bytes,10,opt,name=stack,proto3,oneof" json:"stack,omitempty"`
RequestKey: &requestKey,
DeploymentKey: c.DeploymentKey.String(),
Timestamp: timestamppb.New(c.StartTime),
Response: string(responseBody),
Error: respError,
SourceVerbRef: sourceVerb,
DestinationVerbRef: c.DestVerb.ToProto().(*schemapb.Ref), //nolint:forceassert
Duration: durationpb.New(time.Since(c.StartTime)),
Request: string(c.Request.GetBody()),
Stack: stack,
},
},
}
}, nil
}

// func (c *Call) toEvent() (Event, error) { return callToCallEvent(c), nil } //nolint:unparam

// func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, callEvent *CallEvent) error {
// var sourceModule, sourceVerb optional.Option[string]
// if sr, ok := callEvent.SourceVerb.Get(); ok {
// sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name)
// }

// var requestKey optional.Option[string]
// if rn, ok := callEvent.RequestKey.Get(); ok {
// requestKey = optional.Some(rn.String())
// }

// var parentRequestKey optional.Option[string]
// if pr, ok := callEvent.ParentRequestKey.Get(); ok {
// parentRequestKey = optional.Some(pr.String())
// }

// callJSON := eventCallJSON{
// DurationMS: callEvent.Duration.Milliseconds(),
// Request: callEvent.Request,
// Response: callEvent.Response,
// Error: callEvent.Error,
// Stack: callEvent.Stack,
// }

// data, err := json.Marshal(callJSON)
// if err != nil {
// return fmt.Errorf("failed to marshal call event: %w", err)
// }

// var payload ftlencryption.EncryptedTimelineColumn
// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
// if err != nil {
// return fmt.Errorf("failed to encrypt call event: %w", err)
// }

// err = libdal.TranslatePGError(querier.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
// DeploymentKey: callEvent.DeploymentKey,
// RequestKey: requestKey,
// ParentRequestKey: parentRequestKey,
// TimeStamp: callEvent.Time,
// SourceModule: sourceModule,
// SourceVerb: sourceVerb,
// DestModule: callEvent.DestVerb.Module,
// DestVerb: callEvent.DestVerb.Name,
// Payload: payload,
// }))
// if err != nil {
// return fmt.Errorf("failed to insert call event: %w", err)
// }
// return nil
// }

// func callToCallEvent(call *Call) *CallEvent {
// var sourceVerb optional.Option[schema.Ref]
// if len(call.Callers) > 0 {
// sourceVerb = optional.Some(*call.Callers[0])
// }

// var errorStr optional.Option[string]
// var stack optional.Option[string]
// var responseBody []byte

// switch response := call.Response.(type) {
// case either.Left[*ftlv1.CallResponse, error]:
// resp := response.Get()
// responseBody = resp.GetBody()
// if callError := resp.GetError(); callError != nil {
// errorStr = optional.Some(callError.Message)
// stack = optional.Ptr(callError.Stack)
// }
// case either.Right[*ftlv1.CallResponse, error]:
// callError := response.Get()
// errorStr = optional.Some(callError.Error())
// }

// return &CallEvent{
// Time: call.StartTime,
// DeploymentKey: call.DeploymentKey,
// RequestKey: optional.Some(call.RequestKey),
// ParentRequestKey: call.ParentRequestKey,
// Duration: time.Since(call.StartTime),
// SourceVerb: sourceVerb,
// DestVerb: *call.DestVerb,
// Request: call.Request.GetBody(),
// Response: responseBody,
// Error: errorStr,
// Stack: stack,
// }
// }

// func CallEventToCallForTesting(event *CallEvent) *Call {
// var response either.Either[*ftlv1.CallResponse, error]
// if eventErr, ok := event.Error.Get(); ok {
// response = either.RightOf[*ftlv1.CallResponse](errors.New(eventErr))
// } else {
// response = either.LeftOf[error](&ftlv1.CallResponse{
// Response: &ftlv1.CallResponse_Body{
// Body: event.Response,
// },
// })
// }

// var requestKey model.RequestKey
// if key, ok := event.RequestKey.Get(); ok {
// requestKey = key
// } else {
// requestKey = model.RequestKey{}
// }

// callers := []*schema.Ref{}
// if ref, ok := event.SourceVerb.Get(); ok {
// callers = []*schema.Ref{&ref}
// }

// return &Call{
// DeploymentKey: event.DeploymentKey,
// RequestKey: requestKey,
// ParentRequestKey: event.ParentRequestKey,
// StartTime: event.Time,
// DestVerb: &event.DestVerb,
// Callers: callers,
// Request: &ftlv1.CallRequest{Body: event.Request},
// Response: response,
// }
// }
10 changes: 1 addition & 9 deletions backend/timeline/events_cron.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
package timeline

// type CronScheduledEvent struct {
// ID int64
// Duration time.Duration
// CronScheduled
// }

// func (e *CronScheduledEvent) GetID() int64 { return e.ID }
// func (e *CronScheduledEvent) event() {}

// TODO: cron service needs to call this
// type CronScheduled struct {
// DeploymentKey model.DeploymentKey
// Verb schema.Ref
Expand Down
Loading

0 comments on commit 6d039d1

Please sign in to comment.