Skip to content

Commit

Permalink
feat(copy): support mounting existing descriptors from other reposito…
Browse files Browse the repository at this point in the history
…ries
  • Loading branch information
leonardochaia committed Sep 26, 2024
1 parent 2445b3d commit 847ecb0
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 32 deletions.
17 changes: 16 additions & 1 deletion src/OrasProject.Oras/Content/MemoryStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using OrasProject.Oras.Exceptions;
using OrasProject.Oras.Oci;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Registry;

namespace OrasProject.Oras.Content;

public class MemoryStore : ITarget, IPredecessorFindable
public class MemoryStore : ITarget, IPredecessorFindable, IMounter
{
private readonly MemoryStorage _storage = new();
private readonly MemoryTagStore _tagResolver = new();
Expand Down Expand Up @@ -94,4 +97,16 @@ public async Task TagAsync(Descriptor descriptor, string reference, Cancellation
/// <returns></returns>
public async Task<IEnumerable<Descriptor>> GetPredecessorsAsync(Descriptor node, CancellationToken cancellationToken = default)
=> await _graph.GetPredecessorsAsync(node, cancellationToken).ConfigureAwait(false);

public async Task MountAsync(Descriptor descriptor, string contentReference, Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken)
{
var taggedDescriptor = await _tagResolver.ResolveAsync(contentReference, cancellationToken).ConfigureAwait(false);
var successors = await _storage.GetSuccessorsAsync(taggedDescriptor, cancellationToken);

if (descriptor != taggedDescriptor && !successors.Contains(descriptor))
{
await _storage.PushAsync(descriptor, await getContents(cancellationToken), cancellationToken).ConfigureAwait(false);
await _graph.IndexAsync(_storage, descriptor, cancellationToken).ConfigureAwait(false);
}
}
}
104 changes: 94 additions & 10 deletions src/OrasProject.Oras/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using OrasProject.Oras.Oci;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;
using OrasProject.Oras.Registry;
using static OrasProject.Oras.Content.Extensions;

namespace OrasProject.Oras;

public struct CopyOptions
{
// public int Concurrency { get; set; }

public event Action<Descriptor> OnPreCopy;
public event Action<Descriptor> OnPostCopy;
public event Action<Descriptor> OnCopySkipped;
public event Action<Descriptor, string> OnMounted;

public Func<Descriptor, string[]> MountFrom { get; set; }

internal void PreCopy(Descriptor descriptor)
{
OnPreCopy?.Invoke(descriptor);
}

internal void PostCopy(Descriptor descriptor)
{
OnPostCopy?.Invoke(descriptor);
}

internal void CopySkipped(Descriptor descriptor)
{
OnCopySkipped?.Invoke(descriptor);
}

internal void Mounted(Descriptor descriptor, string sourceRepository)
{
OnMounted?.Invoke(descriptor, sourceRepository);
}
}
public static class Extensions
{

Expand All @@ -36,38 +69,89 @@ public static class Extensions
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CancellationToken cancellationToken = default)
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CancellationToken cancellationToken = default, CopyOptions? copyOptions = default)
{
if (string.IsNullOrEmpty(dstRef))
{
dstRef = srcRef;
}
var root = await src.ResolveAsync(srcRef, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, cancellationToken, copyOptions).ConfigureAwait(false);
await dst.TagAsync(root, dstRef, cancellationToken).ConfigureAwait(false);
return root;
}

public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken)
public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken, CopyOptions? copyOptions = default)
{
// check if node exists in target
if (await dst.ExistsAsync(node, cancellationToken).ConfigureAwait(false))
{
copyOptions?.CopySkipped(node);
return;
}

// retrieve successors
var successors = await src.GetSuccessorsAsync(node, cancellationToken).ConfigureAwait(false);
// obtain data stream
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);

// check if the node has successors
if (successors != null)
foreach (var childNode in successors)
{
await src.CopyGraphAsync(dst, childNode, cancellationToken, copyOptions).ConfigureAwait(false);
}

var sourceRepositories = copyOptions?.MountFrom(node) ?? [];
if (dst is IMounter mounter && sourceRepositories.Length > 0)
{
foreach (var childNode in successors)
for (var i = 0; i < sourceRepositories.Length; i++)
{
await src.CopyGraphAsync(dst, childNode, cancellationToken).ConfigureAwait(false);
var sourceRepository = sourceRepositories[i];
var mountFailed = false;

async Task<Stream> GetContents(CancellationToken token)
{
// the invocation of getContent indicates that mounting has failed
mountFailed = true;

if (i < sourceRepositories.Length - 1)
{
// If this is not the last one, skip this source and try next one
// We want to return an error that we will test for from mounter.Mount()
throw new SkipSourceException();
}

// this is the last iteration so we need to actually get the content and do the copy
// but first call the PreCopy function
copyOptions?.PreCopy(node);
return await src.FetchAsync(node, token).ConfigureAwait(false);
}

try
{
await mounter.MountAsync(node, sourceRepository, GetContents, cancellationToken).ConfigureAwait(false);
}
catch (SkipSourceException)
{
}

if (!mountFailed)
{
copyOptions?.Mounted(node, sourceRepository);
return;
}
}
}
await dst.PushAsync(node, dataStream, cancellationToken).ConfigureAwait(false);
else
{
// alternatively we just copy it
copyOptions?.PreCopy(node);
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);
await dst.PushAsync(node, dataStream, cancellationToken).ConfigureAwait(false);
}

// we copied it
copyOptions?.PostCopy(node);
}

private class SkipSourceException : Exception {}
}

24 changes: 24 additions & 0 deletions src/OrasProject.Oras/Registry/IMounter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;

namespace OrasProject.Oras.Registry;

/// <summary>
/// Mounter allows cross-repository blob mounts.
/// </summary>
public interface IMounter
{
/// <summary>
/// Mount makes the blob with the given descriptor in fromRepo
/// available in the repository signified by the receiver.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="contentReference"></param>
/// <param name="getContents"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task MountAsync(Descriptor descriptor, string contentReference, Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken);
}
2 changes: 1 addition & 1 deletion src/OrasProject.Oras/Registry/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace OrasProject.Oras.Registry;
/// Furthermore, this interface also provides the ability to enforce the
/// separation of the blob and the manifests CASs.
/// </summary>
public interface IRepository : ITarget, IReferenceFetchable, IReferencePushable, IDeletable, ITagListable
public interface IRepository : ITarget, IReferenceFetchable, IReferencePushable, IDeletable, ITagListable, IMounter
{
/// <summary>
/// Blobs provides access to the blob CAS only, which contains config blobs,layers, and other generic blobs.
Expand Down
116 changes: 96 additions & 20 deletions src/OrasProject.Oras/Registry/Remote/BlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace OrasProject.Oras.Registry.Remote;

public class BlobStore(Repository repository) : IBlobStore
public class BlobStore(Repository repository) : IBlobStore, IMounter
{
public Repository Repository { get; init; } = repository;

Expand Down Expand Up @@ -148,25 +148,7 @@ public async Task PushAsync(Descriptor expected, Stream content, CancellationTok
url = location.IsAbsoluteUri ? location : new Uri(url, location);
}

// monolithic upload
// add digest key to query string with expected digest value
var req = new HttpRequestMessage(HttpMethod.Put, new UriBuilder(url)
{
Query = $"{url.Query}&digest={HttpUtility.UrlEncode(expected.Digest)}"
}.Uri);
req.Content = new StreamContent(content);
req.Content.Headers.ContentLength = expected.Size;

// the expected media type is ignored as in the API doc.
req.Content.Headers.ContentType = new MediaTypeHeaderValue(MediaTypeNames.Application.Octet);

using (var response = await Repository.Options.HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false))
{
if (response.StatusCode != HttpStatusCode.Created)
{
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}
await InternalPushAsync(url, expected, content, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -198,4 +180,98 @@ public async Task<Descriptor> ResolveAsync(string reference, CancellationToken c
/// <returns></returns>
public async Task DeleteAsync(Descriptor target, CancellationToken cancellationToken = default)
=> await Repository.DeleteAsync(target, false, cancellationToken).ConfigureAwait(false);

/// <summary>
/// Mounts the given descriptor from contentReference into the blob store.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="contentReference"></param>
/// <param name="getContents"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="HttpRequestException"></exception>
/// <exception cref="Exception"></exception>
public async Task MountAsync(Descriptor descriptor, string contentReference,
Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken)
{
var url = new UriFactory(Repository.Options).BuildRepositoryBlobUpload();
var mountReq = new HttpRequestMessage(HttpMethod.Post, new UriBuilder(url)
{
Query =
$"{url.Query}&mount={HttpUtility.UrlEncode(descriptor.Digest)}&from={HttpUtility.UrlEncode(contentReference)}"
}.Uri);

using (var response = await Repository.Options.HttpClient.SendAsync(mountReq, cancellationToken)
.ConfigureAwait(false))
{
switch (response.StatusCode)
{
case HttpStatusCode.Created:
// 201, layer has been mounted
return;
case HttpStatusCode.Accepted:
{
// 202, mounting failed. upload session has begun
var location = response.Headers.Location ??
throw new HttpRequestException("missing location header");
url = location.IsAbsoluteUri ? location : new Uri(url, location);
break;
}
default:
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}

// From the [spec]:
//
// "If a registry does not support cross-repository mounting
// or is unable to mount the requested blob,
// it SHOULD return a 202.
// This indicates that the upload session has begun
// and that the client MAY proceed with the upload."
//
// So we need to get the content from somewhere in order to
// push it. If the caller has provided a getContent function, we
// can use that, otherwise pull the content from the source repository.
//
// [spec]: https://github.com/opencontainers/distribution-spec/blob/v1.1.0/spec.md#mounting-a-blob-from-another-repository

Stream contents;
if (getContents != null)
{
contents = await getContents(cancellationToken).ConfigureAwait(false);
}
else
{
var referenceOptions = repository.Options with
{
Reference = Reference.Parse(contentReference),
};
contents = await new Repository(referenceOptions).FetchAsync(descriptor, cancellationToken);
}

await InternalPushAsync(url, descriptor, contents, cancellationToken).ConfigureAwait(false);
}

private async Task InternalPushAsync(Uri url, Descriptor descriptor, Stream content,
CancellationToken cancellationToken)
{
// monolithic upload
// add digest key to query string with descriptor digest value
var req = new HttpRequestMessage(HttpMethod.Put, new UriBuilder(url)
{
Query = $"{url.Query}&digest={HttpUtility.UrlEncode(descriptor.Digest)}"
}.Uri);
req.Content = new StreamContent(content);
req.Content.Headers.ContentLength = descriptor.Size;

// the descriptor media type is ignored as in the API doc.
req.Content.Headers.ContentType = new MediaTypeHeaderValue(MediaTypeNames.Application.Octet);

using var response =
await Repository.Options.HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
if (response.StatusCode != HttpStatusCode.Created)
{
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}
}
18 changes: 18 additions & 0 deletions src/OrasProject.Oras/Registry/Remote/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,22 @@ internal Reference ParseReferenceFromContentReference(string reference)
/// <param name="desc"></param>
/// <returns></returns>
private IBlobStore BlobStore(Descriptor desc) => IsManifest(desc) ? Manifests : Blobs;

/// <summary>
/// Mount makes the blob with the given digest in fromRepo
/// available in the repository signified by the receiver.
///
/// This avoids the need to pull content down from fromRepo only to push it to r.
///
/// If the registry does not implement mounting, getContent will be used to get the
/// content to push. If getContent is null, the content will be pulled from the source
/// repository.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="contentReference"></param>
/// <param name="getContents"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task MountAsync(Descriptor descriptor, string contentReference, Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken)
=> ((IMounter)Blobs).MountAsync(descriptor,contentReference, getContents, cancellationToken);
}
Loading

0 comments on commit 847ecb0

Please sign in to comment.