Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(copy): introduces CopyGraphOptions with events support #145

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6817504
feat(copy): support mounting existing descriptors from other reposito…
leonardochaia Sep 26, 2024
b453fbc
fix: standard header
leonardochaia Sep 26, 2024
6a7a1e9
chore: adds repository tests
leonardochaia Oct 3, 2024
88c6793
fix: copy test
leonardochaia Oct 17, 2024
4c52467
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Oct 17, 2024
36342c9
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 11, 2024
dbe3509
chore: removes mounting support to be implemented in separate PR
leonardochaia Nov 11, 2024
571820c
refactor: renames class and events as per review
leonardochaia Nov 11, 2024
138120e
chore: adds copy tests
leonardochaia Nov 11, 2024
77e8079
chore: adds overload to prevent breaking change
leonardochaia Nov 11, 2024
06fd442
chore: introduces overloads to keep previous signature on CopyGraphOp…
leonardochaia Nov 11, 2024
5787c04
fix: copy signature
leonardochaia Nov 12, 2024
c9be0a2
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 12, 2024
77ba179
refactor: introduces CopyOptions
leonardochaia Nov 13, 2024
ecaf1b1
Merge remote-tracking branch 'origin/feat/improve-copy-performance' i…
leonardochaia Nov 13, 2024
de9ee0d
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 13, 2024
6ef7e74
chore: adds license
leonardochaia Nov 13, 2024
be7b1c5
Merge remote-tracking branch 'origin/feat/improve-copy-performance' i…
leonardochaia Nov 13, 2024
a12ff78
chore: adds more tests from oras-go
leonardochaia Nov 13, 2024
a7a1baf
chore: adds tests from oras-go
leonardochaia Nov 13, 2024
493a8d9
doc: adds comments to new option structs
leonardochaia Nov 25, 2024
9e40655
refactor: makes copy events async
leonardochaia Nov 25, 2024
68220bd
feat: introduces `InvokeAsync` extension method to support asynchrono…
leonardochaia Nov 26, 2024
e35fc7e
refactor: delegate InvokeAsync to execute handlers in parallel
leonardochaia Nov 29, 2024
827f62b
refactor: Copy events to include sync and async variants.
leonardochaia Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/OrasProject.Oras/AsyncInvocationExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The ORAS Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Linq;
using System.Threading.Tasks;

namespace OrasProject.Oras;

internal static class AsyncInvocationExtensions
{
/// <summary>
/// Asynchronously invokes all handlers from an event that returns a <see cref="Task"/>.
/// <remarks>All handlers are executed in parallel</remarks>
/// </summary>
/// <param name="eventDelegate"></param>
/// <param name="args"></param>
/// <typeparam name="TEventArgs"></typeparam>
internal static Task InvokeAsync<TEventArgs>(
this Func<TEventArgs, Task>? eventDelegate, TEventArgs args)
{
if (eventDelegate == null) return Task.CompletedTask;

var tasks = eventDelegate.GetInvocationList()
.Select(d => (Task?)d.DynamicInvoke(args) ?? Task.CompletedTask);

return Task.WhenAll(tasks);
}
}
74 changes: 74 additions & 0 deletions src/OrasProject.Oras/CopyGraphOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The ORAS Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;

namespace OrasProject.Oras;

/// <summary>
/// CopyGraphOptions contains parameters for <see cref="Extensions.CopyGraphAsync(OrasProject.Oras.ITarget,OrasProject.Oras.ITarget,OrasProject.Oras.Oci.Descriptor,System.Threading.CancellationToken)"/>
/// </summary>
public struct CopyGraphOptions
leonardochaia marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// PreCopyAsync handles the current descriptor before it is copied.
/// </summary>
public event Func<Descriptor, Task>? PreCopyAsync;

/// <summary>
/// PreCopy handles the current descriptor before it is copied.
/// </summary>
public event Action<Descriptor>? PreCopy;

/// <summary>
/// PostCopyAsync handles the current descriptor after it is copied.
/// </summary>
public event Func<Descriptor, Task>? PostCopyAsync;

/// <summary>
/// PostCopy handles the current descriptor after it is copied.
/// </summary>
public event Action<Descriptor>? PostCopy;

/// <summary>
/// CopySkippedAsync will be called when the sub-DAG rooted by the current node
/// is skipped.
/// </summary>
public event Func<Descriptor, Task>? CopySkippedAsync;

/// <summary>
/// CopySkipped will be called when the sub-DAG rooted by the current node
/// is skipped.
/// </summary>
public event Action<Descriptor>? CopySkipped;

internal Task OnPreCopyAsync(Descriptor descriptor)
{
PreCopy?.Invoke(descriptor);
return PreCopyAsync?.InvokeAsync(descriptor) ?? Task.CompletedTask;
}
Comment on lines +57 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PoC that plays with SomeEvent, we set up the tasks for the async events before invoking the sync event handler. While in this implementation we first invoke the sync event and then the async events. I think the PoC approach is more efficient. I'm wondering if we can achieve something similar with a reusable extension method.

How about something like this:

    internal static IEnumerable<Task> InvokeAsyncEvents<TEventArgs>(
        this Func<TEventArgs, Task>? eventDelegate, TEventArgs args)
    {
        if (eventDelegate == null)
        {
            return [Task.CompletedTask];
        }
        return eventDelegate.GetInvocationList()
            .Select(d => (Task?)d.DynamicInvoke(args) ?? Task.CompletedTask);
    }
    internal Task OnPreCopyAsync(Descriptor descriptor)
    {
        var tasks = PreCopyAsync.InvokeAsyncEvents(descriptor);
        PreCopy?.Invoke(descriptor);
        return Task.WhenAll(tasks);
    }

Naming is hard though...


internal Task OnPostCopyAsync(Descriptor descriptor)
{
PostCopy?.Invoke(descriptor);
return PostCopyAsync?.InvokeAsync(descriptor) ?? Task.CompletedTask;
}

internal Task OnCopySkippedAsync(Descriptor descriptor)
{
CopySkipped?.Invoke(descriptor);
return CopySkippedAsync?.Invoke(descriptor) ?? Task.CompletedTask;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean InvokeAsync?

}
}
22 changes: 22 additions & 0 deletions src/OrasProject.Oras/CopyOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright The ORAS Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace OrasProject.Oras;

/// <summary>
/// CopyOptions contains parameters for <see cref="Extensions.CopyAsync(OrasProject.Oras.ITarget,string,OrasProject.Oras.ITarget,string,System.Threading.CancellationToken)"/>
/// </summary>
public struct CopyOptions
{
public CopyGraphOptions CopyGraphOptions;
}
51 changes: 40 additions & 11 deletions src/OrasProject.Oras/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using OrasProject.Oras.Oci;
using System;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;
using static OrasProject.Oras.Content.Extensions;

namespace OrasProject.Oras;

public static class Extensions
{
/// <summary>
/// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
/// in the source Target to the destination Target.
/// The destination reference will be the same as the source reference if the
/// destination reference is left blank.
/// Returns the descriptor of the root node on successful copy.
/// </summary>
/// <param name="src"></param>
/// <param name="srcRef"></param>
/// <param name="dst"></param>
/// <param name="dstRef"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef,
CancellationToken cancellationToken = default)
{
return src.CopyAsync(srcRef, dst, dstRef, new CopyOptions(), cancellationToken);
}

/// <summary>
/// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
Expand All @@ -33,41 +52,51 @@ public static class Extensions
/// <param name="srcRef"></param>
/// <param name="dst"></param>
/// <param name="dstRef"></param>
/// <param name="copyOptions"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CancellationToken cancellationToken = default)
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CopyOptions copyOptions = default, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(dstRef))
{
dstRef = srcRef;
}
var root = await src.ResolveAsync(srcRef, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, copyOptions.CopyGraphOptions, cancellationToken).ConfigureAwait(false);
await dst.TagAsync(root, dstRef, cancellationToken).ConfigureAwait(false);
return root;
}

public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken)
public static Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken = default)
{
return src.CopyGraphAsync(dst, node, new CopyGraphOptions(), cancellationToken);
}

public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CopyGraphOptions copyGraphOptions = default, CancellationToken cancellationToken = default)
{
// check if node exists in target
if (await dst.ExistsAsync(node, cancellationToken).ConfigureAwait(false))
{
await copyGraphOptions.OnCopySkippedAsync(node).ConfigureAwait(false);
return;
}

// retrieve successors
var successors = await src.GetSuccessorsAsync(node, cancellationToken).ConfigureAwait(false);
// obtain data stream
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);

// check if the node has successors
if (successors != null)
foreach (var childNode in successors)
Wwwsylvia marked this conversation as resolved.
Show resolved Hide resolved
{
foreach (var childNode in successors)
{
await src.CopyGraphAsync(dst, childNode, cancellationToken).ConfigureAwait(false);
}
await src.CopyGraphAsync(dst, childNode, copyGraphOptions, cancellationToken).ConfigureAwait(false);
}

// perform the copy
await copyGraphOptions.OnPreCopyAsync(node).ConfigureAwait(false);
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);
await dst.PushAsync(node, dataStream, cancellationToken).ConfigureAwait(false);

// we copied it
await copyGraphOptions.OnPostCopyAsync(node).ConfigureAwait(false);
}
}
Loading
Loading