diff --git a/pkg/cache/file/file.go b/pkg/cache/file/file.go index 423913f..d7f6022 100644 --- a/pkg/cache/file/file.go +++ b/pkg/cache/file/file.go @@ -39,6 +39,7 @@ func NewFileCache(dir string) (cache.Cache, error) { } // Store stores the object and returns the metadata +// Fails if the object already exists func (f *Cache) Store(_ context.Context, id string, content io.Reader) (cache.Object, error) { if id == "" { return cache.Object{}, fmt.Errorf("%w id cannot be empty", cache.ErrCreatingObject) @@ -49,6 +50,11 @@ func (f *Cache) Store(_ context.Context, id string, content io.Reader) (cache.Ob } objectDir := filepath.Join(f.dir, id) + + if _, err := os.Stat(objectDir); !errors.Is(err, os.ErrNotExist) { + return cache.Object{}, fmt.Errorf("%w: object already exists %q", cache.ErrCreatingObject, id) + } + // TODO: check permissions err := os.MkdirAll(objectDir, 0o750) if err != nil { diff --git a/pkg/cache/file/file_test.go b/pkg/cache/file/file_test.go index 4353119..ad82486 100644 --- a/pkg/cache/file/file_test.go +++ b/pkg/cache/file/file_test.go @@ -56,8 +56,9 @@ func TestFileCacheStoreObject(t *testing.T) { content: []byte("content"), }, }, - id: "object", - content: []byte("new content"), + id: "object", + content: []byte("new content"), + expectErr: cache.ErrCreatingObject, }, { title: "store empty object", diff --git a/pkg/local/local.go b/pkg/local/local.go index 5b94893..185ddcc 100644 --- a/pkg/local/local.go +++ b/pkg/local/local.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "sort" + "sync" "github.com/grafana/k6build" "github.com/grafana/k6build/pkg/cache" @@ -44,6 +45,7 @@ type localBuildSrv struct { catalog k6catalog.Catalog builder k6foundry.Builder cache cache.Cache + mutexes sync.Map } // NewBuildService creates a local build service using the given configuration @@ -158,6 +160,9 @@ func (b *localBuildSrv) Build( } id := fmt.Sprintf("%x", sha1.Sum(hashData.Bytes())) //nolint:gosec + unlock := b.lockArtifact(id) + defer unlock() + artifactObject, err := b.cache.Get(ctx, id) if err == nil { return k6build.Artifact{ @@ -192,3 +197,18 @@ func (b *localBuildSrv) Build( Platform: platform, }, nil } + +// lockArtifact obtains a mutex used to prevent concurrent builds of the same artifact and +// returns a function that will unlock the mutex associated to the given id in the cache. +// The lock is also removed from the map. Subsequent calls will get another lock on the same +// id but this is safe as the object should already be in the cache and no further builds are needed. +func (b *localBuildSrv) lockArtifact(id string) func() { + value, _ := b.mutexes.LoadOrStore(id, &sync.Mutex{}) + mtx, _ := value.(*sync.Mutex) + mtx.Lock() + + return func() { + b.mutexes.Delete(id) + mtx.Unlock() + } +} diff --git a/pkg/local/local_test.go b/pkg/local/local_test.go index 1cf21c8..2fed991 100644 --- a/pkg/local/local_test.go +++ b/pkg/local/local_test.go @@ -3,6 +3,7 @@ package local import ( "context" "errors" + "sync" "testing" "github.com/grafana/k6build" @@ -242,3 +243,66 @@ func TestIdempotentBuild(t *testing.T) { } }) } + +// TestConcurrentBuilds tests that is sage to build the same artifact concurrently and that +// concurrent builds of different artifacts are not affected. +// The test uses a local test setup backed by a file cache. +// Attempting to write the same artifact twice will return an error. +func TestConcurrentBuilds(t *testing.T) { + t.Parallel() + buildsrv, err := SetupTestLocalBuildService(t) + if err != nil { + t.Fatalf("test setup %v", err) + } + + builds := []struct { + k6Ver string + deps []k6build.Dependency + }{ + { + k6Ver: "v0.1.0", + deps: []k6build.Dependency{ + {Name: "k6/x/ext", Constraints: "v0.1.0"}, + }, + }, + { + k6Ver: "v0.1.0", + deps: []k6build.Dependency{ + {Name: "k6/x/ext", Constraints: "v0.1.0"}, + }, + }, + { + k6Ver: "v0.2.0", + deps: []k6build.Dependency{ + {Name: "k6/x/ext", Constraints: "v0.1.0"}, + }, + }, + } + + errch := make(chan error, len(builds)) + + wg := sync.WaitGroup{} + for _, b := range builds { + wg.Add(1) + go func() { + defer wg.Done() + + if _, err := buildsrv.Build( + context.TODO(), + "linux/amd64", + b.k6Ver, + b.deps, + ); err != nil { + errch <- err + } + }() + } + + wg.Wait() + + select { + case err := <-errch: + t.Fatalf("unexpected %v", err) + default: + } +}