Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(copy): introduces CopyGraphOptions with events support #145

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6817504
feat(copy): support mounting existing descriptors from other reposito…
leonardochaia Sep 26, 2024
b453fbc
fix: standard header
leonardochaia Sep 26, 2024
6a7a1e9
chore: adds repository tests
leonardochaia Oct 3, 2024
88c6793
fix: copy test
leonardochaia Oct 17, 2024
4c52467
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Oct 17, 2024
36342c9
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 11, 2024
dbe3509
chore: removes mounting support to be implemented in separate PR
leonardochaia Nov 11, 2024
571820c
refactor: renames class and events as per review
leonardochaia Nov 11, 2024
138120e
chore: adds copy tests
leonardochaia Nov 11, 2024
77e8079
chore: adds overload to prevent breaking change
leonardochaia Nov 11, 2024
06fd442
chore: introduces overloads to keep previous signature on CopyGraphOp…
leonardochaia Nov 11, 2024
5787c04
fix: copy signature
leonardochaia Nov 12, 2024
c9be0a2
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 12, 2024
77ba179
refactor: introduces CopyOptions
leonardochaia Nov 13, 2024
ecaf1b1
Merge remote-tracking branch 'origin/feat/improve-copy-performance' i…
leonardochaia Nov 13, 2024
de9ee0d
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 13, 2024
6ef7e74
chore: adds license
leonardochaia Nov 13, 2024
be7b1c5
Merge remote-tracking branch 'origin/feat/improve-copy-performance' i…
leonardochaia Nov 13, 2024
a12ff78
chore: adds more tests from oras-go
leonardochaia Nov 13, 2024
a7a1baf
chore: adds tests from oras-go
leonardochaia Nov 13, 2024
493a8d9
doc: adds comments to new option structs
leonardochaia Nov 25, 2024
9e40655
refactor: makes copy events async
leonardochaia Nov 25, 2024
68220bd
feat: introduces `InvokeAsync` extension method to support asynchrono…
leonardochaia Nov 26, 2024
e35fc7e
refactor: delegate InvokeAsync to execute handlers in parallel
leonardochaia Nov 29, 2024
827f62b
refactor: Copy events to include sync and async variants.
leonardochaia Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/OrasProject.Oras/CopyGraphOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The ORAS Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using OrasProject.Oras.Oci;

namespace OrasProject.Oras;

public struct CopyGraphOptions
leonardochaia marked this conversation as resolved.
Show resolved Hide resolved
{
public event Action<Descriptor> PreCopy;

public event Action<Descriptor> PostCopy;

public event Action<Descriptor> CopySkipped;
leonardochaia marked this conversation as resolved.
Show resolved Hide resolved

internal void OnPreCopy(Descriptor descriptor)
{
PreCopy?.Invoke(descriptor);
}

internal void OnPostCopy(Descriptor descriptor)
{
PostCopy?.Invoke(descriptor);
}

internal void OnCopySkipped(Descriptor descriptor)
{
CopySkipped?.Invoke(descriptor);
}
}
52 changes: 41 additions & 11 deletions src/OrasProject.Oras/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using OrasProject.Oras.Oci;
using System;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;
using static OrasProject.Oras.Content.Extensions;

namespace OrasProject.Oras;

public static class Extensions
{
/// <summary>
/// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
/// in the source Target to the destination Target.
/// The destination reference will be the same as the source reference if the
/// destination reference is left blank.
/// Returns the descriptor of the root node on successful copy.
/// </summary>
/// <param name="src"></param>
/// <param name="srcRef"></param>
/// <param name="dst"></param>
/// <param name="dstRef"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef,
CancellationToken cancellationToken = default)
{
return src.CopyAsync(srcRef, dst, dstRef, new CopyGraphOptions(), cancellationToken);
}

/// <summary>
/// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
Expand All @@ -33,41 +52,52 @@ public static class Extensions
/// <param name="srcRef"></param>
/// <param name="dst"></param>
/// <param name="dstRef"></param>
/// <param name="copyOptions"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CancellationToken cancellationToken = default)
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CopyGraphOptions? copyOptions = default, CancellationToken cancellationToken = default)
leonardochaia marked this conversation as resolved.
Show resolved Hide resolved
{
if (string.IsNullOrEmpty(dstRef))
{
dstRef = srcRef;
}
var root = await src.ResolveAsync(srcRef, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, copyOptions, cancellationToken).ConfigureAwait(false);
await dst.TagAsync(root, dstRef, cancellationToken).ConfigureAwait(false);
return root;
}

public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken)
public static Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken = default)
{
return src.CopyGraphAsync(dst, node, new CopyGraphOptions(), cancellationToken);
}

public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CopyGraphOptions? copyOptions = default, CancellationToken cancellationToken = default)
{
// check if node exists in target
if (await dst.ExistsAsync(node, cancellationToken).ConfigureAwait(false))
{
copyOptions?.OnCopySkipped(node);
return;
}

// retrieve successors
var successors = await src.GetSuccessorsAsync(node, cancellationToken).ConfigureAwait(false);
// obtain data stream
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);

// check if the node has successors
if (successors != null)
foreach (var childNode in successors)
Wwwsylvia marked this conversation as resolved.
Show resolved Hide resolved
{
foreach (var childNode in successors)
{
await src.CopyGraphAsync(dst, childNode, cancellationToken).ConfigureAwait(false);
}
await src.CopyGraphAsync(dst, childNode, copyOptions, cancellationToken).ConfigureAwait(false);
}

// perform the copy
copyOptions?.OnPreCopy(node);
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);
await dst.PushAsync(node, dataStream, cancellationToken).ConfigureAwait(false);

// we copied it
copyOptions?.OnPostCopy(node);
}
}

143 changes: 143 additions & 0 deletions tests/OrasProject.Oras.Tests/CopyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Text;
using System.Text.Json;
using Xunit;
using Index = OrasProject.Oras.Oci.Index;

namespace OrasProject.Oras.Tests;

Expand Down Expand Up @@ -142,4 +143,146 @@ public async Task CanCopyBetweenMemoryTargets()

}
}

[Fact]
public async Task TestCopyGraph_FullCopy()
{
var src = new MemoryStore();
var dst = new MemoryStore();

// Generate test content
var blobs = new List<byte[]>();
var descs = new List<Descriptor>();

void AppendBlob(string mediaType, byte[] blob)
{
blobs.Add(blob);
var desc = new Descriptor
{
MediaType = mediaType,
Digest = Digest.ComputeSHA256(blob),
Size = blob.Length
};
descs.Add(desc);
}

void GenerateManifest(Descriptor config, params Descriptor[] layers)
{
var manifest = new Manifest
{
Config = config,
Layers = layers.ToList()
};
var manifestBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(manifest));
AppendBlob(MediaType.ImageManifest, manifestBytes);
}

void GenerateIndex(params Descriptor[] manifests)
{
var index = new Index
{
Manifests = manifests.ToList()
};
var indexBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(index));
AppendBlob(MediaType.ImageIndex, indexBytes);
}

// Append blobs and generate manifests and indices
var getBytes = (string data) => Encoding.UTF8.GetBytes(data);
AppendBlob(MediaType.ImageConfig, getBytes("config")); // Blob 0
AppendBlob(MediaType.ImageLayer, getBytes("foo")); // Blob 1
AppendBlob(MediaType.ImageLayer, getBytes("bar")); // Blob 2
AppendBlob(MediaType.ImageLayer, getBytes("hello")); // Blob 3
GenerateManifest(descs[0], descs[1], descs[2]); // Blob 4
GenerateManifest(descs[0], descs[3]); // Blob 5
GenerateManifest(descs[0], descs[1], descs[2], descs[3]); // Blob 6
GenerateIndex(descs[4], descs[5]); // Blob 7
GenerateIndex(descs[6]); // Blob 8
GenerateIndex(); // Blob 9
GenerateIndex(descs[7], descs[8], descs[9]); // Blob 10

var root = descs[^1]; // The last descriptor as the root

// Push blobs to the source memory store
for (int i = 0; i < blobs.Count; i++)
{
await src.PushAsync(descs[i], new MemoryStream(blobs[i]), CancellationToken.None);
}

// Set up tracking storage wrappers for verification
var srcTracker = new StorageTracker(src);
var dstTracker = new StorageTracker(dst);

// Perform the copy graph operation
var copyOptions = new CopyGraphOptions();
await srcTracker.CopyGraphAsync(dstTracker, root, copyOptions, CancellationToken.None);

// Verify contents in the destination
foreach (var (desc, blob) in descs.Zip(blobs, Tuple.Create))
{
Assert.True(await dst.ExistsAsync(desc, CancellationToken.None), $"Blob {desc.Digest} should exist in destination.");
var fetchedContent = await dst.FetchAsync(desc, CancellationToken.None);
using var memoryStream = new MemoryStream();
await fetchedContent.CopyToAsync(memoryStream);
Assert.Equal(blob, memoryStream.ToArray());
}

// Verify API counts
// REMARKS: FetchCount should equal to blobs.Count
// but since there's no caching implemented, it is not
Assert.Equal(18, srcTracker.FetchCount);
Assert.Equal(0, srcTracker.PushCount);
Assert.Equal(0, srcTracker.ExistsCount);
Assert.Equal(0, dstTracker.FetchCount);
Assert.Equal(blobs.Count, dstTracker.PushCount);

// REMARKS: ExistsCount should equal to blobs.Count
// but since there's no caching implemented, it is not
Assert.Equal(16, dstTracker.ExistsCount);
}

private class StorageTracker : ITarget
{
private readonly ITarget _storage;

public int FetchCount { get; private set; }
public int PushCount { get; private set; }
public int ExistsCount { get; private set; }

public IList<string> Fetched { get; } = [];

public StorageTracker(ITarget storage)
{
_storage = storage;
}

public async Task<bool> ExistsAsync(Descriptor desc, CancellationToken cancellationToken)
{
ExistsCount++;
return await _storage.ExistsAsync(desc, cancellationToken);
}

public async Task<Stream> FetchAsync(Descriptor desc, CancellationToken cancellationToken)
{
FetchCount++;
Fetched.Add(desc.Digest);
return await _storage.FetchAsync(desc, cancellationToken);
}

public async Task PushAsync(Descriptor desc, Stream content, CancellationToken cancellationToken)
{
PushCount++;
await _storage.PushAsync(desc, content, cancellationToken);
}

public async Task TagAsync(Descriptor desc, string reference, CancellationToken cancellationToken)
{
await _storage.TagAsync(desc, reference, cancellationToken);
}

public Task<Descriptor> ResolveAsync(string reference, CancellationToken cancellationToken)
{
return _storage.ResolveAsync(reference, cancellationToken);
}
}
}
Loading