Skip to content

Commit

Permalink
Reverted to low-level implementation (removing need for large buffers…
Browse files Browse the repository at this point in the history
…); fixes Refine S3 bucket usage #61

Also removed synchronous write; fixes S3 remove sync write (async only) #64

Note: Dispose has async calls/logic because "await using" on the StreamWriters in ClearMLNMTBuildJob calls DisposeAsync on the StreamWriters but Dispose on the BaseStream. This is the only way I could find to circumvent this - other solutions are welcome given there's a workaround I'm missing.

--ECL
  • Loading branch information
Enkidu93 committed Sep 7, 2023
1 parent 3cdaf0d commit 3177461
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public static IMachineBuilder AddServalPlatformService(
new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
RetryPolicy = new Grpc.Net.Client.Configuration.RetryPolicy
{
MaxAttempts = 10,
InitialBackoff = TimeSpan.FromSeconds(1),
Expand Down
9 changes: 6 additions & 3 deletions src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ public override async Task<Stream> OpenRead(string path, CancellationToken cance
return response.ResponseStream;
}

public override Task<Stream> OpenWrite(string path, CancellationToken cancellationToken = default)
public override async Task<Stream> OpenWrite(string path, CancellationToken cancellationToken = default)
{
string objectId = _basePath + Normalize(path);
return Task.FromResult<Stream>(
new BufferedStream(new S3WriteStream(_client, objectId, _bucketName), 1024 * 1024 * 100)
InitiateMultipartUploadRequest request = new() { BucketName = _bucketName, Key = objectId };
InitiateMultipartUploadResponse response = await _client.InitiateMultipartUploadAsync(request);
return new BufferedStream(
new S3WriteStream(_client, objectId, _bucketName, response.UploadId),
1024 * 1024 * 5
);
}

Expand Down
133 changes: 105 additions & 28 deletions src/SIL.Machine.AspNetCore/Services/S3WriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ public class S3WriteStream : Stream
{
private readonly AmazonS3Client _client;
private readonly string _key;
private readonly string _uploadId;
private readonly string _bucketName;
private long _length;
private readonly List<UploadPartResponse> _uploadResponses;
private readonly ILogger<S3WriteStream> _logger;

public S3WriteStream(AmazonS3Client client, string key, string bucketName)
public S3WriteStream(AmazonS3Client client, string key, string bucketName, string uploadId)
{
_client = client;
_key = key;
_bucketName = bucketName;
_length = 0;
_uploadId = uploadId;
_logger = new LoggerFactory().CreateLogger<S3WriteStream>();
_uploadResponses = new List<UploadPartResponse>();
}

public override bool CanRead => false;
Expand All @@ -21,7 +25,7 @@ public S3WriteStream(AmazonS3Client client, string key, string bucketName)

public override bool CanWrite => true;

public override long Length => _length;
public override long Length => 0;

public override long Position
{
Expand All @@ -39,38 +43,111 @@ public override void Flush() { }

public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;

public override void Write(byte[] buffer, int offset, int count)
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
using Stream inputStream = new MemoryStream(buffer, offset, count);
using var transferUtility = new TransferUtility(_client);
var uploadRequest = new TransferUtilityUploadRequest
try
{
using MemoryStream ms = new(buffer, offset, count);
int partNumber = _uploadResponses.Count + 1;
UploadPartRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = partNumber,
InputStream = ms,
PartSize = 5 * 1024 * 1024
};
request.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(
(_, e) =>
{
_logger.LogDebug($"Transferred {e.TransferredBytes}/{e.TotalBytes}");
}
);
UploadPartResponse response = await _client.UploadPartAsync(request);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to upload part {partNumber} of upload {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
_uploadResponses.Add(response);
}
catch (Exception e)
{
BucketName = _bucketName,
InputStream = inputStream,
Key = _key,
PartSize = count
};
transferUtility.Upload(uploadRequest);
await Abort(e);
}
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
protected override void Dispose(bool disposing)
{
using Stream inputStream = new MemoryStream(buffer, offset, count);
using var transferUtility = new TransferUtility(_client);
var uploadRequest = new TransferUtilityUploadRequest
if (disposing)
{
try
{
CompleteMultipartUploadRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
request.AddPartETags(_uploadResponses);
CompleteMultipartUploadResponse response = _client
.CompleteMultipartUploadAsync(request)
.WaitAndUnwrapException();
Dispose(disposing: false);
GC.SuppressFinalize(this);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}
catch (Exception e)
{
Abort(e).WaitAndUnwrapException();
}
}
base.Dispose(disposing);
}

public async override ValueTask DisposeAsync()
{
try
{
CompleteMultipartUploadRequest request =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
request.AddPartETags(_uploadResponses);
CompleteMultipartUploadResponse response = await _client.CompleteMultipartUploadAsync(request);
Dispose(disposing: false);
GC.SuppressFinalize(this);
if (response.HttpStatusCode != HttpStatusCode.OK)
throw new HttpRequestException(
$"Tried to complete {_uploadId} to {_bucketName}/{_key} but received response code {response.HttpStatusCode}"
);
}
catch (Exception e)
{
BucketName = _bucketName,
InputStream = inputStream,
Key = _key,
PartSize = count
};
await transferUtility.UploadAsync(uploadRequest);
await Abort(e);
}
}

public override ValueTask DisposeAsync()
private async Task Abort(Exception e)
{
Dispose(disposing: false);
GC.SuppressFinalize(this);
return ValueTask.CompletedTask;
_logger.LogError(e, $"Aborted upload {_uploadId} to {_bucketName}/{_key}");
AbortMultipartUploadRequest abortMPURequest =
new()
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};
await _client.AbortMultipartUploadAsync(abortMPURequest);
}
}
3 changes: 2 additions & 1 deletion src/SIL.Machine.AspNetCore/Usings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
global using Amazon;
global using Amazon.S3;
global using Amazon.S3.Model;
global using Amazon.S3.Transfer;
global using Amazon.Runtime;
global using Grpc.Core;
global using Grpc.Core.Interceptors;
global using Grpc.Net.Client.Configuration;
Expand All @@ -31,6 +31,7 @@
global using MongoDB.Driver;
global using MongoDB.Driver.Linq;
global using Nito.AsyncEx;
global using Nito.AsyncEx.Synchronous;
global using Polly;
global using SIL.DataAccess;
global using SIL.Machine.AspNetCore.Configuration;
Expand Down

0 comments on commit 3177461

Please sign in to comment.