Skip to content

Commit

Permalink
This is to handle myriad ways that git revision can be used. (#6803)
Browse files Browse the repository at this point in the history
* add more complex behavior to git to handle branches,tags and hashes

* add changelog

* small order change

* simplify changes

* clean up

* lint changes

* remove chdir

* standardize pull frequency and eventually timing
  • Loading branch information
mattdurham authored Apr 2, 2024
1 parent 1570f97 commit d1afb48
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 68 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Main (unreleased)

- Fix a bug where a panic could occur when reloading custom components. (@wildum)

- The `import.git` config block did not work with branches or tags this now fixes that behavior. (@mattdurham)

### Other changes

- Clustering for Grafana Agent in Flow mode has graduated from beta to stable.
Expand Down
277 changes: 243 additions & 34 deletions internal/flow/import_git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
package flow_test

import (
"bufio"
"bytes"
"context"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand All @@ -25,41 +28,177 @@ func TestPullUpdating(t *testing.T) {
// file based git repo then committing a file, running the component, then updating the file in the repo.
testRepo := t.TempDir()

contents := `declare "add" {
argument "a" {}
argument "b" {}
main := `
import.git "testImport" {
repository = "` + testRepo + `"
path = "math.river"
pull_frequency = "1s"
}
testImport.add "cc" {
a = 1
b = 1
}
`
// Create our git repository.
runGit(t, testRepo, "init", testRepo)

// Add the file we want.
math := filepath.Join(testRepo, "math.river")
err := os.WriteFile(math, []byte(contents), 0666)
require.NoError(t, err)

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
err = ctrl.LoadSource(f, nil)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 2
}, 5*time.Second, 100*time.Millisecond)

err = os.WriteFile(math, []byte(contentsMore), 0666)
require.NoError(t, err)

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test2\"")

// Check for final condition.
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 3
}, 5*time.Second, 100*time.Millisecond)
}

func TestPullUpdatingFromBranch(t *testing.T) {
testRepo := t.TempDir()

export "sum" {
value = argument.a.value + argument.b.value
}
}`
main := `
import.git "testImport" {
repository = "` + testRepo + `"
path = "math.river"
pull_frequency = "5s"
pull_frequency = "1s"
revision = "testor"
}
testImport.add "cc" {
a = 1
b = 1
}
`
init := exec.Command("git", "init", testRepo)
err := init.Run()
runGit(t, testRepo, "init", testRepo)

runGit(t, testRepo, "checkout", "-b", "testor")

math := filepath.Join(testRepo, "math.river")
err := os.WriteFile(math, []byte(contents), 0666)
require.NoError(t, err)

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
err = ctrl.LoadSource(f, nil)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 2
}, 5*time.Second, 100*time.Millisecond)

err = os.WriteFile(math, []byte(contentsMore), 0666)
require.NoError(t, err)

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test2\"")

// Check for final condition.
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 3
}, 5*time.Second, 100*time.Millisecond)
}

func TestPullUpdatingFromHash(t *testing.T) {
testRepo := t.TempDir()

runGit(t, testRepo, "init", testRepo)
math := filepath.Join(testRepo, "math.river")
err = os.WriteFile(math, []byte(contents), 0666)
err := os.WriteFile(math, []byte(contents), 0666)
require.NoError(t, err)
add := exec.Command("git", "add", ".")
add.Dir = testRepo
err = add.Run()

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test\"")

getHead := exec.Command("git", "rev-parse", "HEAD")
var stdBuffer bytes.Buffer
getHead.Dir = testRepo
getHead.Stdout = bufio.NewWriter(&stdBuffer)
err = getHead.Run()
require.NoError(t, err)
commit := exec.Command("git", "commit", "-m \"test\"")
commit.Dir = testRepo
err = commit.Run()
hash := stdBuffer.String()
hash = strings.TrimSpace(hash)

main := `
import.git "testImport" {
repository = "` + testRepo + `"
path = "math.river"
pull_frequency = "1s"
revision = "` + hash + `"
}
testImport.add "cc" {
a = 1
b = 1
}
`

// After this update the sum should still be 2 and not 3 since it is pinned to the initial hash.
err = os.WriteFile(math, []byte(contentsMore), 0666)
require.NoError(t, err)

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test2\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
err = ctrl.LoadSource(f, nil)
Expand All @@ -78,33 +217,103 @@ testImport.add "cc" {
ctrl.Run(ctx)
}()

// Check for initial condition
// Check for final condition.
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 2
}, 3*time.Second, 10*time.Millisecond)
}, 5*time.Second, 100*time.Millisecond)
}

contentsMore := `declare "add" {
argument "a" {}
argument "b" {}
func TestPullUpdatingFromTag(t *testing.T) {
testRepo := t.TempDir()

export "sum" {
value = argument.a.value + argument.b.value + 1
}
}`
runGit(t, testRepo, "init", testRepo)

math := filepath.Join(testRepo, "math.river")
err := os.WriteFile(math, []byte(contents), 0666)
require.NoError(t, err)

runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test\"")

runGit(t, testRepo, "tag", "-a", "tagtest", "-m", "testtag")

main := `
import.git "testImport" {
repository = "` + testRepo + `"
path = "math.river"
pull_frequency = "1s"
revision = "tagtest"
}
testImport.add "cc" {
a = 1
b = 1
}
`

// After this update the sum should still be 2 and not 3 since it is pinned to the tag.
err = os.WriteFile(math, []byte(contentsMore), 0666)
require.NoError(t, err)
add2 := exec.Command("git", "add", ".")
add2.Dir = testRepo
add2.Run()

commit2 := exec.Command("git", "commit", "-m \"test2\"")
commit2.Dir = testRepo
commit2.Run()
runGit(t, testRepo, "add", ".")

runGit(t, testRepo, "commit", "-m \"test2\"")

defer verifyNoGoroutineLeaks(t)

ctrl, f := setup(t, main)
err = ctrl.LoadSource(f, nil)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for final condition.
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 3
}, 20*time.Second, 1*time.Millisecond)
return export["sum"] == 2
}, 5*time.Second, 100*time.Millisecond)
}

func runGit(t *testing.T, dir string, args ...string) {
exe := exec.Command("git", args...)
var stdErr bytes.Buffer
exe.Stderr = bufio.NewWriter(&stdErr)
exe.Dir = dir
err := exe.Run()
errTxt := stdErr.String()
if err != nil {
t.Error(errTxt)
}
require.NoErrorf(t, err, "command git %v failed", args)
}

const contents = `declare "add" {
argument "a" {}
argument "b" {}
export "sum" {
value = argument.a.value + argument.b.value
}
}`

const contentsMore = `declare "add" {
argument "a" {}
argument "b" {}
export "sum" {
value = argument.a.value + argument.b.value + 1
}
}`
Loading

0 comments on commit d1afb48

Please sign in to comment.