diff --git a/CHANGELOG.md b/CHANGELOG.md index db1ccf5a9a39..0e8bf05d97b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,8 @@ Main (unreleased) - Fix a bug where a panic could occur when reloading custom components. (@wildum) +- The `import.git` config block did not work with branches or tags this now fixes that behavior. (@mattdurham) + ### Other changes - Clustering for Grafana Agent in Flow mode has graduated from beta to stable. diff --git a/internal/flow/import_git_test.go b/internal/flow/import_git_test.go index c2b4c4f9211c..12027c7cdfce 100644 --- a/internal/flow/import_git_test.go +++ b/internal/flow/import_git_test.go @@ -3,10 +3,13 @@ package flow_test import ( + "bufio" + "bytes" "context" "os" "os/exec" "path/filepath" + "strings" "sync" "testing" "time" @@ -25,19 +28,77 @@ func TestPullUpdating(t *testing.T) { // file based git repo then committing a file, running the component, then updating the file in the repo. testRepo := t.TempDir() - contents := `declare "add" { - argument "a" {} - argument "b" {} + main := ` +import.git "testImport" { + repository = "` + testRepo + `" + path = "math.river" + pull_frequency = "1s" +} + +testImport.add "cc" { + a = 1 + b = 1 +} +` + // Create our git repository. + runGit(t, testRepo, "init", testRepo) + + // Add the file we want. + math := filepath.Join(testRepo, "math.river") + err := os.WriteFile(math, []byte(contents), 0666) + require.NoError(t, err) + + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test\"") + + defer verifyNoGoroutineLeaks(t) + ctrl, f := setup(t, main) + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + ctrl.Run(ctx) + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") + return export["sum"] == 2 + }, 5*time.Second, 100*time.Millisecond) + + err = os.WriteFile(math, []byte(contentsMore), 0666) + require.NoError(t, err) + + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test2\"") + + // Check for final condition. + require.Eventually(t, func() bool { + export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") + return export["sum"] == 3 + }, 5*time.Second, 100*time.Millisecond) +} + +func TestPullUpdatingFromBranch(t *testing.T) { + testRepo := t.TempDir() - export "sum" { - value = argument.a.value + argument.b.value - } -}` main := ` import.git "testImport" { repository = "` + testRepo + `" path = "math.river" - pull_frequency = "5s" + pull_frequency = "1s" + revision = "testor" } testImport.add "cc" { @@ -45,21 +106,99 @@ testImport.add "cc" { b = 1 } ` - init := exec.Command("git", "init", testRepo) - err := init.Run() + runGit(t, testRepo, "init", testRepo) + + runGit(t, testRepo, "checkout", "-b", "testor") + + math := filepath.Join(testRepo, "math.river") + err := os.WriteFile(math, []byte(contents), 0666) + require.NoError(t, err) + + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test\"") + + defer verifyNoGoroutineLeaks(t) + ctrl, f := setup(t, main) + err = ctrl.LoadSource(f, nil) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + ctrl.Run(ctx) + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") + return export["sum"] == 2 + }, 5*time.Second, 100*time.Millisecond) + + err = os.WriteFile(math, []byte(contentsMore), 0666) + require.NoError(t, err) + + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test2\"") + + // Check for final condition. + require.Eventually(t, func() bool { + export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") + return export["sum"] == 3 + }, 5*time.Second, 100*time.Millisecond) +} + +func TestPullUpdatingFromHash(t *testing.T) { + testRepo := t.TempDir() + + runGit(t, testRepo, "init", testRepo) math := filepath.Join(testRepo, "math.river") - err = os.WriteFile(math, []byte(contents), 0666) + err := os.WriteFile(math, []byte(contents), 0666) require.NoError(t, err) - add := exec.Command("git", "add", ".") - add.Dir = testRepo - err = add.Run() + + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test\"") + + getHead := exec.Command("git", "rev-parse", "HEAD") + var stdBuffer bytes.Buffer + getHead.Dir = testRepo + getHead.Stdout = bufio.NewWriter(&stdBuffer) + err = getHead.Run() require.NoError(t, err) - commit := exec.Command("git", "commit", "-m \"test\"") - commit.Dir = testRepo - err = commit.Run() + hash := stdBuffer.String() + hash = strings.TrimSpace(hash) + + main := ` +import.git "testImport" { + repository = "` + testRepo + `" + path = "math.river" + pull_frequency = "1s" + revision = "` + hash + `" +} + +testImport.add "cc" { + a = 1 + b = 1 +} +` + + // After this update the sum should still be 2 and not 3 since it is pinned to the initial hash. + err = os.WriteFile(math, []byte(contentsMore), 0666) require.NoError(t, err) + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test2\"") + defer verifyNoGoroutineLeaks(t) ctrl, f := setup(t, main) err = ctrl.LoadSource(f, nil) @@ -78,33 +217,103 @@ testImport.add "cc" { ctrl.Run(ctx) }() - // Check for initial condition + // Check for final condition. require.Eventually(t, func() bool { export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") return export["sum"] == 2 - }, 3*time.Second, 10*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) +} - contentsMore := `declare "add" { - argument "a" {} - argument "b" {} +func TestPullUpdatingFromTag(t *testing.T) { + testRepo := t.TempDir() - export "sum" { - value = argument.a.value + argument.b.value + 1 - } -}` + runGit(t, testRepo, "init", testRepo) + + math := filepath.Join(testRepo, "math.river") + err := os.WriteFile(math, []byte(contents), 0666) + require.NoError(t, err) + + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test\"") + + runGit(t, testRepo, "tag", "-a", "tagtest", "-m", "testtag") + + main := ` +import.git "testImport" { + repository = "` + testRepo + `" + path = "math.river" + pull_frequency = "1s" + revision = "tagtest" +} + +testImport.add "cc" { + a = 1 + b = 1 +} +` + + // After this update the sum should still be 2 and not 3 since it is pinned to the tag. err = os.WriteFile(math, []byte(contentsMore), 0666) require.NoError(t, err) - add2 := exec.Command("git", "add", ".") - add2.Dir = testRepo - add2.Run() - commit2 := exec.Command("git", "commit", "-m \"test2\"") - commit2.Dir = testRepo - commit2.Run() + runGit(t, testRepo, "add", ".") + + runGit(t, testRepo, "commit", "-m \"test2\"") + + defer verifyNoGoroutineLeaks(t) + + ctrl, f := setup(t, main) + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + ctrl.Run(ctx) + }() // Check for final condition. require.Eventually(t, func() bool { export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") - return export["sum"] == 3 - }, 20*time.Second, 1*time.Millisecond) + return export["sum"] == 2 + }, 5*time.Second, 100*time.Millisecond) +} + +func runGit(t *testing.T, dir string, args ...string) { + exe := exec.Command("git", args...) + var stdErr bytes.Buffer + exe.Stderr = bufio.NewWriter(&stdErr) + exe.Dir = dir + err := exe.Run() + errTxt := stdErr.String() + if err != nil { + t.Error(errTxt) + } + require.NoErrorf(t, err, "command git %v failed", args) } + +const contents = `declare "add" { + argument "a" {} + argument "b" {} + + export "sum" { + value = argument.a.value + argument.b.value + } +}` + +const contentsMore = `declare "add" { + argument "a" {} + argument "b" {} + + export "sum" { + value = argument.a.value + argument.b.value + 1 + } +}` diff --git a/internal/vcs/git.go b/internal/vcs/git.go index 788c1029f785..41bd20949070 100644 --- a/internal/vcs/git.go +++ b/internal/vcs/git.go @@ -86,28 +86,18 @@ func NewGitRepo(ctx context.Context, storagePath string, opts GitRepoOptions) (* } } - // Finally, hard reset to our requested revision. - hash, err := findRevision(opts.Revision, repo) - if err != nil { - return nil, InvalidRevisionError{Revision: opts.Revision} - } - - workTree, err := repo.Worktree() - if err != nil { - return nil, err - } - err = workTree.Reset(&git.ResetOptions{ - Commit: hash, - Mode: git.HardReset, - }) - if err != nil { - return nil, err + checkoutErr := checkout(opts.Revision, repo) + if checkoutErr != nil { + return nil, UpdateFailedError{ + Repository: opts.Repository, + Inner: checkoutErr, + } } return &GitRepo{ opts: opts, repo: repo, - workTree: workTree, + workTree: wt, }, err } @@ -119,7 +109,6 @@ func isRepoCloned(dir string) bool { // Update updates the repository by pulling new content and re-checking out to // latest version of Revision. func (repo *GitRepo) Update(ctx context.Context) error { - var err error pullRepoErr := repo.workTree.PullContext(ctx, &git.PullOptions{ RemoteName: "origin", Force: true, @@ -132,17 +121,12 @@ func (repo *GitRepo) Update(ctx context.Context) error { } } - // Find the latest revision being requested and hard-reset to it. - hash, err := findRevision(repo.opts.Revision, repo.repo) - if err != nil { - return InvalidRevisionError{Revision: repo.opts.Revision} - } - err = repo.workTree.Reset(&git.ResetOptions{ - Commit: hash, - Mode: git.HardReset, - }) - if err != nil { - return err + checkoutErr := checkout(repo.opts.Revision, repo.repo) + if checkoutErr != nil { + return UpdateFailedError{ + Repository: repo.opts.Repository, + Inner: checkoutErr, + } } return nil @@ -186,24 +170,41 @@ func (repo *GitRepo) CurrentRevision() (string, error) { return ref.Hash().String(), nil } -func findRevision(rev string, repo *git.Repository) (plumbing.Hash, error) { +// Depending on the type of revision we need to handle checkout differently. +// Tags are checked out as branches +// Branches as branches +// Commits are commits +func checkout(rev string, repo *git.Repository) error { // Try looking for the revision in the following order: // // 1. Search by tag name. // 2. Search by remote ref name. // 3. Try to resolve the revision directly. + wt, err := repo.Worktree() + if err != nil { + return err + } if tagRef, err := repo.Tag(rev); err == nil { - return tagRef.Hash(), nil + return wt.Checkout(&git.CheckoutOptions{ + Branch: tagRef.Name(), + Force: true, + }) } if remoteRef, err := repo.Reference(plumbing.NewRemoteReferenceName("origin", rev), true); err == nil { - return remoteRef.Hash(), nil + return wt.Checkout(&git.CheckoutOptions{ + Branch: remoteRef.Name(), + Force: true, + }) } if hash, err := repo.ResolveRevision(plumbing.Revision(rev)); err == nil { - return *hash, nil + return wt.Checkout(&git.CheckoutOptions{ + Hash: *hash, + Force: true, + }) } - return plumbing.ZeroHash, plumbing.ErrReferenceNotFound + return plumbing.ErrReferenceNotFound }