From d903fe9aee8b992a3bcf6ef1904545c9a570e825 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 5 Dec 2024 17:33:49 +1100 Subject: [PATCH] fill out more details --- backend/timeline/events_async.go | 94 +++------- backend/timeline/events_call.go | 204 ++++------------------ backend/timeline/events_cron.go | 10 +- backend/timeline/events_deployment.go | 24 +-- backend/timeline/events_ingress.go | 172 +++++------------- backend/timeline/events_log.go | 52 +----- backend/timeline/events_pubsub_consume.go | 90 ++-------- backend/timeline/events_pubsub_publish.go | 82 ++------- backend/timeline/publish.go | 9 +- 9 files changed, 141 insertions(+), 596 deletions(-) diff --git a/backend/timeline/events_async.go b/backend/timeline/events_async.go index 5d8921017f..0222342c31 100644 --- a/backend/timeline/events_async.go +++ b/backend/timeline/events_async.go @@ -3,23 +3,15 @@ package timeline import ( "time" - "github.com/alecthomas/types/optional" - "google.golang.org/protobuf/types/known/timestamppb" - + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" + "github.com/alecthomas/types/optional" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) -// type AsyncExecuteEvent struct { -// ID int64 -// Duration time.Duration -// AsyncExecute -// } - -// func (e *AsyncExecuteEvent) GetID() int64 { return e.ID } -// func (e *AsyncExecuteEvent) event() {} - type AsyncExecuteEventType string const ( @@ -28,6 +20,20 @@ const ( AsyncExecuteEventTypePubSub AsyncExecuteEventType = "pubsub" ) +func asyncExecuteEventTypeToProto(eventType AsyncExecuteEventType) timelinepb.AsyncExecuteEventType { + switch eventType { + case AsyncExecuteEventTypeCron: + return timelinepb.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_CRON + case AsyncExecuteEventTypePubSub: + return timelinepb.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_PUBSUB + case AsyncExecuteEventTypeUnkown: + return timelinepb.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_UNSPECIFIED + + default: + panic("unknown async execute event type") + } +} + type AsyncExecute struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] @@ -40,64 +46,18 @@ type AsyncExecute struct { var _ Event = AsyncExecute{} func (AsyncExecute) clientEvent() {} -func (a AsyncExecute) ToReq() *timelinepb.CreateEventRequest { +func (a AsyncExecute) ToReq() (*timelinepb.CreateEventRequest, error) { return &timelinepb.CreateEventRequest{ Entry: &timelinepb.CreateEventRequest_AsyncExecute{ AsyncExecute: &timelinepb.AsyncExecuteEvent{ - DeploymentKey: a.DeploymentKey.String(), - RequestKey: a.RequestKey.Ptr(), - Timestamp: timestamppb.New(a.Time), - Error: a.Error.Ptr(), - // TODO: fill in - // VerbRef *v1.Ref `protobuf:"bytes,3,opt,name=verb_ref,json=verbRef,proto3" json:"verb_ref,omitempty"` - // Duration *durationpb.Duration `protobuf:"bytes,5,opt,name=duration,proto3" json:"duration,omitempty"` - // AsyncEventType AsyncExecuteEventType `protobuf:"varint,6,opt,name=async_event_type,json=asyncEventType,proto3,enum=xyz.block.ftl.timeline.v1.AsyncExecuteEventType" json:"async_event_type,omitempty"` + DeploymentKey: a.DeploymentKey.String(), + RequestKey: a.RequestKey.Ptr(), + Timestamp: timestamppb.New(a.Time), + Error: a.Error.Ptr(), + Duration: durationpb.New(time.Since(a.Time)), + VerbRef: (&a.Verb).ToProto().(*schemapb.Ref), //nolint:forceassert + AsyncEventType: asyncExecuteEventTypeToProto(a.EventType), }, }, - } + }, nil } - -// func (e *AsyncExecute) toEvent() (Event, error) { //nolint:unparam -// return &AsyncExecuteEvent{ -// AsyncExecute: *e, -// Duration: time.Since(e.Time), -// }, nil -// } - -// type eventAsyncExecuteJSON struct { -// DurationMS int64 `json:"duration_ms"` -// EventType AsyncExecuteEventType `json:"event_type"` -// Error optional.Option[string] `json:"error,omitempty"` -// } - -// func (s *Service) insertAsyncExecuteEvent(ctx context.Context, querier sql.Querier, event *AsyncExecuteEvent) error { -// asyncJSON := eventAsyncExecuteJSON{ -// DurationMS: event.Duration.Milliseconds(), -// EventType: event.EventType, -// Error: event.Error, -// } - -// data, err := json.Marshal(asyncJSON) -// if err != nil { -// return fmt.Errorf("failed to marshal async execute event: %w", err) -// } - -// var payload ftlencryption.EncryptedTimelineColumn -// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) -// if err != nil { -// return fmt.Errorf("failed to encrypt cron JSON: %w", err) -// } - -// err = libdal.TranslatePGError(querier.InsertTimelineAsyncExecuteEvent(ctx, sql.InsertTimelineAsyncExecuteEventParams{ -// DeploymentKey: event.DeploymentKey, -// RequestKey: event.RequestKey, -// TimeStamp: event.Time, -// Module: event.Verb.Module, -// Verb: event.Verb.Name, -// Payload: payload, -// })) -// if err != nil { -// return fmt.Errorf("failed to insert async execute event: %w", err) -// } -// return err -// } diff --git a/backend/timeline/events_call.go b/backend/timeline/events_call.go index 7052b62eee..726f217fb3 100644 --- a/backend/timeline/events_call.go +++ b/backend/timeline/events_call.go @@ -3,42 +3,17 @@ package timeline import ( "time" - "github.com/alecthomas/types/optional" - "github.com/alecthomas/types/result" - "google.golang.org/protobuf/types/known/timestamppb" - + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) -// type CallEvent struct { -// ID int64 -// DeploymentKey model.DeploymentKey -// RequestKey optional.Option[model.RequestKey] -// ParentRequestKey optional.Option[model.RequestKey] -// Time time.Time -// SourceVerb optional.Option[schema.Ref] -// DestVerb schema.Ref -// Duration time.Duration -// Request json.RawMessage -// Response json.RawMessage -// Error optional.Option[string] -// Stack optional.Option[string] -// } - -// func (e *CallEvent) GetID() int64 { return e.ID } -// func (e *CallEvent) event() {} - -// type eventCallJSON struct { -// DurationMS int64 `json:"duration_ms"` -// Request json.RawMessage `json:"request"` -// Response json.RawMessage `json:"response"` -// Error optional.Option[string] `json:"error,omitempty"` -// Stack optional.Option[string] `json:"stack,omitempty"` -// } - type Call struct { DeploymentKey model.DeploymentKey RequestKey model.RequestKey @@ -51,161 +26,42 @@ type Call struct { } func (Call) clientEvent() {} -func (c Call) ToReq() *timelinepb.CreateEventRequest { +func (c Call) ToReq() (*timelinepb.CreateEventRequest, error) { requestKey := c.RequestKey.String() var respError *string - var response string - _, err := c.Response.Result() + var responseBody []byte + var stack *string + resp, err := c.Response.Result() if err != nil { errStr := err.Error() respError = &errStr } else { - // TODO: fill in + responseBody = resp.GetBody() + if callError := resp.GetError(); callError != nil { + respError = optional.Some(callError.Message).Ptr() + stack = callError.Stack + } } + var sourceVerb *schemapb.Ref + if len(c.Callers) > 0 { + sourceVerb = c.Callers[0].ToProto().(*schemapb.Ref) //nolint:forceassert + } + return &timelinepb.CreateEventRequest{ Entry: &timelinepb.CreateEventRequest_Call{ Call: &timelinepb.CallEvent{ - RequestKey: &requestKey, - DeploymentKey: c.DeploymentKey.String(), - Timestamp: timestamppb.New(c.StartTime), - Response: response, - Error: respError, - // TODO: fill in - // SourceVerbRef *v1.Ref `protobuf:"bytes,11,opt,name=source_verb_ref,json=sourceVerbRef,proto3,oneof" json:"source_verb_ref,omitempty"` - // DestinationVerbRef *v1.Ref `protobuf:"bytes,12,opt,name=destination_verb_ref,json=destinationVerbRef,proto3" json:"destination_verb_ref,omitempty"` - // Duration *durationpb.Duration `protobuf:"bytes,6,opt,name=duration,proto3" json:"duration,omitempty"` - // Request string `protobuf:"bytes,7,opt,name=request,proto3" json:"request,omitempty"` - // Stack *string `protobuf:"bytes,10,opt,name=stack,proto3,oneof" json:"stack,omitempty"` + RequestKey: &requestKey, + DeploymentKey: c.DeploymentKey.String(), + Timestamp: timestamppb.New(c.StartTime), + Response: string(responseBody), + Error: respError, + SourceVerbRef: sourceVerb, + DestinationVerbRef: c.DestVerb.ToProto().(*schemapb.Ref), //nolint:forceassert + Duration: durationpb.New(time.Since(c.StartTime)), + Request: string(c.Request.GetBody()), + Stack: stack, }, }, - } + }, nil } - -// func (c *Call) toEvent() (Event, error) { return callToCallEvent(c), nil } //nolint:unparam - -// func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, callEvent *CallEvent) error { -// var sourceModule, sourceVerb optional.Option[string] -// if sr, ok := callEvent.SourceVerb.Get(); ok { -// sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name) -// } - -// var requestKey optional.Option[string] -// if rn, ok := callEvent.RequestKey.Get(); ok { -// requestKey = optional.Some(rn.String()) -// } - -// var parentRequestKey optional.Option[string] -// if pr, ok := callEvent.ParentRequestKey.Get(); ok { -// parentRequestKey = optional.Some(pr.String()) -// } - -// callJSON := eventCallJSON{ -// DurationMS: callEvent.Duration.Milliseconds(), -// Request: callEvent.Request, -// Response: callEvent.Response, -// Error: callEvent.Error, -// Stack: callEvent.Stack, -// } - -// data, err := json.Marshal(callJSON) -// if err != nil { -// return fmt.Errorf("failed to marshal call event: %w", err) -// } - -// var payload ftlencryption.EncryptedTimelineColumn -// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) -// if err != nil { -// return fmt.Errorf("failed to encrypt call event: %w", err) -// } - -// err = libdal.TranslatePGError(querier.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ -// DeploymentKey: callEvent.DeploymentKey, -// RequestKey: requestKey, -// ParentRequestKey: parentRequestKey, -// TimeStamp: callEvent.Time, -// SourceModule: sourceModule, -// SourceVerb: sourceVerb, -// DestModule: callEvent.DestVerb.Module, -// DestVerb: callEvent.DestVerb.Name, -// Payload: payload, -// })) -// if err != nil { -// return fmt.Errorf("failed to insert call event: %w", err) -// } -// return nil -// } - -// func callToCallEvent(call *Call) *CallEvent { -// var sourceVerb optional.Option[schema.Ref] -// if len(call.Callers) > 0 { -// sourceVerb = optional.Some(*call.Callers[0]) -// } - -// var errorStr optional.Option[string] -// var stack optional.Option[string] -// var responseBody []byte - -// switch response := call.Response.(type) { -// case either.Left[*ftlv1.CallResponse, error]: -// resp := response.Get() -// responseBody = resp.GetBody() -// if callError := resp.GetError(); callError != nil { -// errorStr = optional.Some(callError.Message) -// stack = optional.Ptr(callError.Stack) -// } -// case either.Right[*ftlv1.CallResponse, error]: -// callError := response.Get() -// errorStr = optional.Some(callError.Error()) -// } - -// return &CallEvent{ -// Time: call.StartTime, -// DeploymentKey: call.DeploymentKey, -// RequestKey: optional.Some(call.RequestKey), -// ParentRequestKey: call.ParentRequestKey, -// Duration: time.Since(call.StartTime), -// SourceVerb: sourceVerb, -// DestVerb: *call.DestVerb, -// Request: call.Request.GetBody(), -// Response: responseBody, -// Error: errorStr, -// Stack: stack, -// } -// } - -// func CallEventToCallForTesting(event *CallEvent) *Call { -// var response either.Either[*ftlv1.CallResponse, error] -// if eventErr, ok := event.Error.Get(); ok { -// response = either.RightOf[*ftlv1.CallResponse](errors.New(eventErr)) -// } else { -// response = either.LeftOf[error](&ftlv1.CallResponse{ -// Response: &ftlv1.CallResponse_Body{ -// Body: event.Response, -// }, -// }) -// } - -// var requestKey model.RequestKey -// if key, ok := event.RequestKey.Get(); ok { -// requestKey = key -// } else { -// requestKey = model.RequestKey{} -// } - -// callers := []*schema.Ref{} -// if ref, ok := event.SourceVerb.Get(); ok { -// callers = []*schema.Ref{&ref} -// } - -// return &Call{ -// DeploymentKey: event.DeploymentKey, -// RequestKey: requestKey, -// ParentRequestKey: event.ParentRequestKey, -// StartTime: event.Time, -// DestVerb: &event.DestVerb, -// Callers: callers, -// Request: &ftlv1.CallRequest{Body: event.Request}, -// Response: response, -// } -// } diff --git a/backend/timeline/events_cron.go b/backend/timeline/events_cron.go index f1f9948478..f5d5d89f12 100644 --- a/backend/timeline/events_cron.go +++ b/backend/timeline/events_cron.go @@ -1,14 +1,6 @@ package timeline -// type CronScheduledEvent struct { -// ID int64 -// Duration time.Duration -// CronScheduled -// } - -// func (e *CronScheduledEvent) GetID() int64 { return e.ID } -// func (e *CronScheduledEvent) event() {} - +// TODO: cron service needs to call this // type CronScheduled struct { // DeploymentKey model.DeploymentKey // Verb schema.Ref diff --git a/backend/timeline/events_deployment.go b/backend/timeline/events_deployment.go index 69eb8f75cf..90a7f44222 100644 --- a/backend/timeline/events_deployment.go +++ b/backend/timeline/events_deployment.go @@ -21,7 +21,7 @@ type DeploymentCreated struct { var _ Event = DeploymentCreated{} func (DeploymentCreated) clientEvent() {} -func (d DeploymentCreated) ToReq() *timelinepb.CreateEventRequest { +func (d DeploymentCreated) ToReq() (*timelinepb.CreateEventRequest, error) { var replaced *string if r, ok := d.ReplacedDeployment.Get(); ok { repl := r.String() @@ -37,17 +37,9 @@ func (d DeploymentCreated) ToReq() *timelinepb.CreateEventRequest { Replaced: replaced, }, }, - } + }, nil } -// func (e *DeploymentCreatedEvent) GetID() int64 { return e.ID } -// func (e *DeploymentCreatedEvent) event() {} - -// type eventDeploymentUpdatedJSON struct { -// MinReplicas int `json:"min_replicas"` -// PrevMinReplicas int `json:"prev_min_replicas"` -// } - type DeploymentUpdated struct { DeploymentKey model.DeploymentKey Time time.Time @@ -58,7 +50,7 @@ type DeploymentUpdated struct { var _ Event = DeploymentUpdated{} func (DeploymentUpdated) clientEvent() {} -func (d DeploymentUpdated) ToReq() *timelinepb.CreateEventRequest { +func (d DeploymentUpdated) ToReq() (*timelinepb.CreateEventRequest, error) { return &timelinepb.CreateEventRequest{ Entry: &timelinepb.CreateEventRequest_DeploymentUpdated{ DeploymentUpdated: &timelinepb.DeploymentUpdatedEvent{ @@ -67,13 +59,5 @@ func (d DeploymentUpdated) ToReq() *timelinepb.CreateEventRequest { PrevMinReplicas: int32(d.PrevMinReplicas), }, }, - } + }, nil } - -// func (e *DeploymentUpdatedEvent) GetID() int64 { return e.ID } -// func (e *DeploymentUpdatedEvent) event() {} - -// type eventDeploymentCreatedJSON struct { -// MinReplicas int `json:"min_replicas"` -// ReplacedDeployment optional.Option[model.DeploymentKey] `json:"replaced,omitempty"` -// } diff --git a/backend/timeline/events_ingress.go b/backend/timeline/events_ingress.go index d0ac9bc62a..9432765aa4 100644 --- a/backend/timeline/events_ingress.go +++ b/backend/timeline/events_ingress.go @@ -1,50 +1,20 @@ package timeline import ( + "encoding/json" + "fmt" "net/http" "time" - "github.com/alecthomas/types/optional" - "google.golang.org/protobuf/types/known/timestamppb" - + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" + "github.com/alecthomas/types/optional" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) -// type IngressEvent struct { -// ID int64 -// DeploymentKey model.DeploymentKey -// RequestKey optional.Option[model.RequestKey] -// Verb schema.Ref -// Method string -// Path string - -// StatusCode int -// Time time.Time -// Duration time.Duration -// Request json.RawMessage -// RequestHeader json.RawMessage -// Response json.RawMessage -// ResponseHeader json.RawMessage -// Error optional.Option[string] -// } - -// func (e *IngressEvent) GetID() int64 { return e.ID } -// func (e *IngressEvent) event() {} - -// type eventIngressJSON struct { -// DurationMS int64 `json:"duration_ms"` -// Method string `json:"method"` -// Path string `json:"path"` -// StatusCode int `json:"status_code"` -// Request json.RawMessage `json:"request"` -// RequestHeader json.RawMessage `json:"req_header"` -// Response json.RawMessage `json:"response"` -// ResponseHeader json.RawMessage `json:"resp_header"` -// Error optional.Option[string] `json:"error,omitempty"` -// } - type Ingress struct { DeploymentKey model.DeploymentKey RequestKey model.RequestKey @@ -63,104 +33,46 @@ type Ingress struct { var _ Event = Ingress{} func (Ingress) clientEvent() {} -func (i Ingress) ToReq() *timelinepb.CreateEventRequest { +func (i Ingress) ToReq() (*timelinepb.CreateEventRequest, error) { requestKey := i.RequestKey.String() + + requestBody := i.RequestBody + if len(requestBody) == 0 { + requestBody = []byte("{}") + } + + responseBody := i.ResponseBody + if len(responseBody) == 0 { + responseBody = []byte("{}") + } + + reqHeaderBytes, err := json.Marshal(i.RequestHeaders) + if err != nil { + return nil, fmt.Errorf("failed to marshal request header: %w", err) + } + + respHeaderBytes, err := json.Marshal(i.ResponseHeaders) + if err != nil { + return nil, fmt.Errorf("failed to marshal response header: %w", err) + } + return &timelinepb.CreateEventRequest{ Entry: &timelinepb.CreateEventRequest_Ingress{ Ingress: &timelinepb.IngressEvent{ - DeploymentKey: i.DeploymentKey.String(), - RequestKey: &requestKey, - Timestamp: timestamppb.New(i.StartTime), - // TODO: fill in - // VerbRef *v1.Ref `protobuf:"bytes,3,opt,name=verb_ref,json=verbRef,proto3" json:"verb_ref,omitempty"` - // Method string `protobuf:"bytes,4,opt,name=method,proto3" json:"method,omitempty"` - // Path string `protobuf:"bytes,5,opt,name=path,proto3" json:"path,omitempty"` - // StatusCode int32 `protobuf:"varint,7,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"` - // Duration *durationpb.Duration `protobuf:"bytes,9,opt,name=duration,proto3" json:"duration,omitempty"` - // Request string `protobuf:"bytes,10,opt,name=request,proto3" json:"request,omitempty"` - // RequestHeader string `protobuf:"bytes,11,opt,name=request_header,json=requestHeader,proto3" json:"request_header,omitempty"` - // Response string `protobuf:"bytes,12,opt,name=response,proto3" json:"response,omitempty"` - // ResponseHeader string `protobuf:"bytes,13,opt,name=response_header,json=responseHeader,proto3" json:"response_header,omitempty"` - Error: i.Error.Ptr(), + DeploymentKey: i.DeploymentKey.String(), + RequestKey: &requestKey, + Timestamp: timestamppb.New(i.StartTime), + VerbRef: i.Verb.ToProto().(*schemapb.Ref), //nolint:forceassert + Method: i.RequestMethod, + Path: i.RequestPath, + StatusCode: int32(i.ResponseStatus), + Duration: durationpb.New(time.Since(i.StartTime)), + Request: string(requestBody), + RequestHeader: string(reqHeaderBytes), + Response: string(responseBody), + ResponseHeader: string(respHeaderBytes), + Error: i.Error.Ptr(), }, }, - } + }, nil } - -// func (ingress *Ingress) toEvent() (Event, error) { -// requestBody := ingress.RequestBody -// if len(requestBody) == 0 { -// requestBody = []byte("{}") -// } - -// responseBody := ingress.ResponseBody -// if len(responseBody) == 0 { -// responseBody = []byte("{}") -// } - -// reqHeaderBytes, err := json.Marshal(ingress.RequestHeaders) -// if err != nil { -// return nil, fmt.Errorf("failed to marshal request header: %w", err) -// } - -// respHeaderBytes, err := json.Marshal(ingress.ResponseHeaders) -// if err != nil { -// return nil, fmt.Errorf("failed to marshal response header: %w", err) -// } -// return &IngressEvent{ -// DeploymentKey: ingress.DeploymentKey, -// RequestKey: optional.Some(ingress.RequestKey), -// Verb: *ingress.Verb, -// Method: ingress.RequestMethod, -// Path: ingress.RequestPath, -// StatusCode: ingress.ResponseStatus, -// Time: ingress.StartTime, -// Duration: time.Since(ingress.StartTime), -// Request: requestBody, -// RequestHeader: reqHeaderBytes, -// Response: responseBody, -// ResponseHeader: respHeaderBytes, -// Error: ingress.Error, -// }, nil -// } - -// func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *IngressEvent) error { -// ingressJSON := eventIngressJSON{ -// DurationMS: ingress.Duration.Milliseconds(), -// Method: ingress.Method, -// Path: ingress.Path, -// StatusCode: ingress.StatusCode, -// Request: ingress.Request, -// RequestHeader: ingress.RequestHeader, -// Response: ingress.Response, -// ResponseHeader: ingress.ResponseHeader, -// Error: ingress.Error, -// } - -// data, err := json.Marshal(ingressJSON) -// if err != nil { -// return fmt.Errorf("failed to marshal ingress JSON: %w", err) -// } - -// var payload ftlencryption.EncryptedTimelineColumn -// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) -// if err != nil { -// return fmt.Errorf("failed to encrypt ingress payload: %w", err) -// } - -// log.FromContext(ctx).Debugf("Inserting ingress event for %s %s", ingress.RequestKey, ingress.Path) - -// err = libdal.TranslatePGError(querier.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{ -// DeploymentKey: ingress.DeploymentKey, -// RequestKey: optional.Some(ingress.RequestKey.String()), -// TimeStamp: ingress.Time, -// Module: ingress.Verb.Module, -// Verb: ingress.Verb.Name, -// IngressType: "http", -// Payload: payload, -// })) -// if err != nil { -// return fmt.Errorf("failed to insert ingress event: %w", err) -// } -// return nil -// } diff --git a/backend/timeline/events_log.go b/backend/timeline/events_log.go index 09c915ab0a..6ab67af8d6 100644 --- a/backend/timeline/events_log.go +++ b/backend/timeline/events_log.go @@ -23,7 +23,7 @@ type Log struct { var _ Event = Log{} func (Log) clientEvent() {} -func (l Log) ToReq() *timelinepb.CreateEventRequest { +func (l Log) ToReq() (*timelinepb.CreateEventRequest, error) { var requestKey *string if r, ok := l.RequestKey.Get(); ok { key := r.String() @@ -41,53 +41,5 @@ func (l Log) ToReq() *timelinepb.CreateEventRequest { Error: l.Error.Ptr(), }, }, - } + }, nil } - -// func (l *Log) toEvent() (Event, error) { return &LogEvent{Log: *l}, nil } //nolint:unparam - -// type LogEvent struct { -// ID int64 -// Log -// } - -// func (e *LogEvent) GetID() int64 { return e.ID } -// func (e *LogEvent) event() {} - -// type eventLogJSON struct { -// Message string `json:"message"` -// Attributes map[string]string `json:"attributes"` -// Error optional.Option[string] `json:"error,omitempty"` -// } - -// func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *LogEvent) error { -// var requestKey optional.Option[string] -// if name, ok := log.RequestKey.Get(); ok { -// requestKey = optional.Some(name.String()) -// } - -// logJSON := eventLogJSON{ -// Message: log.Message, -// Attributes: log.Attributes, -// Error: log.Error, -// } - -// data, err := json.Marshal(logJSON) -// if err != nil { -// return fmt.Errorf("failed to marshal log event: %w", err) -// } - -// var encryptedPayload ftlencryption.EncryptedTimelineColumn -// err = s.encryption.EncryptJSON(json.RawMessage(data), &encryptedPayload) -// if err != nil { -// return fmt.Errorf("failed to encrypt log payload: %w", err) -// } - -// return libdal.TranslatePGError(querier.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ -// DeploymentKey: log.DeploymentKey, -// RequestKey: requestKey, -// TimeStamp: log.Time, -// Level: log.Level, -// Payload: encryptedPayload, -// })) -// } diff --git a/backend/timeline/events_pubsub_consume.go b/backend/timeline/events_pubsub_consume.go index aa7de3c5da..91149c2a26 100644 --- a/backend/timeline/events_pubsub_consume.go +++ b/backend/timeline/events_pubsub_consume.go @@ -9,17 +9,9 @@ import ( timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" + "google.golang.org/protobuf/types/known/durationpb" ) -// type PubSubConsumeEvent struct { -// ID int64 -// Duration time.Duration -// PubSubConsume -// } - -// func (e *PubSubConsumeEvent) GetID() int64 { return e.ID } -// func (e *PubSubConsumeEvent) event() {} - type PubSubConsume struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] @@ -32,74 +24,24 @@ type PubSubConsume struct { var _ Event = PubSubConsume{} func (PubSubConsume) clientEvent() {} -func (p PubSubConsume) ToReq() *timelinepb.CreateEventRequest { +func (p PubSubConsume) ToReq() (*timelinepb.CreateEventRequest, error) { + var destModule, destVerb *string + if ref, ok := p.DestVerb.Get(); ok { + destModule = &ref.Module + destVerb = &ref.Name + } return &timelinepb.CreateEventRequest{ Entry: &timelinepb.CreateEventRequest_PubsubConsume{ PubsubConsume: &timelinepb.PubSubConsumeEvent{ - DeploymentKey: p.DeploymentKey.String(), - RequestKey: p.RequestKey.Ptr(), - Timestamp: timestamppb.New(p.Time), - Topic: p.Topic, - Error: p.Error.Ptr(), - - // TODO: fill in - // DestVerbModule *string `protobuf:"bytes,3,opt,name=dest_verb_module,json=destVerbModule,proto3,oneof" json:"dest_verb_module,omitempty"` - // DestVerbName *string `protobuf:"bytes,4,opt,name=dest_verb_name,json=destVerbName,proto3,oneof" json:"dest_verb_name,omitempty"` - // Duration *durationpb.Duration `protobuf:"bytes,6,opt,name=duration,proto3" json:"duration,omitempty"` + DeploymentKey: p.DeploymentKey.String(), + RequestKey: p.RequestKey.Ptr(), + Timestamp: timestamppb.New(p.Time), + Topic: p.Topic, + Error: p.Error.Ptr(), + DestVerbModule: destModule, + DestVerbName: destVerb, + Duration: durationpb.New(time.Since(p.Time)), }, }, - } + }, nil } - -// func (e *PubSubConsume) toEvent() (Event, error) { //nolint:unparam -// return &PubSubConsumeEvent{ -// PubSubConsume: *e, -// Duration: time.Since(e.Time), -// }, nil -// } - -// type eventPubSubConsumeJSON struct { -// DurationMS int64 `json:"duration_ms"` -// Topic string `json:"topic"` -// Error optional.Option[string] `json:"error,omitempty"` -// } - -// func (s *Service) insertPubSubConsumeEvent(ctx context.Context, querier sql.Querier, event *PubSubConsumeEvent) error { -// pubsubJSON := eventPubSubConsumeJSON{ -// DurationMS: event.Duration.Milliseconds(), -// Topic: event.Topic, -// Error: event.Error, -// } - -// data, err := json.Marshal(pubsubJSON) -// if err != nil { -// return fmt.Errorf("failed to marshal pubsub event: %w", err) -// } - -// var payload ftlencryption.EncryptedTimelineColumn -// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) -// if err != nil { -// return fmt.Errorf("failed to encrypt cron JSON: %w", err) -// } - -// destModule := optional.None[string]() -// destVerb := optional.None[string]() -// if dv, ok := event.DestVerb.Get(); ok { -// destModule = optional.Some(dv.Module) -// destVerb = optional.Some(dv.Name) -// } - -// err = libdal.TranslatePGError(querier.InsertTimelinePubsubConsumeEvent(ctx, sql.InsertTimelinePubsubConsumeEventParams{ -// DeploymentKey: event.DeploymentKey, -// RequestKey: event.RequestKey, -// TimeStamp: event.Time, -// DestModule: destModule, -// DestVerb: destVerb, -// Topic: event.Topic, -// Payload: payload, -// })) -// if err != nil { -// return fmt.Errorf("failed to insert pubsub consume event: %w", err) -// } -// return err -// } diff --git a/backend/timeline/events_pubsub_publish.go b/backend/timeline/events_pubsub_publish.go index dcc0d8d964..82190e2c43 100644 --- a/backend/timeline/events_pubsub_publish.go +++ b/backend/timeline/events_pubsub_publish.go @@ -8,97 +8,39 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" deployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" ) -// type PubSubPublishEvent struct { -// ID int64 -// Duration time.Duration -// Request json.RawMessage -// PubSubPublish -// } - -// func (e *PubSubPublishEvent) GetID() int64 { return e.ID } -// func (e *PubSubPublishEvent) event() {} - type PubSubPublish struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] Time time.Time SourceVerb schema.Ref Topic string - Request *deployment.PublishEventRequest - Error optional.Option[string] + // Should this just be request body? + Request *deployment.PublishEventRequest + Error optional.Option[string] } var _ Event = PubSubPublish{} func (PubSubPublish) clientEvent() {} -func (p PubSubPublish) ToReq() *timelinepb.CreateEventRequest { +func (p PubSubPublish) ToReq() (*timelinepb.CreateEventRequest, error) { return &timelinepb.CreateEventRequest{ Entry: &timelinepb.CreateEventRequest_PubsubPublish{ PubsubPublish: &timelinepb.PubSubPublishEvent{ DeploymentKey: p.DeploymentKey.String(), RequestKey: p.RequestKey.Ptr(), - // TODO: verbRef - // VerbRef: *v1.Ref `protobuf:"bytes,3,opt,name=verb_ref,json=verbRef,proto3" json:"verb_ref,omitempty"` - Timestamp: timestamppb.New(p.Time), - Duration: durationpb.New(time.Since(p.Time)), - Topic: p.Topic, - // TODO: request - // Request: string `protobuf:"bytes,7,opt,name=request,proto3" json:"request,omitempty"` - Error: p.Error.Ptr(), + VerbRef: (&p.SourceVerb).ToProto().(*schemapb.Ref), //nolint:forceassert + Timestamp: timestamppb.New(p.Time), + Duration: durationpb.New(time.Since(p.Time)), + Topic: p.Topic, + Request: string(p.Request.Body), + Error: p.Error.Ptr(), }, }, - } + }, nil } - -// func (e *PubSubPublish) toEvent() (Event, error) { //nolint:unparam -// return &PubSubPublishEvent{ -// PubSubPublish: *e, -// Duration: time.Since(e.Time), -// }, nil -// } - -// type eventPubSubPublishJSON struct { -// DurationMS int64 `json:"duration_ms"` -// Topic string `json:"topic"` -// Request json.RawMessage `json:"request"` -// Error optional.Option[string] `json:"error,omitempty"` -// } - -// func (s *Service) insertPubSubPublishEvent(ctx context.Context, querier sql.Querier, event *PubSubPublishEvent) error { -// pubsubJSON := eventPubSubPublishJSON{ -// DurationMS: event.Duration.Milliseconds(), -// Topic: event.Topic, -// Request: event.Request, -// Error: event.Error, -// } - -// data, err := json.Marshal(pubsubJSON) -// if err != nil { -// return fmt.Errorf("failed to marshal pubsub event: %w", err) -// } - -// var payload ftlencryption.EncryptedTimelineColumn -// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) -// if err != nil { -// return fmt.Errorf("failed to encrypt cron JSON: %w", err) -// } - -// err = libdal.TranslatePGError(querier.InsertTimelinePubsubPublishEvent(ctx, sql.InsertTimelinePubsubPublishEventParams{ -// DeploymentKey: event.DeploymentKey, -// RequestKey: event.RequestKey, -// TimeStamp: event.Time, -// SourceModule: event.SourceVerb.Module, -// SourceVerb: event.SourceVerb.Name, -// Topic: event.Topic, -// Payload: payload, -// })) -// if err != nil { -// return fmt.Errorf("failed to insert pubsub publish event: %w", err) -// } -// return err -// } diff --git a/backend/timeline/publish.go b/backend/timeline/publish.go index 4b13118ef0..5c20fc7298 100644 --- a/backend/timeline/publish.go +++ b/backend/timeline/publish.go @@ -13,13 +13,18 @@ import ( //go:sumtype type Event interface { - ToReq() *timelinepb.CreateEventRequest + ToReq() (*timelinepb.CreateEventRequest, error) clientEvent() } func Publish(ctx context.Context, event Event) { client := rpc.ClientFromContext[timelinev1connect.TimelineServiceClient](ctx) - _, err := client.CreateEvent(ctx, connect.NewRequest(event.ToReq())) + req, err := event.ToReq() + if err != nil { + log.FromContext(ctx).Warnf("failed to create request to publish %T event: %v", event, err) + return + } + _, err = client.CreateEvent(ctx, connect.NewRequest(req)) if err != nil { log.FromContext(ctx).Warnf("failed to publish %T event: %v", event, err) }