From 3b52a202d04a37770ce4b3f76b2e4600e4121cc4 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Wang Date: Sun, 7 Apr 2024 08:02:54 +0000 Subject: [PATCH 1/5] fix: cancel goroutine before the next one is created Signed-off-by: Xiaoxuan Wang --- copy_test.go | 21 +++++++++++++-------- go.mod | 2 +- go.sum | 4 ++-- internal/syncutil/limit.go | 14 ++++++++++++-- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/copy_test.go b/copy_test.go index 02421524..4fd04772 100644 --- a/copy_test.go +++ b/copy_test.go @@ -1779,7 +1779,9 @@ func TestCopyGraph_WithOptions(t *testing.T) { t.Run("MountFrom error", func(t *testing.T) { root = descs[6] dst := &countingStorage{storage: cas.NewMemory()} - opts = oras.CopyGraphOptions{} + opts = oras.CopyGraphOptions{ + Concurrency: 1, + } var numMountFrom atomic.Int64 e := errors.New("mountFrom error") opts.MountFrom = func(ctx context.Context, desc ocispec.Descriptor) ([]string, error) { @@ -1790,7 +1792,8 @@ func TestCopyGraph_WithOptions(t *testing.T) { t.Fatalf("CopyGraph() error = %v, wantErr %v", err, e) } - if got, expected := dst.numExists.Load(), int64(7); got != expected { + // with a very low probability, dst.numExists may be 3 + if got, expected := dst.numExists.Load(), int64(4); got != expected { t.Errorf("count(Exists()) = %d, want %d", got, expected) } if got, expected := dst.numFetch.Load(), int64(0); got != expected { @@ -1799,7 +1802,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { if got, expected := dst.numPush.Load(), int64(0); got != expected { t.Errorf("count(Push()) = %d, want %d", got, expected) } - if got, expected := numMountFrom.Load(), int64(4); got != expected { + if got, expected := numMountFrom.Load(), int64(1); got != expected { t.Errorf("count(MountFrom()) = %d, want %d", got, expected) } }) @@ -1828,7 +1831,9 @@ func TestCopyGraph_WithOptions(t *testing.T) { } return nil } - opts = oras.CopyGraphOptions{} + opts = oras.CopyGraphOptions{ + Concurrency: 1, + } var numPreCopy, numPostCopy, numOnMounted, numMountFrom atomic.Int64 opts.PreCopy = func(ctx context.Context, desc ocispec.Descriptor) error { numPreCopy.Add(1) @@ -1851,7 +1856,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { t.Fatalf("CopyGraph() error = %v, wantErr %v", err, e) } - if got, expected := dst.numExists.Load(), int64(7); got != expected { + if got, expected := dst.numExists.Load(), int64(4); got != expected { t.Errorf("count(Exists()) = %d, want %d", got, expected) } if got, expected := dst.numFetch.Load(), int64(0); got != expected { @@ -1860,13 +1865,13 @@ func TestCopyGraph_WithOptions(t *testing.T) { if got, expected := dst.numPush.Load(), int64(0); got != expected { t.Errorf("count(Push()) = %d, want %d", got, expected) } - if got, expected := numMount.Load(), int64(4); got != expected { + if got, expected := numMount.Load(), int64(1); got != expected { t.Errorf("count(Mount()) = %d, want %d", got, expected) } - if got, expected := numOnMounted.Load(), int64(4); got != expected { + if got, expected := numOnMounted.Load(), int64(1); got != expected { t.Errorf("count(OnMounted()) = %d, want %d", got, expected) } - if got, expected := numMountFrom.Load(), int64(4); got != expected { + if got, expected := numMountFrom.Load(), int64(1); got != expected { t.Errorf("count(MountFrom()) = %d, want %d", got, expected) } if got, expected := numPreCopy.Load(), int64(0); got != expected { diff --git a/go.mod b/go.mod index 85b83d90..bd267939 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,5 @@ go 1.21 require ( github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.7.0 ) diff --git a/go.sum b/go.sum index 9b89e8ae..eec227b2 100644 --- a/go.sum +++ b/go.sum @@ -2,5 +2,5 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= diff --git a/internal/syncutil/limit.go b/internal/syncutil/limit.go index 2a05d4ea..3b28b8ed 100644 --- a/internal/syncutil/limit.go +++ b/internal/syncutil/limit.go @@ -17,6 +17,7 @@ package syncutil import ( "context" + "sync/atomic" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -68,15 +69,24 @@ type GoFunc[T any] func(ctx context.Context, region *LimitedRegion, t T) error // Go concurrently invokes fn on items. func Go[T any](ctx context.Context, limiter *semaphore.Weighted, fn GoFunc[T], items ...T) error { eg, egCtx := errgroup.WithContext(ctx) + var egErr atomic.Value for _, item := range items { - region := LimitRegion(ctx, limiter) + region := LimitRegion(egCtx, limiter) if err := region.Start(); err != nil { + if egErr, ok := egErr.Load().(error); ok && egErr != nil { + return egErr + } return err } eg.Go(func(t T) func() error { return func() error { defer region.End() - return fn(egCtx, region, t) + // cancel the gorountine before the next goroutine is created + err := fn(egCtx, region, t) + if err != nil { + egErr.CompareAndSwap(nil, err) + } + return err } }(item)) } From ab4698d100c8decaf0b57a810734fcde03926526 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Wang Date: Sun, 7 Apr 2024 09:11:26 +0000 Subject: [PATCH 2/5] added comment Signed-off-by: Xiaoxuan Wang --- copy_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/copy_test.go b/copy_test.go index 4fd04772..46d41b5d 100644 --- a/copy_test.go +++ b/copy_test.go @@ -1780,6 +1780,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { root = descs[6] dst := &countingStorage{storage: cas.NewMemory()} opts = oras.CopyGraphOptions{ + // to make the run result deterministic, we limit concurrency to 1 Concurrency: 1, } var numMountFrom atomic.Int64 @@ -1832,6 +1833,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { return nil } opts = oras.CopyGraphOptions{ + // to make the run result deterministic, we limit concurrency to 1 Concurrency: 1, } var numPreCopy, numPostCopy, numOnMounted, numMountFrom atomic.Int64 From 6ce4acc2d2f115ec4f280ce4977d0d4ad5837061 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Wang Date: Mon, 8 Apr 2024 05:58:16 +0000 Subject: [PATCH 3/5] resolve comments Signed-off-by: Xiaoxuan Wang --- copy_test.go | 3 ++- internal/syncutil/limit.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/copy_test.go b/copy_test.go index 46d41b5d..4658dc88 100644 --- a/copy_test.go +++ b/copy_test.go @@ -1793,7 +1793,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { t.Fatalf("CopyGraph() error = %v, wantErr %v", err, e) } - // with a very low probability, dst.numExists may be 3 + // with a low probability, dst.numExists may be 3 or 5 if got, expected := dst.numExists.Load(), int64(4); got != expected { t.Errorf("count(Exists()) = %d, want %d", got, expected) } @@ -1803,6 +1803,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { if got, expected := dst.numPush.Load(), int64(0); got != expected { t.Errorf("count(Push()) = %d, want %d", got, expected) } + // with a low probability, dst.numExists may be 2 if got, expected := numMountFrom.Load(), int64(1); got != expected { t.Errorf("count(MountFrom()) = %d, want %d", got, expected) } diff --git a/internal/syncutil/limit.go b/internal/syncutil/limit.go index 3b28b8ed..d0a3d88a 100644 --- a/internal/syncutil/limit.go +++ b/internal/syncutil/limit.go @@ -85,8 +85,9 @@ func Go[T any](ctx context.Context, limiter *semaphore.Weighted, fn GoFunc[T], i err := fn(egCtx, region, t) if err != nil { egErr.CompareAndSwap(nil, err) + return err } - return err + return nil } }(item)) } From 8be8ad200e8a69bf070ed89843e554e87be08b11 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Wang Date: Mon, 8 Apr 2024 07:47:20 +0000 Subject: [PATCH 4/5] fix test Signed-off-by: Xiaoxuan Wang --- copy_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/copy_test.go b/copy_test.go index 4658dc88..c9e9bfcd 100644 --- a/copy_test.go +++ b/copy_test.go @@ -1777,7 +1777,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { }) t.Run("MountFrom error", func(t *testing.T) { - root = descs[6] + root = descs[3] dst := &countingStorage{storage: cas.NewMemory()} opts = oras.CopyGraphOptions{ // to make the run result deterministic, we limit concurrency to 1 @@ -1793,8 +1793,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { t.Fatalf("CopyGraph() error = %v, wantErr %v", err, e) } - // with a low probability, dst.numExists may be 3 or 5 - if got, expected := dst.numExists.Load(), int64(4); got != expected { + if got, expected := dst.numExists.Load(), int64(2); got != expected { t.Errorf("count(Exists()) = %d, want %d", got, expected) } if got, expected := dst.numFetch.Load(), int64(0); got != expected { @@ -1803,14 +1802,13 @@ func TestCopyGraph_WithOptions(t *testing.T) { if got, expected := dst.numPush.Load(), int64(0); got != expected { t.Errorf("count(Push()) = %d, want %d", got, expected) } - // with a low probability, dst.numExists may be 2 if got, expected := numMountFrom.Load(), int64(1); got != expected { t.Errorf("count(MountFrom()) = %d, want %d", got, expected) } }) t.Run("MountFrom OnMounted error", func(t *testing.T) { - root = descs[6] + root = descs[3] dst := &countingStorage{storage: cas.NewMemory()} var numMount atomic.Int64 dst.mount = func(ctx context.Context, @@ -1859,7 +1857,7 @@ func TestCopyGraph_WithOptions(t *testing.T) { t.Fatalf("CopyGraph() error = %v, wantErr %v", err, e) } - if got, expected := dst.numExists.Load(), int64(4); got != expected { + if got, expected := dst.numExists.Load(), int64(2); got != expected { t.Errorf("count(Exists()) = %d, want %d", got, expected) } if got, expected := dst.numFetch.Load(), int64(0); got != expected { From 24d9601d59ca6f448f3bbe53f2fc172e5bf8e196 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Wang Date: Mon, 8 Apr 2024 09:14:45 +0000 Subject: [PATCH 5/5] remove a comment Signed-off-by: Xiaoxuan Wang --- internal/syncutil/limit.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/syncutil/limit.go b/internal/syncutil/limit.go index d0a3d88a..f2caabf7 100644 --- a/internal/syncutil/limit.go +++ b/internal/syncutil/limit.go @@ -81,7 +81,6 @@ func Go[T any](ctx context.Context, limiter *semaphore.Weighted, fn GoFunc[T], i eg.Go(func(t T) func() error { return func() error { defer region.End() - // cancel the gorountine before the next goroutine is created err := fn(egCtx, region, t) if err != nil { egErr.CompareAndSwap(nil, err)