Skip to content

Commit

Permalink
Reverted to low-level implementation (removing need for large buffers…
Browse files Browse the repository at this point in the history
…); fixes Refine S3 bucket usage #61

Also removed synchronous write; fixes S3 remove sync write (async only) #64

Note: Dispose contains async calls/logic because "await using" on the StreamWriters in ClearMLNMTBuildJob calls DisposeAsync on the StreamWriters themselves but only the synchronous Dispose on their BaseStream. This is the only way I could find to work around that behavior; other solutions are welcome if there is a workaround I'm missing.
  • Loading branch information
Enkidu93 authored and johnml1135 committed Sep 8, 2023
1 parent d2d93d0 commit cf57098
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public static IMachineBuilder AddServalPlatformService(
new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
RetryPolicy = new Grpc.Net.Client.Configuration.RetryPolicy
{
MaxAttempts = 10,
InitialBackoff = TimeSpan.FromSeconds(1),
Expand Down
22 changes: 17 additions & 5 deletions src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@ public class S3FileStorage : FileStorage
private readonly AmazonS3Client _client;
private readonly string _bucketName;
private readonly string _basePath;
private readonly ILoggerFactory _loggerFactory;

public S3FileStorage(string bucketName, string basePath, string accessKeyId, string secretAccessKey, string region)
public S3FileStorage(
string bucketName,
string basePath,
string accessKeyId,
string secretAccessKey,
string region,
ILoggerFactory loggerFactory
)
{
_client = new AmazonS3Client(
accessKeyId,
secretAccessKey,
new AmazonS3Config
{
RetryMode = Amazon.Runtime.RequestRetryMode.Standard,
RetryMode = RequestRetryMode.Standard,
MaxErrorRetry = 3,
RegionEndpoint = RegionEndpoint.GetBySystemName(region)
}
Expand All @@ -23,6 +31,7 @@ public S3FileStorage(string bucketName, string basePath, string accessKeyId, str
//Ultimately, object keys can neither begin nor end with slashes; this is what broke the earlier low-level implementation
_basePath = basePath.EndsWith("/") ? basePath.Remove(basePath.Length - 1, 1) : basePath;
_basePath = _basePath.StartsWith("/") ? _basePath.Remove(0, 1) : _basePath;
_loggerFactory = loggerFactory;
}

public override void Dispose() { }
Expand Down Expand Up @@ -72,11 +81,14 @@ public override async Task<Stream> OpenRead(string path, CancellationToken cance
return response.ResponseStream;
}

public override Task<Stream> OpenWrite(string path, CancellationToken cancellationToken = default)
public override async Task<Stream> OpenWrite(string path, CancellationToken cancellationToken = default)
{
string objectId = _basePath + Normalize(path);
return Task.FromResult<Stream>(
new BufferedStream(new S3WriteStream(_client, objectId, _bucketName), 1024 * 1024 * 100)
InitiateMultipartUploadRequest request = new() { BucketName = _bucketName, Key = objectId };
InitiateMultipartUploadResponse response = await _client.InitiateMultipartUploadAsync(request);
return new BufferedStream(
new S3WriteStream(_client, objectId, _bucketName, response.UploadId, _loggerFactory),
S3WriteStream.MaxPartSize
);
}

Expand Down
143 changes: 115 additions & 28 deletions src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,27 @@ public class S3WriteStream : Stream
{
private readonly AmazonS3Client _client;
private readonly string _key;
private readonly string _uploadId;
private readonly string _bucketName;
private long _length;
private readonly List<UploadPartResponse> _uploadResponses;
private readonly ILogger<S3WriteStream> _logger;

public S3WriteStream(AmazonS3Client client, string key, string bucketName)
public const int MaxPartSize = 5 * 1024 * 1024;

public S3WriteStream(
AmazonS3Client client,
string key,
string bucketName,
string uploadId,
ILoggerFactory loggerFactory
)
{
_client = client;
_key = key;
_bucketName = bucketName;
_length = 0;
_uploadId = uploadId;
_logger = loggerFactory.CreateLogger<S3WriteStream>();
_uploadResponses = new List<UploadPartResponse>();
}

public override bool CanRead => false;
Expand All @@ -21,7 +33,7 @@ public S3WriteStream(AmazonS3Client client, string key, string bucketName)

public override bool CanWrite => true;

public override long Length => _length;
public override long Length => 0;

public override long Position
{
Expand All @@ -39,38 +51,113 @@ public override void Flush() { }

public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;

public override void Write(byte[] buffer, int offset, int count)
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
using Stream inputStream = new MemoryStream(buffer, offset, count);
using var transferUtility = new TransferUtility(_client);
var uploadRequest = new TransferUtilityUploadRequest
try
{
BucketName = _bucketName,
InputStream = inputStream,
Key = _key,
PartSize = count
};
transferUtility.Upload(uploadRequest);
using MemoryStream ms = new(buffer, offset, count);
int partNumber = _uploadResponses.Count + 1;
UploadPartRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = partNumber,
InputStream = ms,
PartSize = MaxPartSize
};
request.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(
(_, e) =>
{
_logger.LogDebug($"Transferred {e.TransferredBytes}/{e.TotalBytes}");
}
);
UploadPartResponse response = await _client.UploadPartAsync(request);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
_uploadResponses.Add(response);
}
catch (Exception e)
{
await AbortAsync(e);
throw;
}
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
protected override void Dispose(bool disposing)
{
using Stream inputStream = new MemoryStream(buffer, offset, count);
using var transferUtility = new TransferUtility(_client);
var uploadRequest = new TransferUtilityUploadRequest
if (disposing)
{
try
{
CompleteMultipartUploadRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
request.AddPartETags(_uploadResponses);
CompleteMultipartUploadResponse response = _client
.CompleteMultipartUploadAsync(request)
.WaitAndUnwrapException();
Dispose(disposing: false);
GC.SuppressFinalize(this);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}
catch (Exception e)
{
AbortAsync(e).WaitAndUnwrapException();
throw;
}
}
base.Dispose(disposing);
}

public async override ValueTask DisposeAsync()
{
try
{
CompleteMultipartUploadRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
request.AddPartETags(_uploadResponses);
CompleteMultipartUploadResponse response = await _client.CompleteMultipartUploadAsync(request);
Dispose(disposing: false);
GC.SuppressFinalize(this);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}
catch (Exception e)
{
BucketName = _bucketName,
InputStream = inputStream,
Key = _key,
PartSize = count
};
await transferUtility.UploadAsync(uploadRequest);
await AbortAsync(e);
}
}

public override ValueTask DisposeAsync()
private async Task AbortAsync(Exception e)
{
Dispose(disposing: false);
GC.SuppressFinalize(this);
return ValueTask.CompletedTask;
_logger.LogError(e, $"Aborted upload {_uploadId} to {_bucketName}/{_key}");
AbortMultipartUploadRequest abortMPURequest =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
await _client.AbortMultipartUploadAsync(abortMPURequest);
}
}
8 changes: 6 additions & 2 deletions src/SIL.Machine.AspNetCore/Services/SharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ public class SharedFileService : ISharedFileService
private readonly Uri? _baseUri;
private readonly FileStorage _fileStorage;
private readonly bool _supportFolderDelete = true;
private readonly ILoggerFactory _loggerFactory;

public SharedFileService(IOptions<SharedFileOptions>? options = null)
public SharedFileService(ILoggerFactory loggerFactory, IOptions<SharedFileOptions>? options = null)
{
_loggerFactory = loggerFactory;

if (options?.Value.Uri is null)
{
_fileStorage = new InMemoryStorage();
Expand All @@ -30,7 +33,8 @@ public SharedFileService(IOptions<SharedFileOptions>? options = null)
_baseUri.AbsolutePath,
options.Value.S3AccessKeyId,
options.Value.S3SecretAccessKey,
options.Value.S3Region
options.Value.S3Region,
_loggerFactory
);
_supportFolderDelete = false;
break;
Expand Down
3 changes: 2 additions & 1 deletion src/SIL.Machine.AspNetCore/Usings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
global using Amazon;
global using Amazon.S3;
global using Amazon.S3.Model;
global using Amazon.S3.Transfer;
global using Amazon.Runtime;
global using Grpc.Core;
global using Grpc.Core.Interceptors;
global using Grpc.Net.Client.Configuration;
Expand All @@ -31,6 +31,7 @@
global using MongoDB.Driver;
global using MongoDB.Driver.Linq;
global using Nito.AsyncEx;
global using Nito.AsyncEx.Synchronous;
global using Polly;
global using SIL.DataAccess;
global using SIL.Machine.AspNetCore.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public TestEnvironment()
new MemoryRepository<RWLock>(),
new ObjectIdGenerator()
);
_sharedFileService = new SharedFileService();
_sharedFileService = new SharedFileService(Substitute.For<ILoggerFactory>());
_options = Substitute.For<IOptionsMonitor<ClearMLNmtEngineOptions>>();
_options.CurrentValue.Returns(
new ClearMLNmtEngineOptions { BuildPollingTimeout = TimeSpan.FromMilliseconds(50) }
Expand Down

0 comments on commit cf57098

Please sign in to comment.