Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repository): mounting support #152

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/OrasProject.Oras/Registry/IMounter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The ORAS Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;

namespace OrasProject.Oras.Registry;

/// <summary>
/// Mounter allows cross-repository blob mounts.
/// </summary>
public interface IMounter
{
/// <summary>
/// Mount makes the blob with the given descriptor in fromRepo
/// available in the repository signified by the receiver.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="fromRepository"></param>
/// <param name="getContent"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task MountAsync(Descriptor descriptor,
string fromRepository,
Func<CancellationToken, Task<Stream>>? getContent = null,
CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/OrasProject.Oras/Registry/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace OrasProject.Oras.Registry;
/// Furthermore, this interface also provides the ability to enforce the
/// separation of the blob and the manifests CASs.
/// </summary>
public interface IRepository : ITarget, IReferenceFetchable, IReferencePushable, IDeletable, ITagListable
public interface IRepository : ITarget, IReferenceFetchable, IReferencePushable, IDeletable, ITagListable, IMounter
{
/// <summary>
/// Blobs provides access to the blob CAS only, which contains config blobs,layers, and other generic blobs.
Expand Down
135 changes: 115 additions & 20 deletions src/OrasProject.Oras/Registry/Remote/BlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

namespace OrasProject.Oras.Registry.Remote;

public class BlobStore(Repository repository) : IBlobStore
public class BlobStore(Repository repository) : IBlobStore, IMounter
{
public Repository Repository { get; init; } = repository;

Check warning on line 30 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

Parameter 'Repository repository' is captured into the state of the enclosing type and its value is also used to initialize a field, property, or event.

Check warning on line 30 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View workflow job for this annotation

GitHub Actions / Analyze (8.0.x)

Parameter 'Repository repository' is captured into the state of the enclosing type and its value is also used to initialize a field, property, or event.

Check warning on line 30 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View workflow job for this annotation

GitHub Actions / Analyze (8.0.x)

Parameter 'Repository repository' is captured into the state of the enclosing type and its value is also used to initialize a field, property, or event.

public async Task<Stream> FetchAsync(Descriptor target, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -148,25 +148,7 @@
url = location.IsAbsoluteUri ? location : new Uri(url, location);
}

// monolithic upload
// add digest key to query string with expected digest value
var req = new HttpRequestMessage(HttpMethod.Put, new UriBuilder(url)
{
Query = $"{url.Query}&digest={HttpUtility.UrlEncode(expected.Digest)}"
}.Uri);
req.Content = new StreamContent(content);
req.Content.Headers.ContentLength = expected.Size;

// the expected media type is ignored as in the API doc.
req.Content.Headers.ContentType = new MediaTypeHeaderValue(MediaTypeNames.Application.Octet);

using (var response = await Repository.Options.HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false))
{
if (response.StatusCode != HttpStatusCode.Created)
{
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}
await CompletePushAsync(url, expected, content, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -198,4 +180,117 @@
/// <returns></returns>
public async Task DeleteAsync(Descriptor target, CancellationToken cancellationToken = default)
=> await Repository.DeleteAsync(target, false, cancellationToken).ConfigureAwait(false);

/// <summary>
/// Mounts the given descriptor from fromRepository into the blob store.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="fromRepository"></param>
/// <param name="getContent"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="HttpRequestException"></exception>
/// <exception cref="Exception"></exception>
public async Task MountAsync(Descriptor descriptor, string fromRepository,
Func<CancellationToken, Task<Stream>>? getContent, CancellationToken cancellationToken)
{
var url = new UriFactory(Repository.Options).BuildRepositoryBlobUpload();
var mountReq = new HttpRequestMessage(HttpMethod.Post, new UriBuilder(url)
{
Query =
$"{url.Query}&mount={HttpUtility.UrlEncode(descriptor.Digest)}&from={HttpUtility.UrlEncode(fromRepository)}"
}.Uri);

using (var response = await Repository.Options.HttpClient.SendAsync(mountReq, cancellationToken)
.ConfigureAwait(false))
{
switch (response.StatusCode)
{
case HttpStatusCode.Created:
// 201, layer has been mounted
response.VerifyContentDigest(descriptor.Digest);
return;
case HttpStatusCode.Accepted:
{
// 202, mounting failed. upload session has begun
var location = response.Headers.Location ??
throw new HttpRequestException("missing location header");
url = location.IsAbsoluteUri ? location : new Uri(url, location);
break;
}
default:
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}

// From the [spec]:
//
// "If a registry does not support cross-repository mounting
// or is unable to mount the requested blob,
// it SHOULD return a 202.
// This indicates that the upload session has begun
// and that the client MAY proceed with the upload."
//
// So we need to get the content from somewhere in order to
// push it. If the caller has provided a getContent function, we
// can use that, otherwise pull the content from the source repository.
//
// [spec]: https://github.com/opencontainers/distribution-spec/blob/v1.1.0/spec.md#mounting-a-blob-from-another-repository

async Task<Stream> GetContentStream()
{
Stream stream;
if (getContent != null)
{
stream = await getContent(cancellationToken).ConfigureAwait(false);
}
else
{
var referenceOptions = repository.Options with
{
Reference = Reference.Parse(fromRepository),
};
stream = await new Repository(referenceOptions).FetchAsync(descriptor, cancellationToken).ConfigureAwait(false);
}

return stream;
}

await using (var contents = await GetContentStream())
{
await CompletePushAsync(url, descriptor, contents, cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
/// Completes a push operation started beforehand.
/// </summary>
/// <param name="url"></param>
/// <param name="descriptor"></param>
/// <param name="content"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="Exception"></exception>
private async Task CompletePushAsync(Uri url, Descriptor descriptor, Stream content,
CancellationToken cancellationToken)
{
// monolithic upload
// add digest key to query string with descriptor digest value
var req = new HttpRequestMessage(HttpMethod.Put, new UriBuilder(url)
{
Query = $"{url.Query}&digest={HttpUtility.UrlEncode(descriptor.Digest)}"
}.Uri);
req.Content = new StreamContent(content);
req.Content.Headers.ContentLength = descriptor.Size;

// the descriptor media type is ignored as in the API doc.
req.Content.Headers.ContentType = new MediaTypeHeaderValue(MediaTypeNames.Application.Octet);

using (var response =
await Repository.Options.HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false))
{
if (response.StatusCode != HttpStatusCode.Created)
{
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);

Check warning on line 292 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Registry/Remote/BlobStore.cs#L291-L292

Added lines #L291 - L292 were not covered by tests
}
}
}
}
18 changes: 18 additions & 0 deletions src/OrasProject.Oras/Registry/Remote/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,22 @@ internal Reference ParseReferenceFromContentReference(string reference)
/// <param name="desc"></param>
/// <returns></returns>
private IBlobStore BlobStore(Descriptor desc) => IsManifest(desc) ? Manifests : Blobs;

/// <summary>
/// Mount makes the blob with the given digest in fromRepo
/// available in the repository signified by the receiver.
///
/// This avoids the need to pull content down from fromRepo only to push it to r.
///
/// If the registry does not implement mounting, getContent will be used to get the
/// content to push. If getContent is null, the content will be pulled from the source
/// repository.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="fromRepository"></param>
/// <param name="getContent"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task MountAsync(Descriptor descriptor, string fromRepository, Func<CancellationToken, Task<Stream>>? getContent = null, CancellationToken cancellationToken = default)
=> await ((IMounter)Blobs).MountAsync(descriptor, fromRepository, getContent, cancellationToken).ConfigureAwait(false);
}
Loading
Loading