Skip to content

Commit

Permalink
feat: add new deployment events and update console (#415)
Browse files Browse the repository at this point in the history
Fixes #300 
<img width="1582" alt="Screenshot 2023-09-21 at 8 40 27 AM"
src="https://github.com/TBD54566975/ftl/assets/51647/c9ed2f4b-bd3b-4fc5-9e09-98fb92177952">
<img width="1582" alt="Screenshot 2023-09-21 at 8 41 53 AM"
src="https://github.com/TBD54566975/ftl/assets/51647/7cd36c64-f44b-457c-a30f-6b08374b0447">
<img width="1582" alt="Screenshot 2023-09-21 at 8 41 58 AM"
src="https://github.com/TBD54566975/ftl/assets/51647/6ec8c46f-bc72-432d-8a33-15304ac7ff7d">

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
wesbillman and github-actions[bot] authored Sep 21, 2023
1 parent fa1582d commit dd080ec
Show file tree
Hide file tree
Showing 30 changed files with 1,086 additions and 928 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ deploy/
.DS_Store
reflex.conf
/logs/
node_modules
/node_modules
*.tsbuildinfo
generated_ftl_module.go

Expand Down
155 changes: 67 additions & 88 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,87 +186,6 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
}
}

func callEventToCall(event *dal.CallEvent) *pbconsole.Call {
var requestName *string
if r, ok := event.RequestName.Get(); ok {
rstr := r.String()
requestName = &rstr
}
var sourceVerbRef *pschema.VerbRef
if sourceVerb, ok := event.SourceVerb.Get(); ok {
sourceVerbRef = sourceVerb.ToProto().(*pschema.VerbRef) //nolint:forcetypeassert
}
return &pbconsole.Call{
RequestName: requestName,
DeploymentName: event.DeploymentName.String(),
TimeStamp: timestamppb.New(event.Time),
SourceVerbRef: sourceVerbRef,
DestinationVerbRef: &pschema.VerbRef{
Module: event.DestVerb.Module,
Name: event.DestVerb.Name,
},
Duration: durationpb.New(event.Duration),
Request: string(event.Request),
Response: string(event.Response),
Error: event.Error.Ptr(),
}
}

func logEventToLogEntry(event *dal.LogEvent) *pbconsole.LogEntry {
var requestName *string
if r, ok := event.RequestName.Get(); ok {
rstr := r.String()
requestName = &rstr
}
return &pbconsole.LogEntry{
DeploymentName: event.DeploymentName.String(),
RequestName: requestName,
TimeStamp: timestamppb.New(event.Time),
LogLevel: event.Level,
Attributes: event.Attributes,
Message: event.Message,
Error: event.Error.Ptr(),
}
}

func deploymentEventToDeployment(event *dal.DeploymentEvent) *pbconsole.Deployment {
var eventType pbconsole.DeploymentEventType
switch event.Type {
case dal.DeploymentCreated:
eventType = pbconsole.DeploymentEventType_DEPLOYMENT_CREATED
case dal.DeploymentUpdated:
eventType = pbconsole.DeploymentEventType_DEPLOYMENT_UPDATED
case dal.DeploymentReplaced:
eventType = pbconsole.DeploymentEventType_DEPLOYMENT_REPLACED
default:
panic(errors.Errorf("unknown deployment event type %v", event.Type))
}

var replaced *string
if r, ok := event.ReplacedDeployment.Get(); ok {
rstr := r.String()
replaced = &rstr
}
return &pbconsole.Deployment{
Name: event.DeploymentName.String(),
Language: event.Language,
ModuleName: event.ModuleName,
MinReplicas: 0,
EventType: eventType,
Replaced: replaced,
}
}

func filterEvents[E dal.Event](events []dal.Event) []E {
var filtered []E
for _, event := range events {
if e, ok := event.(E); ok {
filtered = append(filtered, e)
}
}
return filtered
}

func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error) {
var query []dal.EventFilter

Expand Down Expand Up @@ -306,8 +225,10 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error)
eventTypes = append(eventTypes, dal.EventTypeCall)
case pbconsole.EventType_EVENT_TYPE_LOG:
eventTypes = append(eventTypes, dal.EventTypeLog)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT:
eventTypes = append(eventTypes, dal.EventTypeDeployment)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_CREATED:
eventTypes = append(eventTypes, dal.EventTypeDeploymentCreated)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED:
eventTypes = append(eventTypes, dal.EventTypeDeploymentUpdated)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown event type %v", eventType))
}
Expand Down Expand Up @@ -361,29 +282,87 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error)
func eventDALToProto(event dal.Event) *pbconsole.Event {
switch event := event.(type) {
case *dal.CallEvent:
var requestName *string
if r, ok := event.RequestName.Get(); ok {
rstr := r.String()
requestName = &rstr
}
var sourceVerbRef *pschema.VerbRef
if sourceVerb, ok := event.SourceVerb.Get(); ok {
sourceVerbRef = sourceVerb.ToProto().(*pschema.VerbRef) //nolint:forcetypeassert
}
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_Call{
Call: callEventToCall(event),
Call: &pbconsole.CallEvent{
RequestName: requestName,
DeploymentName: event.DeploymentName.String(),
TimeStamp: timestamppb.New(event.Time),
SourceVerbRef: sourceVerbRef,
DestinationVerbRef: &pschema.VerbRef{
Module: event.DestVerb.Module,
Name: event.DestVerb.Name,
},
Duration: durationpb.New(event.Duration),
Request: string(event.Request),
Response: string(event.Response),
Error: event.Error.Ptr(),
},
},
}

case *dal.LogEvent:
var requestName *string
if r, ok := event.RequestName.Get(); ok {
rstr := r.String()
requestName = &rstr
}
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_Log{
Log: logEventToLogEntry(event),
Log: &pbconsole.LogEvent{
DeploymentName: event.DeploymentName.String(),
RequestName: requestName,
TimeStamp: timestamppb.New(event.Time),
LogLevel: event.Level,
Attributes: event.Attributes,
Message: event.Message,
Error: event.Error.Ptr(),
},
},
}

case *dal.DeploymentEvent:
case *dal.DeploymentCreatedEvent:
var replaced *string
if r, ok := event.ReplacedDeployment.Get(); ok {
rstr := r.String()
replaced = &rstr
}
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_DeploymentCreated{
DeploymentCreated: &pbconsole.DeploymentCreatedEvent{
Name: event.DeploymentName.String(),
Language: event.Language,
ModuleName: event.ModuleName,
MinReplicas: int32(event.MinReplicas),
Replaced: replaced,
},
},
}
case *dal.DeploymentUpdatedEvent:
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_Deployment{
Deployment: deploymentEventToDeployment(event),
Entry: &pbconsole.Event_DeploymentUpdated{
DeploymentUpdated: &pbconsole.DeploymentUpdatedEvent{
Name: event.DeploymentName.String(),
MinReplicas: int32(event.MinReplicas),
PrevMinReplicas: int32(event.PrevMinReplicas),
},
},
}

Expand Down
50 changes: 25 additions & 25 deletions backend/controller/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,15 +480,6 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, schema *sch
}
}

err = tx.InsertDeploymentEvent(ctx, sql.InsertDeploymentEventParams{
DeploymentName: deploymentName.String(),
Type: string(DeploymentCreated),
Language: language,
ModuleName: schema.Name,
})
if err != nil {
return "", errors.Wrap(translatePGError(err), "failed to insert deployment event")
}
return deploymentName, nil
}

Expand Down Expand Up @@ -621,10 +612,33 @@ func (p *postgresClaim) Runner() Runner { return p.runner }

// SetDeploymentReplicas activates the given deployment.
func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentName, minReplicas int) error {
err := d.db.SetDeploymentDesiredReplicas(ctx, key, int32(minReplicas))
// Start the transaction
tx, err := d.db.Begin(ctx)
if err != nil {
return errors.WithStack(translatePGError(err))
}

defer tx.CommitOrRollback(ctx, &err)

deployment, err := d.db.GetDeployment(ctx, key)
if err != nil {
return errors.WithStack(translatePGError(err))
}

err = d.db.SetDeploymentDesiredReplicas(ctx, key, int32(minReplicas))
if err != nil {
return errors.WithStack(translatePGError(err))
}

err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{
DeploymentName: key.String(),
MinReplicas: int32(minReplicas),
PrevMinReplicas: deployment.MinReplicas,
})
if err != nil {
return errors.WithStack(translatePGError(err))
}

return nil
}

Expand All @@ -642,7 +656,6 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentName model.Dep
return errors.WithStack(translatePGError(err))
}

var updateType DeploymentEventType
var replacedDeployment types.Option[string]

// If there's an existing deployment, set its desired replicas to 0
Expand All @@ -655,7 +668,6 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentName model.Dep
if count == 1 {
return errors.Wrap(ErrConflict, "deployment already exists")
}
updateType = DeploymentReplaced
replacedDeployment = types.Some(oldDeployment.Name.String())
} else if !isNotFound(err) {
return errors.WithStack(translatePGError(err))
Expand All @@ -665,12 +677,10 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentName model.Dep
if err != nil {
return errors.WithStack(translatePGError(err))
}
updateType = DeploymentUpdated
}

err = tx.InsertDeploymentEvent(ctx, sql.InsertDeploymentEventParams{
err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{
DeploymentName: newDeploymentName.String(),
Type: string(updateType),
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
MinReplicas: int32(minReplicas),
Expand Down Expand Up @@ -971,16 +981,6 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
})))
}

func (d *DAL) InsertDeploymentEvent(ctx context.Context, deployment *DeploymentEvent) error {
return errors.WithStack(translatePGError(d.db.InsertDeploymentEvent(ctx, sql.InsertDeploymentEventParams{
DeploymentName: deployment.DeploymentName.String(),
Type: string(deployment.Type),
Language: deployment.Language,
ModuleName: deployment.ModuleName,
MinReplicas: int32(deployment.MinReplicas),
})))
}

func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) {
rows, err := d.db.GetActiveRunners(ctx, false)
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions backend/controller/internal/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,9 @@ func TestDAL(t *testing.T) {
assert.NoError(t, err)
})

expectedDeploymentEvent := &DeploymentEvent{
expectedDeploymentUpdatedEvent := &DeploymentUpdatedEvent{
DeploymentName: deploymentName,
Type: DeploymentEventType("created"),
Language: "go",
ModuleName: "test",
MinReplicas: 1,
}

t.Run("QueryEvents", func(t *testing.T) {
Expand All @@ -263,13 +261,13 @@ func TestDAL(t *testing.T) {
t.Run("NoFilters", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000)
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentEvent, callEvent, logEvent}, events)
assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events)
})

t.Run("ByDeployment", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000, FilterDeployments(deploymentName))
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentEvent, callEvent, logEvent}, events)
assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events)
})

t.Run("ByCall", func(t *testing.T) {
Expand Down
Loading

0 comments on commit dd080ec

Please sign in to comment.