From 8ebc9d627659993c56a42e8cb6c69f3c0d501a16 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 19 Dec 2024 10:57:06 +1100 Subject: [PATCH] fix: move generic schema writing to the JVM plugin --- common/schema/schema.go | 40 +++++++++++++ frontend/cli/cmd_new.go | 2 +- internal/buildengine/engine.go | 2 +- internal/buildengine/languageplugin/plugin.go | 4 +- internal/buildengine/stubs.go | 33 +--------- jvm-runtime/plugin/common/java_plugin_test.go | 2 +- jvm-runtime/plugin/common/jvmcommon.go | 60 +++++++++++++++++++ 7 files changed, 106 insertions(+), 37 deletions(-) diff --git a/common/schema/schema.go b/common/schema/schema.go index 576ac0b09..95cafca7a 100644 --- a/common/schema/schema.go +++ b/common/schema/schema.go @@ -215,3 +215,43 @@ func FromProto(s *schemapb.Schema) (*Schema, error) { } return ValidateSchema(schema) } + +// ModuleDependencies returns the modules that the given module depends on +// Dependency modules are the ones that are called by the given module, or that publish topics that the given module subscribes to +func (s *Schema) ModuleDependencies(module string) map[string]*Module { + mods := map[string]*Module{} + for _, sch := range s.Modules { + mods[sch.Name] = sch + } + deps := make(map[string]*Module) + toProcess := []string{module} + for len(toProcess) > 0 { + dep := toProcess[0] + toProcess = toProcess[1:] + if deps[dep] != nil { + continue + } + dm := mods[dep] + deps[dep] = dm + for _, m := range dm.Decls { + if ref, ok := m.(*Verb); ok { + for _, ref := range ref.Metadata { + switch md := ref.(type) { + case *MetadataCalls: + for _, calls := range md.Calls { + if calls.Module != "" { + toProcess = append(toProcess, calls.Module) + } + } + case *MetadataSubscriber: + if md.Topic.Module != "" { + toProcess = append(toProcess, md.Topic.Module) + } + } + } + } + } + } + delete(deps, module) + return deps +} diff --git a/frontend/cli/cmd_new.go b/frontend/cli/cmd_new.go index d3aa38931..9380f8ffa 100644 --- a/frontend/cli/cmd_new.go +++ b/frontend/cli/cmd_new.go @@ -67,7 +67,7 @@ func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPl return optionalPlugin, fmt.Errorf("could not load project config: %w", err) } - plugin, err := languageplugin.New(ctx, filepath.Dir(projConfigPath), language, "new", false) + plugin, err := languageplugin.New(ctx, filepath.Dir(projConfigPath), language, "new") if err != nil { return optionalPlugin, fmt.Errorf("could not create plugin for %v: %w", language, err) diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index fcc831c9a..7a1ab4f22 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -1051,7 +1051,7 @@ func (e *Engine) gatherSchemas( } func (e *Engine) newModuleMeta(ctx context.Context, config moduleconfig.UnvalidatedModuleConfig) (moduleMeta, error) { - plugin, err := languageplugin.New(ctx, config.Dir, config.Language, config.Module, e.devMode) + plugin, err := languageplugin.New(ctx, config.Dir, config.Language, config.Module) if err != nil { return moduleMeta{}, fmt.Errorf("could not create plugin for %s: %w", config.Module, err) } diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index 6dd12e038..bafa3790e 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -96,8 +96,8 @@ type BuildContext struct { var ErrPluginNotRunning = errors.New("language plugin no longer running") -// PluginFromConfig creates a new language plugin from the given config. -func New(ctx context.Context, dir, language, name string, devMode bool) (p *LanguagePlugin, err error) { +// New creates a new language plugin from the given config. +func New(ctx context.Context, dir, language, name string) (p *LanguagePlugin, err error) { impl, err := newClientImpl(ctx, dir, language, name) if err != nil { return nil, err diff --git a/internal/buildengine/stubs.go b/internal/buildengine/stubs.go index 608b8a5b4..e1689a28c 100644 --- a/internal/buildengine/stubs.go +++ b/internal/buildengine/stubs.go @@ -24,7 +24,7 @@ func GenerateStubs(ctx context.Context, projectRoot string, modules []*schema.Mo if err != nil { return err } - return writeGenericSchemaFiles(modules, metas) + return nil } // CleanStubs removes all generated stubs. @@ -107,34 +107,3 @@ func generateStubsForEachLanguage(ctx context.Context, projectRoot string, modul } return nil } - -func writeGenericSchemaFiles(modules []*schema.Module, metas map[string]moduleMeta) error { - sch := &schema.Schema{Modules: modules} - for _, meta := range metas { - module := meta.module.Config - if module.GeneratedSchemaDir == "" { - continue - } - - modPath := module.Abs().GeneratedSchemaDir - err := os.MkdirAll(modPath, 0750) - if err != nil { - return fmt.Errorf("failed to create directory %s: %w", modPath, err) - } - - for _, mod := range sch.Modules { - if mod.Name == module.Module { - continue - } - data, err := schema.ModuleToBytes(mod) - if err != nil { - return fmt.Errorf("failed to export module schema for module %s %w", mod.Name, err) - } - err = os.WriteFile(filepath.Join(modPath, mod.Name+".pb"), data, 0600) - if err != nil { - return fmt.Errorf("failed to write schema file for module %s %w", mod.Name, err) - } - } - } - return nil -} diff --git a/jvm-runtime/plugin/common/java_plugin_test.go b/jvm-runtime/plugin/common/java_plugin_test.go index dce403198..a423daf7f 100644 --- a/jvm-runtime/plugin/common/java_plugin_test.go +++ b/jvm-runtime/plugin/common/java_plugin_test.go @@ -75,7 +75,7 @@ func TestJavaConfigDefaults(t *testing.T) { dir, err := filepath.Abs(tt.dir) assert.NoError(t, err) - plugin, err := languageplugin.New(ctx, t.TempDir(), "java", "test", false) + plugin, err := languageplugin.New(ctx, t.TempDir(), "java", "test") assert.NoError(t, err) t.Cleanup(func() { _ = plugin.Kill() //nolint:errcheck diff --git a/jvm-runtime/plugin/common/jvmcommon.go b/jvm-runtime/plugin/common/jvmcommon.go index df2ecbacc..4fa879ec3 100644 --- a/jvm-runtime/plugin/common/jvmcommon.go +++ b/jvm-runtime/plugin/common/jvmcommon.go @@ -28,6 +28,7 @@ import ( langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" langconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1/languagepbconnect" ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/block/ftl/common/builderrors" "github.com/block/ftl/common/errors" "github.com/block/ftl/common/plugin" @@ -40,6 +41,8 @@ import ( "github.com/block/ftl/internal/flock" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/moduleconfig" + "github.com/block/ftl/internal/rpc" + "github.com/block/ftl/internal/schema/schemaeventsource" "github.com/block/ftl/internal/watch" ) @@ -166,6 +169,10 @@ func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRe if err != nil { return err } + err = s.writeGenericSchemaFiles(buildCtx) + if err != nil { + return err + } if req.Msg.RebuildAutomatically { return s.runDevMode(ctx, req, buildCtx, stream) } @@ -296,6 +303,8 @@ func (s *Service) runQuarkusDev(ctx context.Context, req *connect.Request[langpb debugPort, err := plugin.AllocatePort() debugPort32 := int32(debugPort.Port) + eventSource := schemaeventsource.New(ctx, rpc.Dial(ftlv1connect.NewSchemaServiceClient, os.Getenv("FTL_BIND"), log.Debug)) + if err == nil { devModeBuild = fmt.Sprintf("%s -Ddebug=%d", devModeBuild, debugPort.Port) } @@ -334,6 +343,28 @@ func (s *Service) runQuarkusDev(ctx context.Context, req *connect.Request[langpb } } return nil + case event := <-eventSource.Events(): + // We want to write up-to-date versions of any schema files + // So that our code generation is up-to-date + view := eventSource.View() + // Iterate over the changed modules + for _, module := range event.Schema().Modules { + deps := view.ModuleDependencies(module.Name) + // We don't want to generate a schema if we are a dependency of this module + // This could make it easy to add circular dependencies + if _, ok := deps[buildCtx.Config.Module]; !ok { + bytes, err := schema.ModuleToBytes(module) + if err != nil { + logger.Errorf(err, "failed to write schema for module: %s", module.Name) + continue + } + err = os.WriteFile(filepath.Join(buildCtx.Config.GeneratedSchemaDir, module.Name+".pb"), bytes, 0600) + if err != nil { + logger.Errorf(err, "failed to write schema for module: %s", module.Name) + continue + } + } + } case bc := <-events: buildCtx = bc.buildCtx case <-schemaChangeTicker.C: @@ -519,6 +550,10 @@ func (s *Service) BuildContextUpdated(ctx context.Context, req *connect.Request[ if err != nil { return nil, err } + err = s.writeGenericSchemaFiles(buildCtx) + if err != nil { + return nil, err + } s.updatesTopic.Publish(buildContextUpdatedEvent{ buildCtx: buildCtx, @@ -795,3 +830,28 @@ func loadProtoErrors(config moduleconfig.AbsModuleConfig) (*langpb.ErrorList, er func ptr(s string) *string { return &s } + +func (s *Service) writeGenericSchemaFiles(buildContext buildContext) error { + + modPath := buildContext.Config.GeneratedSchemaDir + err := os.MkdirAll(modPath, 0750) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", modPath, err) + } + + for _, mod := range buildContext.Schema.Modules { + if mod.Name == buildContext.Config.Module { + continue + } + data, err := schema.ModuleToBytes(mod) + if err != nil { + return fmt.Errorf("failed to export module schema for module %s %w", mod.Name, err) + } + err = os.WriteFile(filepath.Join(modPath, mod.Name+".pb"), data, 0600) + if err != nil { + return fmt.Errorf("failed to write schema file for module %s %w", mod.Name, err) + } + } + + return nil +}