Skip to content

Commit

Permalink
Merge pull request #38 from grafana/prevent-concurrent-builds
Browse files Browse the repository at this point in the history
prevent concurrent builds of same artifact
  • Loading branch information
pablochacin authored Sep 19, 2024
2 parents 14ee86c + 6e7173f commit 8c930cd
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 2 deletions.
6 changes: 6 additions & 0 deletions pkg/cache/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewFileCache(dir string) (cache.Cache, error) {
}

// Store stores the object and returns the metadata
// Fails if the object already exists
func (f *Cache) Store(_ context.Context, id string, content io.Reader) (cache.Object, error) {
if id == "" {
return cache.Object{}, fmt.Errorf("%w id cannot be empty", cache.ErrCreatingObject)
Expand All @@ -49,6 +50,11 @@ func (f *Cache) Store(_ context.Context, id string, content io.Reader) (cache.Ob
}

objectDir := filepath.Join(f.dir, id)

if _, err := os.Stat(objectDir); !errors.Is(err, os.ErrNotExist) {
return cache.Object{}, fmt.Errorf("%w: object already exists %q", cache.ErrCreatingObject, id)
}

// TODO: check permissions
err := os.MkdirAll(objectDir, 0o750)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func TestFileCacheStoreObject(t *testing.T) {
content: []byte("content"),
},
},
id: "object",
content: []byte("new content"),
id: "object",
content: []byte("new content"),
expectErr: cache.ErrCreatingObject,
},
{
title: "store empty object",
Expand Down
20 changes: 20 additions & 0 deletions pkg/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"os"
"sort"
"sync"

"github.com/grafana/k6build"
"github.com/grafana/k6build/pkg/cache"
Expand Down Expand Up @@ -44,6 +45,7 @@ type localBuildSrv struct {
catalog k6catalog.Catalog
builder k6foundry.Builder
cache cache.Cache
mutexes sync.Map
}

// NewBuildService creates a local build service using the given configuration
Expand Down Expand Up @@ -158,6 +160,9 @@ func (b *localBuildSrv) Build(
}
id := fmt.Sprintf("%x", sha1.Sum(hashData.Bytes())) //nolint:gosec

unlock := b.lockArtifact(id)
defer unlock()

artifactObject, err := b.cache.Get(ctx, id)
if err == nil {
return k6build.Artifact{
Expand Down Expand Up @@ -192,3 +197,18 @@ func (b *localBuildSrv) Build(
Platform: platform,
}, nil
}

// lockArtifact obtains a mutex used to prevent concurrent builds of the same artifact and
// returns a function that will unlock the mutex associated to the given id in the cache.
// The lock is also removed from the map. Subsequent calls will get another lock on the same
// id but this is safe as the object should already be in the cache and no further builds are needed.
func (b *localBuildSrv) lockArtifact(id string) func() {
value, _ := b.mutexes.LoadOrStore(id, &sync.Mutex{})
mtx, _ := value.(*sync.Mutex)
mtx.Lock()

return func() {
b.mutexes.Delete(id)
mtx.Unlock()
}
}
64 changes: 64 additions & 0 deletions pkg/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"errors"
"sync"
"testing"

"github.com/grafana/k6build"
Expand Down Expand Up @@ -242,3 +243,66 @@ func TestIdempotentBuild(t *testing.T) {
}
})
}

// TestConcurrentBuilds tests that is sage to build the same artifact concurrently and that
// concurrent builds of different artifacts are not affected.
// The test uses a local test setup backed by a file cache.
// Attempting to write the same artifact twice will return an error.
func TestConcurrentBuilds(t *testing.T) {
t.Parallel()
buildsrv, err := SetupTestLocalBuildService(t)
if err != nil {
t.Fatalf("test setup %v", err)
}

builds := []struct {
k6Ver string
deps []k6build.Dependency
}{
{
k6Ver: "v0.1.0",
deps: []k6build.Dependency{
{Name: "k6/x/ext", Constraints: "v0.1.0"},
},
},
{
k6Ver: "v0.1.0",
deps: []k6build.Dependency{
{Name: "k6/x/ext", Constraints: "v0.1.0"},
},
},
{
k6Ver: "v0.2.0",
deps: []k6build.Dependency{
{Name: "k6/x/ext", Constraints: "v0.1.0"},
},
},
}

errch := make(chan error, len(builds))

wg := sync.WaitGroup{}
for _, b := range builds {
wg.Add(1)
go func() {
defer wg.Done()

if _, err := buildsrv.Build(
context.TODO(),
"linux/amd64",
b.k6Ver,
b.deps,
); err != nil {
errch <- err
}
}()
}

wg.Wait()

select {
case err := <-errch:
t.Fatalf("unexpected %v", err)
default:
}
}

0 comments on commit 8c930cd

Please sign in to comment.