Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add StreamModules to ConsoleService #2931

Merged
merged 9 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 220 additions & 40 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,55 +102,20 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
for _, decl := range deployment.Schema.Decls {
switch decl := decl.(type) {
case *schema.Verb:
//nolint:forcetypeassert
v := decl.ToProto().(*schemapb.Verb)
verbSchema := schema.VerbFromProto(v)
var jsonRequestSchema string
if verbSchema.Request != nil {
if requestData, ok := verbSchema.Request.(*schema.Ref); ok {
jsonSchema, err := schema.RequestResponseToJSONSchema(sch, *requestData)
if err != nil {
return nil, err
}
jsonData, err := json.MarshalIndent(jsonSchema, "", " ")
if err != nil {
return nil, err
}
jsonRequestSchema = string(jsonData)
}
}

schemaString, err := verbSchemaString(sch, decl)
verb, err := verbFromDecl(decl, sch)
if err != nil {
return nil, err
}
verbs = append(verbs, &pbconsole.Verb{
Verb: v,
Schema: schemaString,
JsonRequestSchema: jsonRequestSchema,
})
verbs = append(verbs, verb)

case *schema.Data:
//nolint:forcetypeassert
d := decl.ToProto().(*schemapb.Data)
data = append(data, &pbconsole.Data{
Data: d,
Schema: schema.DataFromProto(d).String(),
})
data = append(data, dataFromDecl(decl))

case *schema.Secret:
//nolint:forcetypeassert
s := decl.ToProto().(*schemapb.Secret)
secrets = append(secrets, &pbconsole.Secret{
Secret: s,
})
secrets = append(secrets, secretFromDecl(decl))

case *schema.Config:
//nolint:forcetypeassert
c := decl.ToProto().(*schemapb.Config)
configs = append(configs, &pbconsole.Config{
Config: c,
})
configs = append(configs, configFromDecl(decl))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is much nicer here :)


case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.FSM, *schema.Topic, *schema.Subscription:
}
Expand Down Expand Up @@ -188,6 +153,221 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}), nil
}

func moduleFromDeployment(deployment dalmodel.Deployment, sch *schema.Schema) (*pbconsole.Module, error) {
module, err := moduleFromDecls(deployment.Schema.Decls, sch)
if err != nil {
return nil, err
}

module.Name = deployment.Module
module.DeploymentKey = deployment.Key.String()
module.Language = deployment.Language
module.Schema = deployment.Schema.String()

return module, nil
}

func moduleFromDecls(decls []schema.Decl, sch *schema.Schema) (*pbconsole.Module, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some tests (can be in another PR) for some of this stuff? Or is it covered by other tests atm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! I only added an integration test for now and didn't test for the contents of the various <blahblah>FromDecl methods since we're going to change so much of that anyway.

var configs []*pbconsole.Config
var data []*pbconsole.Data
var databases []*pbconsole.Database
var enums []*pbconsole.Enum
var fsms []*pbconsole.FSM
var topics []*pbconsole.Topic
var typealiases []*pbconsole.TypeAlias
var secrets []*pbconsole.Secret
var subscriptions []*pbconsole.Subscription
var verbs []*pbconsole.Verb

for _, decl := range decls {
switch decl := decl.(type) {
case *schema.Config:
configs = append(configs, configFromDecl(decl))

case *schema.Data:
data = append(data, dataFromDecl(decl))

case *schema.Database:
databases = append(databases, databaseFromDecl(decl))

case *schema.Enum:
enums = append(enums, enumFromDecl(decl))

case *schema.FSM:
fsms = append(fsms, fsmFromDecl(decl))

case *schema.Topic:
topics = append(topics, topicFromDecl(decl))

case *schema.Secret:
secrets = append(secrets, secretFromDecl(decl))

case *schema.Subscription:
subscriptions = append(subscriptions, subscriptionFromDecl(decl))

case *schema.TypeAlias:
typealiases = append(typealiases, typealiasFromDecl(decl))

case *schema.Verb:
verb, err := verbFromDecl(decl, sch)
if err != nil {
return nil, err
}
verbs = append(verbs, verb)
}
}

return &pbconsole.Module{
Configs: configs,
Data: data,
Databases: databases,
Enums: enums,
Fsms: fsms,
Topics: topics,
Typealiases: typealiases,
Secrets: secrets,
Subscriptions: subscriptions,
Verbs: verbs,
}, nil
}

func configFromDecl(decl *schema.Config) *pbconsole.Config {
return &pbconsole.Config{
//nolint:forcetypeassert
Config: decl.ToProto().(*schemapb.Config),
}
}

func dataFromDecl(decl *schema.Data) *pbconsole.Data {
//nolint:forcetypeassert
d := decl.ToProto().(*schemapb.Data)
return &pbconsole.Data{
Data: d,
Schema: schema.DataFromProto(d).String(),
}
}

func databaseFromDecl(decl *schema.Database) *pbconsole.Database {
return &pbconsole.Database{
//nolint:forcetypeassert
Database: decl.ToProto().(*schemapb.Database),
}
}

func enumFromDecl(decl *schema.Enum) *pbconsole.Enum {
return &pbconsole.Enum{
//nolint:forcetypeassert
Enum: decl.ToProto().(*schemapb.Enum),
}
}

func fsmFromDecl(decl *schema.FSM) *pbconsole.FSM {
return &pbconsole.FSM{
//nolint:forcetypeassert
Fsm: decl.ToProto().(*schemapb.FSM),
}
}

func topicFromDecl(decl *schema.Topic) *pbconsole.Topic {
return &pbconsole.Topic{
//nolint:forcetypeassert
Topic: decl.ToProto().(*schemapb.Topic),
}
}

func typealiasFromDecl(decl *schema.TypeAlias) *pbconsole.TypeAlias {
return &pbconsole.TypeAlias{
//nolint:forcetypeassert
Typealias: decl.ToProto().(*schemapb.TypeAlias),
}
}

func secretFromDecl(decl *schema.Secret) *pbconsole.Secret {
return &pbconsole.Secret{
//nolint:forcetypeassert
Secret: decl.ToProto().(*schemapb.Secret),
}
}

func subscriptionFromDecl(decl *schema.Subscription) *pbconsole.Subscription {
return &pbconsole.Subscription{
//nolint:forcetypeassert
Subscription: decl.ToProto().(*schemapb.Subscription),
}
}

func verbFromDecl(decl *schema.Verb, sch *schema.Schema) (*pbconsole.Verb, error) {
//nolint:forcetypeassert
v := decl.ToProto().(*schemapb.Verb)
verbSchema := schema.VerbFromProto(v)
var jsonRequestSchema string
if verbSchema.Request != nil {
if requestData, ok := verbSchema.Request.(*schema.Ref); ok {
jsonSchema, err := schema.RequestResponseToJSONSchema(sch, *requestData)
if err != nil {
return nil, fmt.Errorf("failed to retrieve JSON schema: %w", err)
}
jsonData, err := json.MarshalIndent(jsonSchema, "", " ")
if err != nil {
return nil, fmt.Errorf("failed to indent JSON schema: %w", err)
}
jsonRequestSchema = string(jsonData)
}
}

schemaString, err := verbSchemaString(sch, decl)
if err != nil {
return nil, err
}
return &pbconsole.Verb{
Verb: v,
Schema: schemaString,
JsonRequestSchema: jsonRequestSchema,
}, nil
}

func (c *ConsoleService) StreamModules(ctx context.Context, req *connect.Request[pbconsole.StreamModulesRequest], stream *connect.ServerStream[pbconsole.StreamModulesResponse]) error {
deployments, err := c.dal.GetDeploymentsWithMinReplicas(ctx)
if err != nil {
return fmt.Errorf("failed to get deployments: %w", err)
}
sch := &schema.Schema{
Modules: slices.Map(deployments, func(d dalmodel.Deployment) *schema.Module {
return d.Schema
}),
}
builtin := schema.Builtins()
sch.Modules = append(sch.Modules, builtin)

var modules []*pbconsole.Module
for _, deployment := range deployments {
module, err := moduleFromDeployment(deployment, sch)
if err != nil {
return err
}
modules = append(modules, module)
}

builtinModule, err := moduleFromDecls(builtin.Decls, sch)
if err != nil {
return err
}
builtinModule.Name = builtin.Name
builtinModule.Language = "go"
builtinModule.Schema = builtin.String()
modules = append(modules, builtinModule)

err = stream.Send(&pbconsole.StreamModulesResponse{
Modules: modules,
})
if err != nil {
return fmt.Errorf("failed to send StreamModulesResponse to stream: %w", err)
}

// TODO: handle deployment updates
return nil
}

func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbconsole.EventsQuery]) (*connect.Response[pbconsole.GetEventsResponse], error) {
query, err := eventsQueryProtoToDAL(req.Msg)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions backend/controller/console/console_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,29 @@ func TestConsoleGetModules(t *testing.T) {
}),
)
}

// StreamModules calls console service GetModules and returns the response.
//
// This action is defined here vs. actions.go because it is only used in this test file.
func StreamModules(onResponse func(t testing.TB, resp *connect.ServerStreamForClient[pbconsole.StreamModulesResponse])) in.Action {
return func(t testing.TB, ic in.TestContext) {
in.Infof("StreamModules")
modules, err := ic.Console.StreamModules(ic.Context, &connect.Request[pbconsole.StreamModulesRequest]{})
assert.NoError(t, err)
onResponse(t, modules)
}
}

func TestConsoleStreamModules(t *testing.T) {
in.Run(t,
in.CopyModule("console"),
in.Deploy("console"),
StreamModules(func(t testing.TB, stream *connect.ServerStreamForClient[pbconsole.StreamModulesResponse]) {
for stream.Receive() {
assert.Equal(t, 2, len(stream.Msg().Modules))
assert.Equal(t, "console", stream.Msg().Modules[0].Name)
assert.Equal(t, "builtin", stream.Msg().Modules[1].Name)
}
}),
)
}
Loading
Loading