diff --git a/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs b/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs index a8c7f1fd4..739a72924 100644 --- a/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs +++ b/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs @@ -256,7 +256,7 @@ public static IMachineBuilder AddServalPlatformService( new MethodConfig { Names = { MethodName.Default }, - RetryPolicy = new RetryPolicy + RetryPolicy = new Grpc.Net.Client.Configuration.RetryPolicy { MaxAttempts = 10, InitialBackoff = TimeSpan.FromSeconds(1), diff --git a/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs b/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs index 56bbc9829..ff8563a42 100644 --- a/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs +++ b/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs @@ -5,15 +5,23 @@ public class S3FileStorage : FileStorage private readonly AmazonS3Client _client; private readonly string _bucketName; private readonly string _basePath; + private readonly ILoggerFactory _loggerFactory; - public S3FileStorage(string bucketName, string basePath, string accessKeyId, string secretAccessKey, string region) + public S3FileStorage( + string bucketName, + string basePath, + string accessKeyId, + string secretAccessKey, + string region, + ILoggerFactory loggerFactory + ) { _client = new AmazonS3Client( accessKeyId, secretAccessKey, new AmazonS3Config { - RetryMode = Amazon.Runtime.RequestRetryMode.Standard, + RetryMode = RequestRetryMode.Standard, MaxErrorRetry = 3, RegionEndpoint = RegionEndpoint.GetBySystemName(region) } @@ -23,6 +31,7 @@ public S3FileStorage(string bucketName, string basePath, string accessKeyId, str //Ultimately, object keys can neither begin nor end with slashes; this is what broke the earlier low-level implementation _basePath = basePath.EndsWith("/") ? basePath.Remove(basePath.Length - 1, 1) : basePath; _basePath = _basePath.StartsWith("/") ? _basePath.Remove(0, 1) : _basePath; + _loggerFactory = loggerFactory; } public override void Dispose() { } @@ -72,11 +81,14 @@ public override async Task OpenRead(string path, CancellationToken cance return response.ResponseStream; } - public override Task OpenWrite(string path, CancellationToken cancellationToken = default) + public override async Task OpenWrite(string path, CancellationToken cancellationToken = default) { string objectId = _basePath + Normalize(path); - return Task.FromResult( - new BufferedStream(new S3WriteStream(_client, objectId, _bucketName), 1024 * 1024 * 100) + InitiateMultipartUploadRequest request = new() { BucketName = _bucketName, Key = objectId }; + InitiateMultipartUploadResponse response = await _client.InitiateMultipartUploadAsync(request); + return new BufferedStream( + new S3WriteStream(_client, objectId, _bucketName, response.UploadId, _loggerFactory), + S3WriteStream.MaxPartSize ); } diff --git a/src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs b/src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs index b2d5808f0..fc2173053 100644 --- a/src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs +++ b/src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs @@ -4,15 +4,27 @@ public class S3WriteStream : Stream { private readonly AmazonS3Client _client; private readonly string _key; + private readonly string _uploadId; private readonly string _bucketName; - private long _length; + private readonly List _uploadResponses; + private readonly ILogger _logger; - public S3WriteStream(AmazonS3Client client, string key, string bucketName) + public const int MaxPartSize = 5 * 1024 * 1024; + + public S3WriteStream( + AmazonS3Client client, + string key, + string bucketName, + string uploadId, + ILoggerFactory loggerFactory + ) { _client = client; _key = key; _bucketName = bucketName; - _length = 0; + _uploadId = uploadId; + _logger = loggerFactory.CreateLogger(); + _uploadResponses = new List(); } public override bool CanRead => false; @@ -21,7 +33,7 @@ public S3WriteStream(AmazonS3Client client, string key, string bucketName) public override bool CanWrite => true; - public override long Length => _length; + public override long Length => 0; public override long Position { @@ -39,38 +51,113 @@ public override void Flush() { } public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; - public override void Write(byte[] buffer, int offset, int count) + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - using Stream inputStream = new MemoryStream(buffer, offset, count); - using var transferUtility = new TransferUtility(_client); - var uploadRequest = new TransferUtilityUploadRequest + try { - BucketName = _bucketName, - InputStream = inputStream, - Key = _key, - PartSize = count - }; - transferUtility.Upload(uploadRequest); + using MemoryStream ms = new(buffer, offset, count); + int partNumber = _uploadResponses.Count + 1; + UploadPartRequest request = + new() + { + BucketName = _bucketName, + Key = _key, + UploadId = _uploadId, + PartNumber = partNumber, + InputStream = ms, + PartSize = MaxPartSize + }; + request.StreamTransferProgress += new EventHandler( + (_, e) => + { + _logger.LogDebug($"Transferred {e.TransferredBytes}/{e.TotalBytes}"); + } + ); + UploadPartResponse response = await _client.UploadPartAsync(request); + if (response.HttpStatusCode != HttpStatusCode.OK) + throw new HttpRequestException( + $"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}" + ); + _uploadResponses.Add(response); + } + catch (Exception e) + { + await AbortAsync(e); + throw; + } } - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + protected override void Dispose(bool disposing) { - using Stream inputStream = new MemoryStream(buffer, offset, count); - using var transferUtility = new TransferUtility(_client); - var uploadRequest = new TransferUtilityUploadRequest + if (disposing) + { + try + { + CompleteMultipartUploadRequest request = + new() + { + BucketName = _bucketName, + Key = _key, + UploadId = _uploadId + }; + request.AddPartETags(_uploadResponses); + CompleteMultipartUploadResponse response = _client + .CompleteMultipartUploadAsync(request) + .WaitAndUnwrapException(); + Dispose(disposing: false); + GC.SuppressFinalize(this); + if (response.HttpStatusCode != HttpStatusCode.OK) + throw new HttpRequestException( + $"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}" + ); + } + catch (Exception e) + { + AbortAsync(e).WaitAndUnwrapException(); + throw; + } + } + base.Dispose(disposing); + } + + public async override ValueTask DisposeAsync() + { + try + { + CompleteMultipartUploadRequest request = + new() + { + BucketName = _bucketName, + Key = _key, + UploadId = _uploadId + }; + request.AddPartETags(_uploadResponses); + CompleteMultipartUploadResponse response = await _client.CompleteMultipartUploadAsync(request); + Dispose(disposing: false); + GC.SuppressFinalize(this); + if (response.HttpStatusCode != HttpStatusCode.OK) + throw new HttpRequestException( + $"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}" + ); + } + catch (Exception e) { - BucketName = _bucketName, - InputStream = inputStream, - Key = _key, - PartSize = count - }; - await transferUtility.UploadAsync(uploadRequest); + await AbortAsync(e); + } } - public override ValueTask DisposeAsync() + private async Task AbortAsync(Exception e) { - Dispose(disposing: false); - GC.SuppressFinalize(this); - return ValueTask.CompletedTask; + _logger.LogError(e, $"Aborted upload {_uploadId} to {_bucketName}/{_key}"); + AbortMultipartUploadRequest abortMPURequest = + new() + { + BucketName = _bucketName, + Key = _key, + UploadId = _uploadId + }; + await _client.AbortMultipartUploadAsync(abortMPURequest); } } diff --git a/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs b/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs index 526cf95fd..cef73bbeb 100644 --- a/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs +++ b/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs @@ -5,9 +5,12 @@ public class SharedFileService : ISharedFileService private readonly Uri? _baseUri; private readonly FileStorage _fileStorage; private readonly bool _supportFolderDelete = true; + private readonly ILoggerFactory _loggerFactory; - public SharedFileService(IOptions? options = null) + public SharedFileService(ILoggerFactory loggerFactory, IOptions? options = null) { + _loggerFactory = loggerFactory; + if (options?.Value.Uri is null) { _fileStorage = new InMemoryStorage(); @@ -30,7 +33,8 @@ public SharedFileService(IOptions? options = null) _baseUri.AbsolutePath, options.Value.S3AccessKeyId, options.Value.S3SecretAccessKey, - options.Value.S3Region + options.Value.S3Region, + _loggerFactory ); _supportFolderDelete = false; break; diff --git a/src/SIL.Machine.AspNetCore/Usings.cs b/src/SIL.Machine.AspNetCore/Usings.cs index 21868db71..8544d1e96 100644 --- a/src/SIL.Machine.AspNetCore/Usings.cs +++ b/src/SIL.Machine.AspNetCore/Usings.cs @@ -13,7 +13,7 @@ global using Amazon; global using Amazon.S3; global using Amazon.S3.Model; -global using Amazon.S3.Transfer; +global using Amazon.Runtime; global using Grpc.Core; global using Grpc.Core.Interceptors; global using Grpc.Net.Client.Configuration; @@ -31,6 +31,7 @@ global using MongoDB.Driver; global using MongoDB.Driver.Linq; global using Nito.AsyncEx; +global using Nito.AsyncEx.Synchronous; global using Polly; global using SIL.DataAccess; global using SIL.Machine.AspNetCore.Configuration; diff --git a/tests/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs index f9d01a7d6..614dabc52 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/ClearMLNmtEngineServiceTests.cs @@ -84,7 +84,7 @@ public TestEnvironment() new MemoryRepository(), new ObjectIdGenerator() ); - _sharedFileService = new SharedFileService(); + _sharedFileService = new SharedFileService(Substitute.For()); _options = Substitute.For>(); _options.CurrentValue.Returns( new ClearMLNmtEngineOptions { BuildPollingTimeout = TimeSpan.FromMilliseconds(50) }