From 6db453b1c52ca8b136e0e4ec1b3b6da3b3af1cd4 Mon Sep 17 00:00:00 2001 From: matt durham Date: Sun, 17 Sep 2023 11:50:14 -0400 Subject: [PATCH 01/18] push changes --- component/module/file/file_test.go | 1 + pkg/flow/module_test.go | 75 ++++++++++++++++++++++++++---- 2 files changed, 66 insertions(+), 10 deletions(-) create mode 100644 component/module/file/file_test.go diff --git a/component/module/file/file_test.go b/component/module/file/file_test.go new file mode 100644 index 000000000000..b691ba57a438 --- /dev/null +++ b/component/module/file/file_test.go @@ -0,0 +1 @@ +package file diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index ee3281081667..336e0add18de 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/agent/component" + mod "github.com/grafana/agent/component/module" "github.com/grafana/agent/pkg/flow/logging" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -198,6 +199,39 @@ func TestIDRemoval(t *testing.T) { require.Len(t, nc.(*moduleController).modules, 0) } +func TestIDRemovalIfFailedToLoad(t *testing.T) { + f := New(testOptions(t)) + internalContent := + `test.module \"good\" { content=\"\"}` + fullContent := "test.module \"t1\" { content = \"" + internalContent + "\" }" + fl, err := ReadFile("test", []byte(fullContent)) + require.NoError(t, err) + err = f.LoadFile(fl, nil) + require.NoError(t, err) + ctx := context.Background() + ctx, cnc := context.WithTimeout(ctx, 600*time.Second) + defer cnc() + go f.Run(ctx) + time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + t1 := f.loader.Components()[0].Component().(*testModule) + return t1 != nil + }, 10*time.Second, 100*time.Millisecond) + t1 := f.loader.Components()[0].Component().(*testModule) + m := t1.mc + require.NotNil(t, m) + internalModule := f.modules.modules["test.module.t1"] + require.NotNil(t, internalModule) + t2 := internalModule.f.loader.Components()[0].Component().(*testModule) + require.NotNil(t, t2) + err = t2.updateContent("garbage") + require.Error(t, err) + time.Sleep(5 * time.Second) + // This has to be different since we are passing the string directly instead of via readfile. + err = t2.updateContent("") + require.NoError(t, err) +} + func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { t.Helper() @@ -216,12 +250,19 @@ func init() { component.Register(component.Registration{ Name: "test.module", Args: TestArguments{}, - Exports: TestExports{}, + Exports: mod.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + m, err := mod.NewModuleComponent(opts) + if err != nil { + return nil, err + } + return &testModule{ + mc: m, content: args.(TestArguments).Content, opts: opts, + ch: make(chan error), }, nil }, }) @@ -240,28 +281,42 @@ type testModule struct { args map[string]interface{} exports map[string]interface{} opts component.Options + ch chan error + mc *mod.ModuleComponent } func (t *testModule) Run(ctx context.Context) error { - m, err := t.opts.ModuleController.NewModule("t1", func(exports map[string]any) { - t.exports = exports - if t.opts.OnStateChange == nil { - return - } - t.opts.OnStateChange(TestExports{Exports: exports}) - }) + var err error if err != nil { return err } - err = m.LoadConfig([]byte(t.content), t.args) + err = t.mc.LoadFlowContent(t.args, t.content) if err != nil { return err } - m.Run(ctx) + go t.mc.RunFlowController(ctx) + + for { + select { + case <-ctx.Done(): + return nil + case err = <-t.ch: + return err + } + } return nil } +func (t *testModule) updateContent(content string) error { + t.content = content + err := t.mc.LoadFlowContent(t.args, t.content) + if err != nil { + t.ch <- err + } + return err +} + func (t *testModule) Update(_ component.Arguments) error { return nil } From a2c27559f535d408ac005a1d550bf74623641bc9 Mon Sep 17 00:00:00 2001 From: matt durham Date: Wed, 20 Sep 2023 16:29:27 -0400 Subject: [PATCH 02/18] Add remove whenever using the component module. --- component/module/file/file.go | 2 + component/module/git/git.go | 1 + component/module/http/http.go | 2 + component/module/module.go | 7 +++ component/module/string/string.go | 2 + component/registry.go | 6 ++ pkg/flow/module.go | 10 ++- pkg/flow/module_fail_test.go | 101 ++++++++++++++++++++++++++++++ pkg/flow/module_test.go | 75 +++------------------- 9 files changed, 139 insertions(+), 67 deletions(-) create mode 100644 pkg/flow/module_fail_test.go diff --git a/component/module/file/file.go b/component/module/file/file.go index b523da22efc5..5a98d5458548 100644 --- a/component/module/file/file.go +++ b/component/module/file/file.go @@ -75,9 +75,11 @@ func New(o component.Options, args Arguments) (*Component, error) { c.managedLocalFile, err = c.newManagedLocalComponent(o) if err != nil { + m.Remove() return nil, err } if err := c.Update(args); err != nil { + m.Remove() return nil, err } return c, nil diff --git a/component/module/git/git.go b/component/module/git/git.go index a27ab96f4540..4f5000bec186 100644 --- a/component/module/git/git.go +++ b/component/module/git/git.go @@ -90,6 +90,7 @@ func New(o component.Options, args Arguments) (*Component, error) { } if err := c.Update(args); err != nil { + m.Remove() return nil, err } return c, nil diff --git a/component/module/http/http.go b/component/module/http/http.go index f7a8736102dd..4295309c8393 100644 --- a/component/module/http/http.go +++ b/component/module/http/http.go @@ -74,9 +74,11 @@ func New(o component.Options, args Arguments) (*Component, error) { c.managedRemoteHTTP, err = c.newManagedLocalComponent(o) if err != nil { + m.Remove() return nil, err } if err := c.Update(args); err != nil { + m.Remove() return nil, err } return c, nil diff --git a/component/module/module.go b/component/module/module.go index 9e03e6d9213c..ea6d6f8716dd 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -70,6 +70,13 @@ func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue stri return nil } +// Remove removes the module from the registry if there is an error. +func (c *ModuleComponent) Remove() { + c.mut.Lock() + defer c.mut.Unlock() + c.mod.Remove() +} + // RunFlowController runs the flow controller that all module components start. func (c *ModuleComponent) RunFlowController(ctx context.Context) { c.mod.Run(ctx) diff --git a/component/module/string/string.go b/component/module/string/string.go index c634bd76c09b..17a090275ad7 100644 --- a/component/module/string/string.go +++ b/component/module/string/string.go @@ -45,6 +45,7 @@ var ( // New creates a new module.string component. func New(o component.Options, args Arguments) (*Component, error) { + var err error m, err := module.NewModuleComponent(o) if err != nil { return nil, err @@ -54,6 +55,7 @@ func New(o component.Options, args Arguments) (*Component, error) { } if err := c.Update(args); err != nil { + m.Remove() return nil, err } return c, nil diff --git a/component/registry.go b/component/registry.go index 4425b11b096b..ac30a30e2bc6 100644 --- a/component/registry.go +++ b/component/registry.go @@ -35,6 +35,10 @@ type ModuleController interface { // If id is non-empty, it must be a valid River identifier, matching the // regex /[A-Za-z_][A-Za-z0-9_]/. NewModule(id string, export ExportFunc) (Module, error) + + // RemoveID is used to remove the id from the cache, this is normally called from Run but if + // the Module is created and then fails during loading then it will leak the id. + RemoveID(id string) } // Module is a controller for running components within a Module. @@ -50,6 +54,8 @@ type Module interface { // Run blocks until the provided context is canceled. The ID of a module as defined in // ModuleController.NewModule will not be released until Run returns. Run(context.Context) + + Remove() } // ExportFunc is used for onExport of the Module diff --git a/pkg/flow/module.go b/pkg/flow/module.go index 19183023d5e2..b3cfdbc2e09d 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -64,7 +64,8 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co return mod, nil } -func (m *moduleController) removeID(id string) { +// RemoveID removes a reference from the module registry and name cache. +func (m *moduleController) RemoveID(id string) { m.mut.Lock() defer m.mut.Unlock() @@ -129,12 +130,17 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error { return c.f.LoadFile(ff, args) } +// Remove removed the ID from the registry. Generally only used if fails to start. +func (c *module) Remove() { + c.o.parent.RemoveID(c.o.ID) +} + // Run starts the Module. No components within the Module // will be run until Run is called. // // Run blocks until the provided context is canceled. func (c *module) Run(ctx context.Context) { - defer c.o.parent.removeID(c.o.ID) + defer c.o.parent.RemoveID(c.o.ID) c.f.Run(ctx) } diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go new file mode 100644 index 000000000000..32c72703b395 --- /dev/null +++ b/pkg/flow/module_fail_test.go @@ -0,0 +1,101 @@ +package flow + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/grafana/agent/component" + mod "github.com/grafana/agent/component/module" + "github.com/stretchr/testify/require" +) + +func TestIDRemovalIfFailedToLoad(t *testing.T) { + f := New(testOptions(t)) + + fullContent := "test.fail.module \"t1\" { content = \"\" }" + fl, err := ReadFile("test", []byte(fullContent)) + require.NoError(t, err) + err = f.LoadFile(fl, nil) + require.NoError(t, err) + ctx := context.Background() + ctx, cnc := context.WithTimeout(ctx, 600*time.Second) + defer cnc() + go f.Run(ctx) + require.Eventually(t, func() bool { + t1 := f.loader.Components()[0].Component().(*testFailModule) + return t1 != nil + }, 10*time.Second, 100*time.Millisecond) + t1 := f.loader.Components()[0].Component().(*testFailModule) + badContent := + `test.fail.module "int" { +content="" +fail=true +}` + err = t1.updateContent(badContent) + goodContent := + `test.fail.module "int" { +content="" +fail=false +}` + err = t1.updateContent(goodContent) + require.NoError(t, err) +} + +func init() { + component.Register(component.Registration{ + Name: "test.fail.module", + Args: TestFailArguments{}, + Exports: mod.Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + m, err := mod.NewModuleComponent(opts) + if err != nil { + return nil, err + } + err = m.LoadFlowContent(nil, args.(TestFailArguments).Content) + if args.(TestFailArguments).Fail { + m.Remove() + return nil, fmt.Errorf("module told to fail") + } + return &testFailModule{ + mc: m, + content: args.(TestFailArguments).Content, + opts: opts, + fail: args.(TestFailArguments).Fail, + ch: make(chan error), + }, nil + }, + }) +} + +type TestFailArguments struct { + Content string `river:"content,attr"` + Fail bool `river:"fail,attr,optional"` +} + +type testFailModule struct { + content string + args map[string]interface{} + exports map[string]interface{} + opts component.Options + ch chan error + mc *mod.ModuleComponent + fail bool +} + +func (t *testFailModule) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +func (t *testFailModule) updateContent(content string) error { + t.content = content + err := t.mc.LoadFlowContent(t.args, t.content) + return err +} + +func (t *testFailModule) Update(_ component.Arguments) error { + return nil +} diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index 336e0add18de..ee3281081667 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/grafana/agent/component" - mod "github.com/grafana/agent/component/module" "github.com/grafana/agent/pkg/flow/logging" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -199,39 +198,6 @@ func TestIDRemoval(t *testing.T) { require.Len(t, nc.(*moduleController).modules, 0) } -func TestIDRemovalIfFailedToLoad(t *testing.T) { - f := New(testOptions(t)) - internalContent := - `test.module \"good\" { content=\"\"}` - fullContent := "test.module \"t1\" { content = \"" + internalContent + "\" }" - fl, err := ReadFile("test", []byte(fullContent)) - require.NoError(t, err) - err = f.LoadFile(fl, nil) - require.NoError(t, err) - ctx := context.Background() - ctx, cnc := context.WithTimeout(ctx, 600*time.Second) - defer cnc() - go f.Run(ctx) - time.Sleep(5 * time.Second) - require.Eventually(t, func() bool { - t1 := f.loader.Components()[0].Component().(*testModule) - return t1 != nil - }, 10*time.Second, 100*time.Millisecond) - t1 := f.loader.Components()[0].Component().(*testModule) - m := t1.mc - require.NotNil(t, m) - internalModule := f.modules.modules["test.module.t1"] - require.NotNil(t, internalModule) - t2 := internalModule.f.loader.Components()[0].Component().(*testModule) - require.NotNil(t, t2) - err = t2.updateContent("garbage") - require.Error(t, err) - time.Sleep(5 * time.Second) - // This has to be different since we are passing the string directly instead of via readfile. - err = t2.updateContent("") - require.NoError(t, err) -} - func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { t.Helper() @@ -250,19 +216,12 @@ func init() { component.Register(component.Registration{ Name: "test.module", Args: TestArguments{}, - Exports: mod.Exports{}, + Exports: TestExports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - m, err := mod.NewModuleComponent(opts) - if err != nil { - return nil, err - } - return &testModule{ - mc: m, content: args.(TestArguments).Content, opts: opts, - ch: make(chan error), }, nil }, }) @@ -281,42 +240,28 @@ type testModule struct { args map[string]interface{} exports map[string]interface{} opts component.Options - ch chan error - mc *mod.ModuleComponent } func (t *testModule) Run(ctx context.Context) error { - var err error + m, err := t.opts.ModuleController.NewModule("t1", func(exports map[string]any) { + t.exports = exports + if t.opts.OnStateChange == nil { + return + } + t.opts.OnStateChange(TestExports{Exports: exports}) + }) if err != nil { return err } - err = t.mc.LoadFlowContent(t.args, t.content) + err = m.LoadConfig([]byte(t.content), t.args) if err != nil { return err } - go t.mc.RunFlowController(ctx) - - for { - select { - case <-ctx.Done(): - return nil - case err = <-t.ch: - return err - } - } + m.Run(ctx) return nil } -func (t *testModule) updateContent(content string) error { - t.content = content - err := t.mc.LoadFlowContent(t.args, t.content) - if err != nil { - t.ch <- err - } - return err -} - func (t *testModule) Update(_ component.Arguments) error { return nil } From 13c99ca17799ce6d758d9d19b1e48152589dae9b Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 21 Sep 2023 08:59:52 -0400 Subject: [PATCH 03/18] Add additional context and remove dead file. --- component/module/file/file_test.go | 1 - component/module/module.go | 4 +++- component/registry.go | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) delete mode 100644 component/module/file/file_test.go diff --git a/component/module/file/file_test.go b/component/module/file/file_test.go deleted file mode 100644 index b691ba57a438..000000000000 --- a/component/module/file/file_test.go +++ /dev/null @@ -1 +0,0 @@ -package file diff --git a/component/module/module.go b/component/module/module.go index ea6d6f8716dd..c37301d44649 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -70,10 +70,12 @@ func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue stri return nil } -// Remove removes the module from the registry if there is an error. +// Remove removes the module from the registry. This is normally called after Run but in the case +// where the parent component fails to load and successfully run it must be called manually. func (c *ModuleComponent) Remove() { c.mut.Lock() defer c.mut.Unlock() + c.mod.Remove() } diff --git a/component/registry.go b/component/registry.go index ac30a30e2bc6..9b16dc95e7b0 100644 --- a/component/registry.go +++ b/component/registry.go @@ -55,6 +55,7 @@ type Module interface { // ModuleController.NewModule will not be released until Run returns. Run(context.Context) + // Remove is used to manually remove the module from the registry. Remove() } From dfa0b7739146c040673e28cf5e8a51a991fe5342 Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 21 Sep 2023 09:03:23 -0400 Subject: [PATCH 04/18] Add long changelog comment --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 654aa71d8fe1..55c5eb6a0edf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,9 @@ Main (unreleased) - Fixed a bug where `otelcol.processor.discovery` could modify the `targets` passed by an upstream component. (@ptodev) +- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to + permanently fail to load with `id already exists` error. (@mattdurham) + v0.36.1 (2023-09-06) -------------------- From f8f993043644bd2da6ada513f34027af6c4443ae Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 21 Sep 2023 09:14:47 -0400 Subject: [PATCH 05/18] fix linter --- pkg/flow/module_fail_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index 32c72703b395..c3b964a2a919 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -34,6 +34,7 @@ content="" fail=true }` err = t1.updateContent(badContent) + require.Error(t, err) goodContent := `test.fail.module "int" { content="" @@ -54,11 +55,15 @@ func init() { if err != nil { return nil, err } - err = m.LoadFlowContent(nil, args.(TestFailArguments).Content) if args.(TestFailArguments).Fail { m.Remove() return nil, fmt.Errorf("module told to fail") } + err = m.LoadFlowContent(nil, args.(TestFailArguments).Content) + if err != nil { + m.Remove() + return nil, err + } return &testFailModule{ mc: m, content: args.(TestFailArguments).Content, @@ -77,8 +82,6 @@ type TestFailArguments struct { type testFailModule struct { content string - args map[string]interface{} - exports map[string]interface{} opts component.Options ch chan error mc *mod.ModuleComponent @@ -92,7 +95,7 @@ func (t *testFailModule) Run(ctx context.Context) error { func (t *testFailModule) updateContent(content string) error { t.content = content - err := t.mc.LoadFlowContent(t.args, t.content) + err := t.mc.LoadFlowContent(nil, t.content) return err } From 8ceafb21a87eddc9a06fd3c753e59c3bfd1bf5a0 Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 21 Sep 2023 09:19:27 -0400 Subject: [PATCH 06/18] Remove mutex check --- component/module/module.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/component/module/module.go b/component/module/module.go index c37301d44649..5e41464789ff 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -73,9 +73,6 @@ func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue stri // Remove removes the module from the registry. This is normally called after Run but in the case // where the parent component fails to load and successfully run it must be called manually. func (c *ModuleComponent) Remove() { - c.mut.Lock() - defer c.mut.Unlock() - c.mod.Remove() } From 54b238c9e40baf61266228802f2156e05641f0c0 Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 5 Oct 2023 10:15:55 -0400 Subject: [PATCH 07/18] Add changes to support module id removal. --- component/module/file/file.go | 2 -- component/module/git/git.go | 1 - component/module/http/http.go | 2 -- component/module/module.go | 6 ------ component/module/string/string.go | 1 - component/registry.go | 7 ------- pkg/flow/internal/controller/loader_test.go | 16 +++++++++++++++- pkg/flow/internal/controller/module.go | 3 +++ pkg/flow/internal/controller/node_component.go | 3 +++ pkg/flow/module.go | 15 +++++---------- pkg/flow/module_fail_test.go | 3 +-- pkg/flow/module_test.go | 16 ---------------- 12 files changed, 27 insertions(+), 48 deletions(-) diff --git a/component/module/file/file.go b/component/module/file/file.go index 5a98d5458548..b523da22efc5 100644 --- a/component/module/file/file.go +++ b/component/module/file/file.go @@ -75,11 +75,9 @@ func New(o component.Options, args Arguments) (*Component, error) { c.managedLocalFile, err = c.newManagedLocalComponent(o) if err != nil { - m.Remove() return nil, err } if err := c.Update(args); err != nil { - m.Remove() return nil, err } return c, nil diff --git a/component/module/git/git.go b/component/module/git/git.go index 4f5000bec186..a27ab96f4540 100644 --- a/component/module/git/git.go +++ b/component/module/git/git.go @@ -90,7 +90,6 @@ func New(o component.Options, args Arguments) (*Component, error) { } if err := c.Update(args); err != nil { - m.Remove() return nil, err } return c, nil diff --git a/component/module/http/http.go b/component/module/http/http.go index 4295309c8393..f7a8736102dd 100644 --- a/component/module/http/http.go +++ b/component/module/http/http.go @@ -74,11 +74,9 @@ func New(o component.Options, args Arguments) (*Component, error) { c.managedRemoteHTTP, err = c.newManagedLocalComponent(o) if err != nil { - m.Remove() return nil, err } if err := c.Update(args); err != nil { - m.Remove() return nil, err } return c, nil diff --git a/component/module/module.go b/component/module/module.go index 5e41464789ff..9e03e6d9213c 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -70,12 +70,6 @@ func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue stri return nil } -// Remove removes the module from the registry. This is normally called after Run but in the case -// where the parent component fails to load and successfully run it must be called manually. -func (c *ModuleComponent) Remove() { - c.mod.Remove() -} - // RunFlowController runs the flow controller that all module components start. func (c *ModuleComponent) RunFlowController(ctx context.Context) { c.mod.Run(ctx) diff --git a/component/module/string/string.go b/component/module/string/string.go index 17a090275ad7..a32c13186dc3 100644 --- a/component/module/string/string.go +++ b/component/module/string/string.go @@ -55,7 +55,6 @@ func New(o component.Options, args Arguments) (*Component, error) { } if err := c.Update(args); err != nil { - m.Remove() return nil, err } return c, nil diff --git a/component/registry.go b/component/registry.go index 9b16dc95e7b0..4425b11b096b 100644 --- a/component/registry.go +++ b/component/registry.go @@ -35,10 +35,6 @@ type ModuleController interface { // If id is non-empty, it must be a valid River identifier, matching the // regex /[A-Za-z_][A-Za-z0-9_]/. NewModule(id string, export ExportFunc) (Module, error) - - // RemoveID is used to remove the id from the cache, this is normally called from Run but if - // the Module is created and then fails during loading then it will leak the id. - RemoveID(id string) } // Module is a controller for running components within a Module. @@ -54,9 +50,6 @@ type Module interface { // Run blocks until the provided context is canceled. The ID of a module as defined in // ModuleController.NewModule will not be released until Run returns. Run(context.Context) - - // Remove is used to manually remove the module from the registry. - Remove() } // ExportFunc is used for onExport of the Module diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index c806a3ad35c8..ac52de2ec3df 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/dag" "github.com/grafana/agent/pkg/flow/logging" @@ -220,7 +221,7 @@ func TestScopeWithFailingComponent(t *testing.T) { OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), NewModuleController: func(id string, availableServices []string) controller.ModuleController { - return nil + return fakeModuleController{} }, }, } @@ -315,3 +316,16 @@ func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) { } require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match") } + +type fakeModuleController struct{} + +func (f fakeModuleController) NewModule(id string, export component.ExportFunc) (component.Module, error) { + return nil, nil +} + +func (f fakeModuleController) ModuleIDs() []string { + return nil +} + +func (f fakeModuleController) ClearModuleIDs() { +} diff --git a/pkg/flow/internal/controller/module.go b/pkg/flow/internal/controller/module.go index a672763cf79a..5b3f9e341485 100644 --- a/pkg/flow/internal/controller/module.go +++ b/pkg/flow/internal/controller/module.go @@ -9,4 +9,7 @@ type ModuleController interface { // ModuleIDs returns the list of managed modules in unspecified order. ModuleIDs() []string + + // ClearModuleIDs removes all the module id references from the controller. + ClearModuleIDs() } diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index e734787f96f1..61d62a8e8161 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -255,6 +255,7 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { case nil: cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") default: + cn.moduleController.ClearModuleIDs() msg := fmt.Sprintf("component evaluation failed: %s", err) cn.setEvalHealth(component.HealthTypeUnhealthy, msg) } @@ -316,6 +317,8 @@ func (cn *ComponentNode) Run(ctx context.Context) error { cn.mut.RLock() managed := cn.managed cn.mut.RUnlock() + // When finished running clear the module ids. + defer cn.moduleController.ClearModuleIDs() if managed == nil { return ErrUnevaluated diff --git a/pkg/flow/module.go b/pkg/flow/module.go index b3cfdbc2e09d..87aa5c56f284 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -64,13 +64,14 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co return mod, nil } -// RemoveID removes a reference from the module registry and name cache. -func (m *moduleController) RemoveID(id string) { +func (m *moduleController) ClearModuleIDs() { m.mut.Lock() defer m.mut.Unlock() - delete(m.modules, id) - m.o.ModuleRegistry.Unregister(id) + for id := range m.modules { + m.o.ModuleRegistry.Unregister(id) + } + m.modules = make(map[string]struct{}) } // ModuleIDs implements [controller.ModuleController]. @@ -130,17 +131,11 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error { return c.f.LoadFile(ff, args) } -// Remove removed the ID from the registry. Generally only used if fails to start. -func (c *module) Remove() { - c.o.parent.RemoveID(c.o.ID) -} - // Run starts the Module. No components within the Module // will be run until Run is called. // // Run blocks until the provided context is canceled. func (c *module) Run(ctx context.Context) { - defer c.o.parent.RemoveID(c.o.ID) c.f.Run(ctx) } diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index c3b964a2a919..ea9c4bae3b51 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -34,6 +34,7 @@ content="" fail=true }` err = t1.updateContent(badContent) + // Because we have bad content this should fail, but the ids should be removed. require.Error(t, err) goodContent := `test.fail.module "int" { @@ -56,12 +57,10 @@ func init() { return nil, err } if args.(TestFailArguments).Fail { - m.Remove() return nil, fmt.Errorf("module told to fail") } err = m.LoadFlowContent(nil, args.(TestFailArguments).Content) if err != nil { - m.Remove() return nil, err } return &testFailModule{ diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index ee3281081667..282d7f179026 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -182,22 +182,6 @@ func TestIDCollision(t *testing.T) { require.Nil(t, m) } -func TestIDRemoval(t *testing.T) { - opts := testModuleControllerOptions(t) - opts.ID = "test" - nc := newModuleController(opts) - m, err := nc.NewModule("t1", func(exports map[string]any) {}) - require.NoError(t, err) - err = m.LoadConfig([]byte(""), nil) - require.NoError(t, err) - require.NotNil(t, m) - ctx := context.Background() - ctx, cncl := context.WithTimeout(ctx, 1*time.Second) - defer cncl() - m.Run(ctx) - require.Len(t, nc.(*moduleController).modules, 0) -} - func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { t.Helper() From 91c0abe50f7abc36edce98156f33ec35988b2017 Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 5 Oct 2023 10:19:32 -0400 Subject: [PATCH 08/18] Remove unneeded line --- component/module/string/string.go | 1 - 1 file changed, 1 deletion(-) diff --git a/component/module/string/string.go b/component/module/string/string.go index a32c13186dc3..c634bd76c09b 100644 --- a/component/module/string/string.go +++ b/component/module/string/string.go @@ -45,7 +45,6 @@ var ( // New creates a new module.string component. func New(o component.Options, args Arguments) (*Component, error) { - var err error m, err := module.NewModuleComponent(o) if err != nil { return nil, err From cb8d986565b6412c1d579dfd557b722a209d3866 Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 5 Oct 2023 10:38:29 -0400 Subject: [PATCH 09/18] Fix merge errors. --- pkg/flow/module_fail_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index ea9c4bae3b51..ba8ef041c1ab 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -15,9 +15,9 @@ func TestIDRemovalIfFailedToLoad(t *testing.T) { f := New(testOptions(t)) fullContent := "test.fail.module \"t1\" { content = \"\" }" - fl, err := ReadFile("test", []byte(fullContent)) + fl, err := ParseSource("test", []byte(fullContent)) require.NoError(t, err) - err = f.LoadFile(fl, nil) + err = f.LoadSource(fl, nil) require.NoError(t, err) ctx := context.Background() ctx, cnc := context.WithTimeout(ctx, 600*time.Second) @@ -59,7 +59,7 @@ func init() { if args.(TestFailArguments).Fail { return nil, fmt.Errorf("module told to fail") } - err = m.LoadFlowContent(nil, args.(TestFailArguments).Content) + err = m.LoadFlowSource(nil, args.(TestFailArguments).Content) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func (t *testFailModule) Run(ctx context.Context) error { func (t *testFailModule) updateContent(content string) error { t.content = content - err := t.mc.LoadFlowContent(nil, t.content) + err := t.mc.LoadFlowSource(nil, t.content) return err } From 92cb1c1651b9c9ace80422e1af2c1504712e3b68 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 10 Oct 2023 09:43:13 -0400 Subject: [PATCH 10/18] Ensure module is checked on first run. --- .../internal/controller/node_component.go | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index de1c7959ba55..8af7e6c75ca9 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -80,18 +80,19 @@ type ComponentGlobals struct { // arguments and exports. ComponentNode manages the arguments for the component // from a River block. type ComponentNode struct { - id ComponentID - globalID string - label string - componentName string - nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. - reg component.Registration - managedOpts component.Options - registry *prometheus.Registry - exportsType reflect.Type - moduleController ModuleController - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate - lastUpdateTime atomic.Time + id ComponentID + globalID string + label string + componentName string + nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. + reg component.Registration + managedOpts component.Options + registry *prometheus.Registry + exportsType reflect.Type + moduleSuccessCheck sync.Once // This is used on first startup to see if module loaded correctly and if not then clean up registry. + moduleController ModuleController + OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate + lastUpdateTime atomic.Time mut sync.RWMutex block *ast.BlockStmt // Current River block to derive args from @@ -254,10 +255,14 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { case nil: cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") default: - cn.moduleController.ClearModuleIDs() msg := fmt.Sprintf("component evaluation failed: %s", err) cn.setEvalHealth(component.HealthTypeUnhealthy, msg) } + cn.moduleSuccessCheck.Do(func() { + if err != nil { + cn.moduleController.ClearModuleIDs() + } + }) return err } From f9e89e0ac3adc7ef789f01bc6b7ace63c279993d Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 10 Oct 2023 09:50:02 -0400 Subject: [PATCH 11/18] rename and add comments --- pkg/flow/internal/controller/node_component.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index 8af7e6c75ca9..6a7d84b7b07f 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -89,7 +89,7 @@ type ComponentNode struct { managedOpts component.Options registry *prometheus.Registry exportsType reflect.Type - moduleSuccessCheck sync.Once // This is used on first startup to see if module loaded correctly and if not then clean up registry. + moduleFailureCheck sync.Once // This is used on first startup to see if module loaded correctly and if not then clean up registry. moduleController ModuleController OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate lastUpdateTime atomic.Time @@ -258,7 +258,9 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { msg := fmt.Sprintf("component evaluation failed: %s", err) cn.setEvalHealth(component.HealthTypeUnhealthy, msg) } - cn.moduleSuccessCheck.Do(func() { + // Check for failure on initial loading, if not then we need to remove any modules it the component may habe created. + // In the case where the component is not a module loader this is a noop. + cn.moduleFailureCheck.Do(func() { if err != nil { cn.moduleController.ClearModuleIDs() } From 6d28509f89014a31e53c35c1da189ed99bd0a7bd Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 10 Oct 2023 10:22:35 -0400 Subject: [PATCH 12/18] Add manual removal back in and make test closer to actual usage. --- pkg/flow/module.go | 10 ++++++++++ pkg/flow/module_fail_test.go | 4 +++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/flow/module.go b/pkg/flow/module.go index 39c8e2324b74..a80618c6b290 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -65,6 +65,15 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co return mod, nil } +func (m *moduleController) removeID(id string) { + m.mut.Lock() + defer m.mut.Unlock() + + m.o.ModuleRegistry.Unregister(id) + delete(m.modules, id) +} + +// ClearModuleIDs removes all ids from the registry. func (m *moduleController) ClearModuleIDs() { m.mut.Lock() defer m.mut.Unlock() @@ -138,6 +147,7 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error { // // Run blocks until the provided context is canceled. func (c *module) Run(ctx context.Context) { + defer c.o.parent.removeID(c.o.ID) c.f.Run(ctx) } diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index ba8ef041c1ab..b15407a57b7b 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -21,7 +21,7 @@ func TestIDRemovalIfFailedToLoad(t *testing.T) { require.NoError(t, err) ctx := context.Background() ctx, cnc := context.WithTimeout(ctx, 600*time.Second) - defer cnc() + go f.Run(ctx) require.Eventually(t, func() bool { t1 := f.loader.Components()[0].Component().(*testFailModule) @@ -43,6 +43,7 @@ fail=false }` err = t1.updateContent(goodContent) require.NoError(t, err) + cnc() } func init() { @@ -88,6 +89,7 @@ type testFailModule struct { } func (t *testFailModule) Run(ctx context.Context) error { + go t.mc.RunFlowController(ctx) <-ctx.Done() return nil } From 6c44b21f6ab97b26801e98b1fd6c312d0221bbc9 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 10 Oct 2023 10:25:13 -0400 Subject: [PATCH 13/18] move changelog comment to correct location --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34f3bb287137..92088e6f39f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,9 @@ Main (unreleased) - Fixed an issue where `loki.process` validation for stage `metric.counter` was allowing invalid combination of configuration options. (@thampiotr) + +- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to + permanently fail to load with `id already exists` error. (@mattdurham) ### Enhancements @@ -237,9 +240,6 @@ v0.36.2 (2023-09-22) - Fix `loki.source.file` race condition in cleaning up metrics when stopping to tail files. (@thampiotr) -- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to - permanently fail to load with `id already exists` error. (@mattdurham) - v0.36.1 (2023-09-06) -------------------- From db2387654e514c85afe58999337ee3cc30025fba Mon Sep 17 00:00:00 2001 From: matt durham Date: Wed, 11 Oct 2023 16:13:44 -0400 Subject: [PATCH 14/18] A different approach by keying off run instead of build. --- component/module/module.go | 6 ++- component/registry.go | 2 +- pkg/flow/flow_services_test.go | 15 ++++-- pkg/flow/internal/controller/module.go | 3 -- .../internal/controller/node_component.go | 37 +++++--------- pkg/flow/module.go | 51 ++++++++++--------- pkg/flow/module_fail_test.go | 3 ++ pkg/flow/module_test.go | 40 ++++++++------- 8 files changed, 81 insertions(+), 76 deletions(-) diff --git a/component/module/module.go b/component/module/module.go index 79d95a002826..53e886721e65 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" ) @@ -71,7 +72,10 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin // RunFlowController runs the flow controller that all module components start. func (c *ModuleComponent) RunFlowController(ctx context.Context) { - c.mod.Run(ctx) + err := c.mod.Run(ctx) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "error running module", "id", c.opts.ID, "err", err) + } } // CurrentHealth contains the implementation details for CurrentHealth in a module component. diff --git a/component/registry.go b/component/registry.go index f544bf8287f7..20a364be81c4 100644 --- a/component/registry.go +++ b/component/registry.go @@ -49,7 +49,7 @@ type Module interface { // // Run blocks until the provided context is canceled. The ID of a module as defined in // ModuleController.NewModule will not be released until Run returns. - Run(context.Context) + Run(context.Context) error } // ExportFunc is used for onExport of the Module diff --git a/pkg/flow/flow_services_test.go b/pkg/flow/flow_services_test.go index 128b1267e180..7639daea8144 100644 --- a/pkg/flow/flow_services_test.go +++ b/pkg/flow/flow_services_test.go @@ -183,7 +183,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - componentBuilt := util.NewWaitTrigger() + componentRunning := util.NewWaitTrigger() var ( svc = &testservices.Fake{ @@ -218,9 +218,14 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) { Name: "service_consumer", Args: struct{}{}, NeedsServices: []string{"service"}, - Build: func(_ component.Options, _ component.Arguments) (component.Component, error) { - componentBuilt.Trigger() - return &testcomponents.Fake{}, nil + Build: func(o component.Options, _ component.Arguments) (component.Component, error) { + return &testcomponents.Fake{ + RunFunc: func(ctx context.Context) error { + componentRunning.Trigger() + <-ctx.Done() + return nil + }, + }, nil }, }, } @@ -243,7 +248,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) { require.NoError(t, ctrl.LoadSource(f, nil)) go ctrl.Run(ctx) - require.NoError(t, componentBuilt.Wait(5*time.Second), "Component should have been built") + require.NoError(t, componentRunning.Wait(5*time.Second), "Component should have been built") consumers := ctrl.GetServiceConsumers("service") require.Len(t, consumers, 2, "There should be a consumer for the module loader and the module's component") diff --git a/pkg/flow/internal/controller/module.go b/pkg/flow/internal/controller/module.go index 5b3f9e341485..a672763cf79a 100644 --- a/pkg/flow/internal/controller/module.go +++ b/pkg/flow/internal/controller/module.go @@ -9,7 +9,4 @@ type ModuleController interface { // ModuleIDs returns the list of managed modules in unspecified order. ModuleIDs() []string - - // ClearModuleIDs removes all the module id references from the controller. - ClearModuleIDs() } diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index 6a7d84b7b07f..9481fd1f5b5b 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -80,19 +80,18 @@ type ComponentGlobals struct { // arguments and exports. ComponentNode manages the arguments for the component // from a River block. type ComponentNode struct { - id ComponentID - globalID string - label string - componentName string - nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. - reg component.Registration - managedOpts component.Options - registry *prometheus.Registry - exportsType reflect.Type - moduleFailureCheck sync.Once // This is used on first startup to see if module loaded correctly and if not then clean up registry. - moduleController ModuleController - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate - lastUpdateTime atomic.Time + id ComponentID + globalID string + label string + componentName string + nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. + reg component.Registration + managedOpts component.Options + registry *prometheus.Registry + exportsType reflect.Type + moduleController ModuleController + OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate + lastUpdateTime atomic.Time mut sync.RWMutex block *ast.BlockStmt // Current River block to derive args from @@ -258,14 +257,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { msg := fmt.Sprintf("component evaluation failed: %s", err) cn.setEvalHealth(component.HealthTypeUnhealthy, msg) } - // Check for failure on initial loading, if not then we need to remove any modules it the component may habe created. - // In the case where the component is not a module loader this is a noop. - cn.moduleFailureCheck.Do(func() { - if err != nil { - cn.moduleController.ClearModuleIDs() - } - }) - return err } @@ -311,7 +302,7 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error { } // Run runs the managed component in the calling goroutine until ctx is -// canceled. Evaluate must have been called at least once without retuning an +// canceled. Evaluate must have been called at least once without returning an // error before calling Run. // // Run will immediately return ErrUnevaluated if Evaluate has never been called @@ -320,8 +311,6 @@ func (cn *ComponentNode) Run(ctx context.Context) error { cn.mut.RLock() managed := cn.managed cn.mut.RUnlock() - // When finished running clear the module ids. - defer cn.moduleController.ClearModuleIDs() if managed == nil { return ErrUnevaluated diff --git a/pkg/flow/module.go b/pkg/flow/module.go index a80618c6b290..47bdf620eeaf 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -6,6 +6,7 @@ import ( "path" "sync" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/worker" @@ -46,48 +47,42 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co if id != "" { fullPath = path.Join(fullPath, id) } - if _, found := m.modules[fullPath]; found { - return nil, fmt.Errorf("id %s already exists", id) - } mod := newModule(&moduleOptions{ - ID: fullPath, + FullPath: fullPath, + LocalID: id, export: export, moduleControllerOptions: m.o, parent: m, }) - if err := m.o.ModuleRegistry.Register(fullPath, mod); err != nil { - return nil, err - } - - m.modules[fullPath] = struct{}{} return mod, nil } -func (m *moduleController) removeID(id string) { +func (m *moduleController) removeModule(mod *module) { m.mut.Lock() defer m.mut.Unlock() - m.o.ModuleRegistry.Unregister(id) - delete(m.modules, id) + m.o.ModuleRegistry.Unregister(mod.o.FullPath) + delete(m.modules, mod.o.FullPath) } -// ClearModuleIDs removes all ids from the registry. -func (m *moduleController) ClearModuleIDs() { +func (m *moduleController) addModule(mod *module) error { m.mut.Lock() defer m.mut.Unlock() - - for id := range m.modules { - m.o.ModuleRegistry.Unregister(id) + if err := m.o.ModuleRegistry.Register(mod.o.FullPath, mod); err != nil { + level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.FullPath, "err", err) + return err } - m.modules = make(map[string]struct{}) + m.modules[mod.o.FullPath] = struct{}{} + return nil } // ModuleIDs implements [controller.ModuleController]. func (m *moduleController) ModuleIDs() []string { m.mut.RLock() defer m.mut.RUnlock() + return maps.Keys(m.modules) } @@ -97,9 +92,10 @@ type module struct { } type moduleOptions struct { - ID string - export component.ExportFunc - parent *moduleController + FullPath string // FullPath is the full name including all parents, "module.file.example.prometheus.remote_write.id". + LocalID string // LocalID is the id of this module without the full path, "prometheus.remote_write.id" of the above. + export component.ExportFunc + parent *moduleController *moduleControllerOptions } @@ -117,7 +113,7 @@ func newModule(o *moduleOptions) *module { ComponentRegistry: o.ComponentRegistry, WorkerPool: o.WorkerPool, Options: Options{ - ControllerID: o.ID, + ControllerID: o.FullPath, Tracer: o.Tracer, Reg: o.Reg, Logger: o.Logger, @@ -135,7 +131,7 @@ func newModule(o *moduleOptions) *module { // LoadConfig parses River config and loads it. func (c *module) LoadConfig(config []byte, args map[string]any) error { - ff, err := ParseSource(c.o.ID, config) + ff, err := ParseSource(c.o.FullPath, config) if err != nil { return err } @@ -146,9 +142,14 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error { // will be run until Run is called. // // Run blocks until the provided context is canceled. -func (c *module) Run(ctx context.Context) { - defer c.o.parent.removeID(c.o.ID) +func (c *module) Run(ctx context.Context) error { + if err := c.o.parent.addModule(c); err != nil { + return err + } + defer c.o.parent.removeModule(c) + c.f.Run(ctx) + return nil } // moduleControllerOptions holds static options for module controller. diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index b15407a57b7b..4efeb9102aa4 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -36,6 +36,9 @@ fail=true err = t1.updateContent(badContent) // Because we have bad content this should fail, but the ids should be removed. require.Error(t, err) + // fail a second time to ensure the once is done again. + err = t1.updateContent(badContent) + require.Error(t, err) goodContent := `test.fail.module "int" { content="" diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index 4bc2c30dad49..93bd1b681b8a 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -185,26 +185,32 @@ func TestIDList(t *testing.T) { nc := newModuleController(o) require.Len(t, nc.ModuleIDs(), 0) - _, err := nc.NewModule("t1", nil) + mod1, err := nc.NewModule("t1", nil) require.NoError(t, err) - require.Len(t, nc.ModuleIDs(), 1) - - _, err = nc.NewModule("t2", nil) - require.NoError(t, err) - require.Len(t, nc.ModuleIDs(), 2) -} + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + go func() { + m1err := mod1.Run(ctx) + require.NoError(t, m1err) + }() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 1 + }, 1*time.Second, 100*time.Millisecond) -func TestIDCollision(t *testing.T) { - defer verifyNoGoroutineLeaks(t) - o := testModuleControllerOptions(t) - defer o.WorkerPool.Stop() - nc := newModuleController(o) - m, err := nc.NewModule("t1", nil) + mod2, err := nc.NewModule("t2", nil) require.NoError(t, err) - require.NotNil(t, m) - m, err = nc.NewModule("t1", nil) - require.Error(t, err) - require.Nil(t, m) + go func() { + m2err := mod2.Run(ctx) + require.NoError(t, m2err) + }() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 2 + }, 1*time.Second, 100*time.Millisecond) + // Call cncl which will stop the run methods and remove the ids from the module controller + cncl() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 0 + }, 1*time.Second, 100*time.Millisecond) } func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { From a9d509d2d3432816a06dcce03c35f88a955d5b90 Mon Sep 17 00:00:00 2001 From: matt durham Date: Wed, 11 Oct 2023 16:30:54 -0400 Subject: [PATCH 15/18] Add test for duplicate registration. --- pkg/flow/module_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index 93bd1b681b8a..0a371bc99a63 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -213,6 +213,32 @@ func TestIDList(t *testing.T) { }, 1*time.Second, 100*time.Millisecond) } +func TestDuplicateIDList(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + o := testModuleControllerOptions(t) + defer o.WorkerPool.Stop() + nc := newModuleController(o) + require.Len(t, nc.ModuleIDs(), 0) + + mod1, err := nc.NewModule("t1", nil) + require.NoError(t, err) + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + defer cncl() + go func() { + m1err := mod1.Run(ctx) + require.NoError(t, m1err) + }() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 1 + }, 5*time.Second, 100*time.Millisecond) + + // This should panic with duplicate registration. + require.PanicsWithError(t, "duplicate metrics collector registration attempted", func() { + _, _ = nc.NewModule("t1", nil) + }) +} + func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { t.Helper() From e81a2ef482f07a28be3f3ce144df74fa62eaac07 Mon Sep 17 00:00:00 2001 From: matt durham Date: Thu, 12 Oct 2023 16:14:50 -0400 Subject: [PATCH 16/18] Minor changes --- pkg/flow/module.go | 24 +++++++++++------------- pkg/flow/module_registry.go | 2 +- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/pkg/flow/module.go b/pkg/flow/module.go index 47bdf620eeaf..10aa353ee882 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -49,8 +49,7 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co } mod := newModule(&moduleOptions{ - FullPath: fullPath, - LocalID: id, + ID: fullPath, export: export, moduleControllerOptions: m.o, parent: m, @@ -63,18 +62,18 @@ func (m *moduleController) removeModule(mod *module) { m.mut.Lock() defer m.mut.Unlock() - m.o.ModuleRegistry.Unregister(mod.o.FullPath) - delete(m.modules, mod.o.FullPath) + m.o.ModuleRegistry.Unregister(mod.o.ID) + delete(m.modules, mod.o.ID) } func (m *moduleController) addModule(mod *module) error { m.mut.Lock() defer m.mut.Unlock() - if err := m.o.ModuleRegistry.Register(mod.o.FullPath, mod); err != nil { - level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.FullPath, "err", err) + if err := m.o.ModuleRegistry.Register(mod.o.ID, mod); err != nil { + level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.ID, "err", err) return err } - m.modules[mod.o.FullPath] = struct{}{} + m.modules[mod.o.ID] = struct{}{} return nil } @@ -92,10 +91,9 @@ type module struct { } type moduleOptions struct { - FullPath string // FullPath is the full name including all parents, "module.file.example.prometheus.remote_write.id". - LocalID string // LocalID is the id of this module without the full path, "prometheus.remote_write.id" of the above. - export component.ExportFunc - parent *moduleController + ID string // ID is the full name including all parents, "module.file.example.prometheus.remote_write.id". + export component.ExportFunc + parent *moduleController *moduleControllerOptions } @@ -113,7 +111,7 @@ func newModule(o *moduleOptions) *module { ComponentRegistry: o.ComponentRegistry, WorkerPool: o.WorkerPool, Options: Options{ - ControllerID: o.FullPath, + ControllerID: o.ID, Tracer: o.Tracer, Reg: o.Reg, Logger: o.Logger, @@ -131,7 +129,7 @@ func newModule(o *moduleOptions) *module { // LoadConfig parses River config and loads it. func (c *module) LoadConfig(config []byte, args map[string]any) error { - ff, err := ParseSource(c.o.FullPath, config) + ff, err := ParseSource(c.o.ID, config) if err != nil { return err } diff --git a/pkg/flow/module_registry.go b/pkg/flow/module_registry.go index 7b88dcaf717b..34e89fb6f4de 100644 --- a/pkg/flow/module_registry.go +++ b/pkg/flow/module_registry.go @@ -16,7 +16,7 @@ func newModuleRegistry() *moduleRegistry { } } -// Get retrives a module by ID. +// Get retrieves a module by ID. func (reg *moduleRegistry) Get(id string) (*module, bool) { reg.mut.RLock() defer reg.mut.RUnlock() From e69db8c6b074bb6945b30a017180711892f461a2 Mon Sep 17 00:00:00 2001 From: matt durham Date: Mon, 16 Oct 2023 16:37:15 -0400 Subject: [PATCH 17/18] PR feedback --- pkg/flow/componenttest/testfailmodule.go | 67 ++++++++++++++++ pkg/flow/module_fail_test.go | 97 +++++++----------------- 2 files changed, 96 insertions(+), 68 deletions(-) create mode 100644 pkg/flow/componenttest/testfailmodule.go diff --git a/pkg/flow/componenttest/testfailmodule.go b/pkg/flow/componenttest/testfailmodule.go new file mode 100644 index 000000000000..011659f95564 --- /dev/null +++ b/pkg/flow/componenttest/testfailmodule.go @@ -0,0 +1,67 @@ +package componenttest + +import ( + "context" + "fmt" + + "github.com/grafana/agent/component" + mod "github.com/grafana/agent/component/module" +) + +func init() { + component.Register(component.Registration{ + Name: "test.fail.module", + Args: TestFailArguments{}, + Exports: mod.Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + m, err := mod.NewModuleComponent(opts) + if err != nil { + return nil, err + } + if args.(TestFailArguments).Fail { + return nil, fmt.Errorf("module told to fail") + } + err = m.LoadFlowSource(nil, args.(TestFailArguments).Content) + if err != nil { + return nil, err + } + return &TestFailModule{ + mc: m, + content: args.(TestFailArguments).Content, + opts: opts, + fail: args.(TestFailArguments).Fail, + ch: make(chan error), + }, nil + }, + }) +} + +type TestFailArguments struct { + Content string `river:"content,attr"` + Fail bool `river:"fail,attr,optional"` +} + +type TestFailModule struct { + content string + opts component.Options + ch chan error + mc *mod.ModuleComponent + fail bool +} + +func (t *TestFailModule) Run(ctx context.Context) error { + go t.mc.RunFlowController(ctx) + <-ctx.Done() + return nil +} + +func (t *TestFailModule) UpdateContent(content string) error { + t.content = content + err := t.mc.LoadFlowSource(nil, t.content) + return err +} + +func (t *TestFailModule) Update(_ component.Arguments) error { + return nil +} diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index 4efeb9102aa4..895bea6d65ac 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -2,12 +2,10 @@ package flow import ( "context" - "fmt" "testing" "time" - "github.com/grafana/agent/component" - mod "github.com/grafana/agent/component/module" + "github.com/grafana/agent/pkg/flow/componenttest" "github.com/stretchr/testify/require" ) @@ -23,86 +21,49 @@ func TestIDRemovalIfFailedToLoad(t *testing.T) { ctx, cnc := context.WithTimeout(ctx, 600*time.Second) go f.Run(ctx) + var t1 *componenttest.TestFailModule require.Eventually(t, func() bool { - t1 := f.loader.Components()[0].Component().(*testFailModule) + t1 = f.loader.Components()[0].Component().(*componenttest.TestFailModule) return t1 != nil }, 10*time.Second, 100*time.Millisecond) - t1 := f.loader.Components()[0].Component().(*testFailModule) + require.Eventually(t, func() bool { + // This should be one due to t1. + return len(f.modules.modules) == 1 + }, 10*time.Second, 100*time.Millisecond) badContent := - `test.fail.module "int" { + `test.fail.module "bad" { content="" fail=true }` - err = t1.updateContent(badContent) + err = t1.UpdateContent(badContent) // Because we have bad content this should fail, but the ids should be removed. require.Error(t, err) + require.Eventually(t, func() bool { + // Only one since the bad one never should have been added. + rightLength := len(f.modules.modules) == 1 + _, foundT1 := f.modules.modules["test.fail.module.t1"] + return rightLength && foundT1 + }, 10*time.Second, 100*time.Millisecond) // fail a second time to ensure the once is done again. - err = t1.updateContent(badContent) + err = t1.UpdateContent(badContent) require.Error(t, err) + goodContent := - `test.fail.module "int" { + `test.fail.module "good" { content="" fail=false }` - err = t1.updateContent(goodContent) + err = t1.UpdateContent(goodContent) require.NoError(t, err) + require.Eventually(t, func() bool { + rightLength := len(f.modules.modules) == 2 + _, foundT1 := f.modules.modules["test.fail.module.t1"] + _, foundGood := f.modules.modules["test.fail.module.t1/test.fail.module.good"] + return rightLength && foundT1 && foundGood + }, 10*time.Second, 100*time.Millisecond) cnc() -} - -func init() { - component.Register(component.Registration{ - Name: "test.fail.module", - Args: TestFailArguments{}, - Exports: mod.Exports{}, - - Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - m, err := mod.NewModuleComponent(opts) - if err != nil { - return nil, err - } - if args.(TestFailArguments).Fail { - return nil, fmt.Errorf("module told to fail") - } - err = m.LoadFlowSource(nil, args.(TestFailArguments).Content) - if err != nil { - return nil, err - } - return &testFailModule{ - mc: m, - content: args.(TestFailArguments).Content, - opts: opts, - fail: args.(TestFailArguments).Fail, - ch: make(chan error), - }, nil - }, - }) -} - -type TestFailArguments struct { - Content string `river:"content,attr"` - Fail bool `river:"fail,attr,optional"` -} - -type testFailModule struct { - content string - opts component.Options - ch chan error - mc *mod.ModuleComponent - fail bool -} - -func (t *testFailModule) Run(ctx context.Context) error { - go t.mc.RunFlowController(ctx) - <-ctx.Done() - return nil -} - -func (t *testFailModule) updateContent(content string) error { - t.content = content - err := t.mc.LoadFlowSource(nil, t.content) - return err -} - -func (t *testFailModule) Update(_ component.Arguments) error { - return nil + require.Eventually(t, func() bool { + // All should be cleaned up. + return len(f.modules.modules) == 0 + }, 10*time.Second, 100*time.Millisecond) } From 77777b0b319e6b389f51eb33e2761ce36bfef07e Mon Sep 17 00:00:00 2001 From: matt durham Date: Mon, 16 Oct 2023 17:13:06 -0400 Subject: [PATCH 18/18] add locks around the reads for tests, its a bit hacky. --- pkg/flow/module_fail_test.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index 895bea6d65ac..28fb0923a892 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -27,8 +27,10 @@ func TestIDRemovalIfFailedToLoad(t *testing.T) { return t1 != nil }, 10*time.Second, 100*time.Millisecond) require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() // This should be one due to t1. - return len(f.modules.modules) == 1 + return len(f.modules.List()) == 1 }, 10*time.Second, 100*time.Millisecond) badContent := `test.fail.module "bad" { @@ -39,9 +41,11 @@ fail=true // Because we have bad content this should fail, but the ids should be removed. require.Error(t, err) require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() // Only one since the bad one never should have been added. - rightLength := len(f.modules.modules) == 1 - _, foundT1 := f.modules.modules["test.fail.module.t1"] + rightLength := len(f.modules.List()) == 1 + _, foundT1 := f.modules.Get("test.fail.module.t1") return rightLength && foundT1 }, 10*time.Second, 100*time.Millisecond) // fail a second time to ensure the once is done again. @@ -56,14 +60,17 @@ fail=false err = t1.UpdateContent(goodContent) require.NoError(t, err) require.Eventually(t, func() bool { - rightLength := len(f.modules.modules) == 2 - _, foundT1 := f.modules.modules["test.fail.module.t1"] - _, foundGood := f.modules.modules["test.fail.module.t1/test.fail.module.good"] - return rightLength && foundT1 && foundGood + f.loadMut.RLock() + defer f.loadMut.RUnlock() + modT1, foundT1 := f.modules.Get("test.fail.module.t1") + modGood, foundGood := f.modules.Get("test.fail.module.t1/test.fail.module.good") + return modT1 != nil && modGood != nil && foundT1 && foundGood }, 10*time.Second, 100*time.Millisecond) cnc() require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() // All should be cleaned up. - return len(f.modules.modules) == 0 + return len(f.modules.List()) == 0 }, 10*time.Second, 100*time.Millisecond) }