Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module id fail #5261

Merged
merged 24 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6db453b
push changes
mattdurham Sep 17, 2023
a2c2755
Add remove whenever using the component module.
mattdurham Sep 20, 2023
13c99ca
Add additional context and remove dead file.
mattdurham Sep 21, 2023
dfa0b77
Add long changelog comment
mattdurham Sep 21, 2023
01552ce
merge main
mattdurham Sep 21, 2023
f8f9930
fix linter
mattdurham Sep 21, 2023
8ceafb2
Remove mutex check
mattdurham Sep 21, 2023
54b238c
Add changes to support module id removal.
mattdurham Oct 5, 2023
91c0abe
Remove unneeded line
mattdurham Oct 5, 2023
f7caffe
main merge
mattdurham Oct 5, 2023
cb8d986
Fix merge errors.
mattdurham Oct 5, 2023
92cb1c1
Ensure module is checked on first run.
mattdurham Oct 10, 2023
f9e89e0
rename and add comments
mattdurham Oct 10, 2023
6d28509
Add manual removal back in and make test closer to actual usage.
mattdurham Oct 10, 2023
a96a32e
Merge branch 'main' into module_id_fail
mattdurham Oct 10, 2023
6c44b21
move changelog comment to correct location
mattdurham Oct 10, 2023
db23876
A different approach by keying off run instead of build.
mattdurham Oct 11, 2023
a9d509d
Add test for duplicate registration.
mattdurham Oct 11, 2023
e81a2ef
Minor changes
mattdurham Oct 12, 2023
e5588be
Merge branch 'main' into module_id_fail
mattdurham Oct 12, 2023
e69db8c
PR feedback
mattdurham Oct 16, 2023
acaae50
Merge remote-tracking branch 'origin/module_id_fail' into module_id_fail
mattdurham Oct 16, 2023
77777b0
add locks around the reads for tests, its a bit hacky.
mattdurham Oct 16, 2023
9b9f7ad
Merge branch 'main' into module_id_fail
mattdurham Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ Main (unreleased)

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
allowing invalid combination of configuration options. (@thampiotr)

- Fixed issue where adding a module after initial start, that failed to load then subsequently resolving the issue would cause the module to
permanently fail to load with `id already exists` error. (@mattdurham)

### Enhancements

Expand Down
16 changes: 15 additions & 1 deletion pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"testing"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/dag"
"github.com/grafana/agent/pkg/flow/logging"
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string, availableServices []string) controller.ModuleController {
return nil
return fakeModuleController{}
},
},
}
Expand Down Expand Up @@ -304,3 +305,16 @@ func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) {
}
require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match")
}

type fakeModuleController struct{}

func (f fakeModuleController) NewModule(id string, export component.ExportFunc) (component.Module, error) {
return nil, nil
}

func (f fakeModuleController) ModuleIDs() []string {
return nil
}

func (f fakeModuleController) ClearModuleIDs() {
}
3 changes: 3 additions & 0 deletions pkg/flow/internal/controller/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ type ModuleController interface {

// ModuleIDs returns the list of managed modules in unspecified order.
ModuleIDs() []string

// ClearModuleIDs removes all the module id references from the controller.
ClearModuleIDs()
}
34 changes: 22 additions & 12 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,19 @@ type ComponentGlobals struct {
// arguments and exports. ComponentNode manages the arguments for the component
// from a River block.
type ComponentNode struct {
id ComponentID
globalID string
label string
componentName string
nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.
reg component.Registration
managedOpts component.Options
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time
id ComponentID
globalID string
label string
componentName string
nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.
reg component.Registration
managedOpts component.Options
registry *prometheus.Registry
exportsType reflect.Type
moduleFailureCheck sync.Once // This is used on first startup to see if module loaded correctly and if not then clean up registry.
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time

mut sync.RWMutex
block *ast.BlockStmt // Current River block to derive args from
Expand Down Expand Up @@ -257,6 +258,13 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {
msg := fmt.Sprintf("component evaluation failed: %s", err)
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
}
// Check for failure on initial loading, if not then we need to remove any modules it the component may habe created.
// In the case where the component is not a module loader this is a noop.
cn.moduleFailureCheck.Do(func() {
if err != nil {
cn.moduleController.ClearModuleIDs()
}
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's still a problem here: what if a component fails to be constructed more than once?

Rather than using a sync.Once, this check could be moved down to line 289 (i.e., if cn.reg.Build fails) so you only clear module IDs for components which can't be built, no matter how many times the construction fails.


return err
}
Expand Down Expand Up @@ -312,6 +320,8 @@ func (cn *ComponentNode) Run(ctx context.Context) error {
cn.mut.RLock()
managed := cn.managed
cn.mut.RUnlock()
// When finished running clear the module ids.
defer cn.moduleController.ClearModuleIDs()
rfratto marked this conversation as resolved.
Show resolved Hide resolved

if managed == nil {
return ErrUnevaluated
Expand Down
13 changes: 12 additions & 1 deletion pkg/flow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,19 @@ func (m *moduleController) removeID(id string) {
m.mut.Lock()
defer m.mut.Unlock()

delete(m.modules, id)
m.o.ModuleRegistry.Unregister(id)
delete(m.modules, id)
}

// ClearModuleIDs removes all ids from the registry.
func (m *moduleController) ClearModuleIDs() {
m.mut.Lock()
defer m.mut.Unlock()

for id := range m.modules {
m.o.ModuleRegistry.Unregister(id)
}
m.modules = make(map[string]struct{})
}

// ModuleIDs implements [controller.ModuleController].
Expand Down
105 changes: 105 additions & 0 deletions pkg/flow/module_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package flow

import (
"context"
"fmt"
"testing"
"time"

"github.com/grafana/agent/component"
mod "github.com/grafana/agent/component/module"
"github.com/stretchr/testify/require"
)

func TestIDRemovalIfFailedToLoad(t *testing.T) {
f := New(testOptions(t))

fullContent := "test.fail.module \"t1\" { content = \"\" }"
fl, err := ParseSource("test", []byte(fullContent))
require.NoError(t, err)
err = f.LoadSource(fl, nil)
require.NoError(t, err)
ctx := context.Background()
ctx, cnc := context.WithTimeout(ctx, 600*time.Second)

go f.Run(ctx)
require.Eventually(t, func() bool {
t1 := f.loader.Components()[0].Component().(*testFailModule)
return t1 != nil
}, 10*time.Second, 100*time.Millisecond)
t1 := f.loader.Components()[0].Component().(*testFailModule)
badContent :=
`test.fail.module "int" {
content=""
fail=true
}`
err = t1.updateContent(badContent)
// Because we have bad content this should fail, but the ids should be removed.
require.Error(t, err)
goodContent :=
`test.fail.module "int" {
content=""
fail=false
}`
err = t1.updateContent(goodContent)
require.NoError(t, err)
cnc()
}

func init() {
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
component.Register(component.Registration{
Name: "test.fail.module",
Args: TestFailArguments{},
Exports: mod.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
m, err := mod.NewModuleComponent(opts)
if err != nil {
return nil, err
}
if args.(TestFailArguments).Fail {
return nil, fmt.Errorf("module told to fail")
}
err = m.LoadFlowSource(nil, args.(TestFailArguments).Content)
if err != nil {
return nil, err
}
return &testFailModule{
mc: m,
content: args.(TestFailArguments).Content,
opts: opts,
fail: args.(TestFailArguments).Fail,
ch: make(chan error),
}, nil
},
})
}

type TestFailArguments struct {
Content string `river:"content,attr"`
Fail bool `river:"fail,attr,optional"`
}

type testFailModule struct {
content string
opts component.Options
ch chan error
mc *mod.ModuleComponent
fail bool
}

func (t *testFailModule) Run(ctx context.Context) error {
go t.mc.RunFlowController(ctx)
<-ctx.Done()
return nil
}

func (t *testFailModule) updateContent(content string) error {
t.content = content
err := t.mc.LoadFlowSource(nil, t.content)
return err
}

func (t *testFailModule) Update(_ component.Arguments) error {
return nil
}
18 changes: 0 additions & 18 deletions pkg/flow/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,6 @@ func TestIDCollision(t *testing.T) {
require.Nil(t, m)
}

func TestIDRemoval(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
opts := testModuleControllerOptions(t)
defer opts.WorkerPool.Stop()
opts.ID = "test"
nc := newModuleController(opts)
m, err := nc.NewModule("t1", func(exports map[string]any) {})
require.NoError(t, err)
err = m.LoadConfig([]byte(""), nil)
require.NoError(t, err)
require.NotNil(t, m)
ctx := context.Background()
ctx, cncl := context.WithTimeout(ctx, 1*time.Second)
defer cncl()
m.Run(ctx)
require.Len(t, nc.(*moduleController).modules, 0)
}

func testModuleControllerOptions(t *testing.T) *moduleControllerOptions {
t.Helper()

Expand Down