Skip to content

Commit

Permalink
feat: add StreamModules to ConsoleService (#2931)
Browse files Browse the repository at this point in the history
Part 1 of #2805

Next:
1. Client side integration for this PR's version of StreamModules, which
behaves in a way that's very similar to the existing `get` endpoint
2. Address the TODO in StreamModules: detect + push deployment updates
to the stream
3. Handle streaming updates on the client side

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
deniseli and github-actions[bot] authored Oct 3, 2024
1 parent c09fa65 commit dcd3dc7
Show file tree
Hide file tree
Showing 8 changed files with 1,057 additions and 445 deletions.
260 changes: 220 additions & 40 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,55 +102,20 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
for _, decl := range deployment.Schema.Decls {
switch decl := decl.(type) {
case *schema.Verb:
//nolint:forcetypeassert
v := decl.ToProto().(*schemapb.Verb)
verbSchema := schema.VerbFromProto(v)
var jsonRequestSchema string
if verbSchema.Request != nil {
if requestData, ok := verbSchema.Request.(*schema.Ref); ok {
jsonSchema, err := schema.RequestResponseToJSONSchema(sch, *requestData)
if err != nil {
return nil, err
}
jsonData, err := json.MarshalIndent(jsonSchema, "", " ")
if err != nil {
return nil, err
}
jsonRequestSchema = string(jsonData)
}
}

schemaString, err := verbSchemaString(sch, decl)
verb, err := verbFromDecl(decl, sch)
if err != nil {
return nil, err
}
verbs = append(verbs, &pbconsole.Verb{
Verb: v,
Schema: schemaString,
JsonRequestSchema: jsonRequestSchema,
})
verbs = append(verbs, verb)

case *schema.Data:
//nolint:forcetypeassert
d := decl.ToProto().(*schemapb.Data)
data = append(data, &pbconsole.Data{
Data: d,
Schema: schema.DataFromProto(d).String(),
})
data = append(data, dataFromDecl(decl))

case *schema.Secret:
//nolint:forcetypeassert
s := decl.ToProto().(*schemapb.Secret)
secrets = append(secrets, &pbconsole.Secret{
Secret: s,
})
secrets = append(secrets, secretFromDecl(decl))

case *schema.Config:
//nolint:forcetypeassert
c := decl.ToProto().(*schemapb.Config)
configs = append(configs, &pbconsole.Config{
Config: c,
})
configs = append(configs, configFromDecl(decl))

case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.FSM, *schema.Topic, *schema.Subscription:
}
Expand Down Expand Up @@ -188,6 +153,221 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}), nil
}

func moduleFromDeployment(deployment dalmodel.Deployment, sch *schema.Schema) (*pbconsole.Module, error) {
module, err := moduleFromDecls(deployment.Schema.Decls, sch)
if err != nil {
return nil, err
}

module.Name = deployment.Module
module.DeploymentKey = deployment.Key.String()
module.Language = deployment.Language
module.Schema = deployment.Schema.String()

return module, nil
}

func moduleFromDecls(decls []schema.Decl, sch *schema.Schema) (*pbconsole.Module, error) {
var configs []*pbconsole.Config
var data []*pbconsole.Data
var databases []*pbconsole.Database
var enums []*pbconsole.Enum
var fsms []*pbconsole.FSM
var topics []*pbconsole.Topic
var typealiases []*pbconsole.TypeAlias
var secrets []*pbconsole.Secret
var subscriptions []*pbconsole.Subscription
var verbs []*pbconsole.Verb

for _, decl := range decls {
switch decl := decl.(type) {
case *schema.Config:
configs = append(configs, configFromDecl(decl))

case *schema.Data:
data = append(data, dataFromDecl(decl))

case *schema.Database:
databases = append(databases, databaseFromDecl(decl))

case *schema.Enum:
enums = append(enums, enumFromDecl(decl))

case *schema.FSM:
fsms = append(fsms, fsmFromDecl(decl))

case *schema.Topic:
topics = append(topics, topicFromDecl(decl))

case *schema.Secret:
secrets = append(secrets, secretFromDecl(decl))

case *schema.Subscription:
subscriptions = append(subscriptions, subscriptionFromDecl(decl))

case *schema.TypeAlias:
typealiases = append(typealiases, typealiasFromDecl(decl))

case *schema.Verb:
verb, err := verbFromDecl(decl, sch)
if err != nil {
return nil, err
}
verbs = append(verbs, verb)
}
}

return &pbconsole.Module{
Configs: configs,
Data: data,
Databases: databases,
Enums: enums,
Fsms: fsms,
Topics: topics,
Typealiases: typealiases,
Secrets: secrets,
Subscriptions: subscriptions,
Verbs: verbs,
}, nil
}

func configFromDecl(decl *schema.Config) *pbconsole.Config {
return &pbconsole.Config{
//nolint:forcetypeassert
Config: decl.ToProto().(*schemapb.Config),
}
}

func dataFromDecl(decl *schema.Data) *pbconsole.Data {
//nolint:forcetypeassert
d := decl.ToProto().(*schemapb.Data)
return &pbconsole.Data{
Data: d,
Schema: schema.DataFromProto(d).String(),
}
}

func databaseFromDecl(decl *schema.Database) *pbconsole.Database {
return &pbconsole.Database{
//nolint:forcetypeassert
Database: decl.ToProto().(*schemapb.Database),
}
}

func enumFromDecl(decl *schema.Enum) *pbconsole.Enum {
return &pbconsole.Enum{
//nolint:forcetypeassert
Enum: decl.ToProto().(*schemapb.Enum),
}
}

func fsmFromDecl(decl *schema.FSM) *pbconsole.FSM {
return &pbconsole.FSM{
//nolint:forcetypeassert
Fsm: decl.ToProto().(*schemapb.FSM),
}
}

func topicFromDecl(decl *schema.Topic) *pbconsole.Topic {
return &pbconsole.Topic{
//nolint:forcetypeassert
Topic: decl.ToProto().(*schemapb.Topic),
}
}

func typealiasFromDecl(decl *schema.TypeAlias) *pbconsole.TypeAlias {
return &pbconsole.TypeAlias{
//nolint:forcetypeassert
Typealias: decl.ToProto().(*schemapb.TypeAlias),
}
}

func secretFromDecl(decl *schema.Secret) *pbconsole.Secret {
return &pbconsole.Secret{
//nolint:forcetypeassert
Secret: decl.ToProto().(*schemapb.Secret),
}
}

func subscriptionFromDecl(decl *schema.Subscription) *pbconsole.Subscription {
return &pbconsole.Subscription{
//nolint:forcetypeassert
Subscription: decl.ToProto().(*schemapb.Subscription),
}
}

func verbFromDecl(decl *schema.Verb, sch *schema.Schema) (*pbconsole.Verb, error) {
//nolint:forcetypeassert
v := decl.ToProto().(*schemapb.Verb)
verbSchema := schema.VerbFromProto(v)
var jsonRequestSchema string
if verbSchema.Request != nil {
if requestData, ok := verbSchema.Request.(*schema.Ref); ok {
jsonSchema, err := schema.RequestResponseToJSONSchema(sch, *requestData)
if err != nil {
return nil, fmt.Errorf("failed to retrieve JSON schema: %w", err)
}
jsonData, err := json.MarshalIndent(jsonSchema, "", " ")
if err != nil {
return nil, fmt.Errorf("failed to indent JSON schema: %w", err)
}
jsonRequestSchema = string(jsonData)
}
}

schemaString, err := verbSchemaString(sch, decl)
if err != nil {
return nil, err
}
return &pbconsole.Verb{
Verb: v,
Schema: schemaString,
JsonRequestSchema: jsonRequestSchema,
}, nil
}

func (c *ConsoleService) StreamModules(ctx context.Context, req *connect.Request[pbconsole.StreamModulesRequest], stream *connect.ServerStream[pbconsole.StreamModulesResponse]) error {
deployments, err := c.dal.GetDeploymentsWithMinReplicas(ctx)
if err != nil {
return fmt.Errorf("failed to get deployments: %w", err)
}
sch := &schema.Schema{
Modules: slices.Map(deployments, func(d dalmodel.Deployment) *schema.Module {
return d.Schema
}),
}
builtin := schema.Builtins()
sch.Modules = append(sch.Modules, builtin)

var modules []*pbconsole.Module
for _, deployment := range deployments {
module, err := moduleFromDeployment(deployment, sch)
if err != nil {
return err
}
modules = append(modules, module)
}

builtinModule, err := moduleFromDecls(builtin.Decls, sch)
if err != nil {
return err
}
builtinModule.Name = builtin.Name
builtinModule.Language = "go"
builtinModule.Schema = builtin.String()
modules = append(modules, builtinModule)

err = stream.Send(&pbconsole.StreamModulesResponse{
Modules: modules,
})
if err != nil {
return fmt.Errorf("failed to send StreamModulesResponse to stream: %w", err)
}

// TODO: handle deployment updates
return nil
}

func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbconsole.EventsQuery]) (*connect.Response[pbconsole.GetEventsResponse], error) {
query, err := eventsQueryProtoToDAL(req.Msg)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions backend/controller/console/console_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,29 @@ func TestConsoleGetModules(t *testing.T) {
}),
)
}

// StreamModules calls console service GetModules and returns the response.
//
// This action is defined here vs. actions.go because it is only used in this test file.
func StreamModules(onResponse func(t testing.TB, resp *connect.ServerStreamForClient[pbconsole.StreamModulesResponse])) in.Action {
return func(t testing.TB, ic in.TestContext) {
in.Infof("StreamModules")
modules, err := ic.Console.StreamModules(ic.Context, &connect.Request[pbconsole.StreamModulesRequest]{})
assert.NoError(t, err)
onResponse(t, modules)
}
}

func TestConsoleStreamModules(t *testing.T) {
in.Run(t,
in.CopyModule("console"),
in.Deploy("console"),
StreamModules(func(t testing.TB, stream *connect.ServerStreamForClient[pbconsole.StreamModulesResponse]) {
for stream.Receive() {
assert.Equal(t, 2, len(stream.Msg().Modules))
assert.Equal(t, "console", stream.Msg().Modules[0].Name)
assert.Equal(t, "builtin", stream.Msg().Modules[1].Name)
}
}),
)
}
Loading

0 comments on commit dcd3dc7

Please sign in to comment.