diff --git a/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs b/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs index 6183c476d..b1f3212bc 100644 --- a/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs +++ b/src/SIL.Machine.AspNetCore/Configuration/IMachineBuilderExtensions.cs @@ -49,6 +49,21 @@ public static IMachineBuilder AddClearMLOptions(this IMachineBuilder builder, IC return builder; } + public static IMachineBuilder AddMessageOutboxOptions( + this IMachineBuilder builder, + Action configureOptions + ) + { + builder.Services.Configure(configureOptions); + return builder; + } + + public static IMachineBuilder AddMessageOutboxOptions(this IMachineBuilder builder, IConfiguration config) + { + builder.Services.Configure(config); + return builder; + } + public static IMachineBuilder AddSharedFileOptions( this IMachineBuilder builder, Action configureOptions @@ -263,6 +278,8 @@ await c.Indexes.CreateOrUpdateAsync( ) ) ); + o.AddRepository("outbox_messages"); + o.AddRepository("outboxes"); } ); builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo"); @@ -280,6 +297,11 @@ public static IMachineBuilder AddServalPlatformService( throw new InvalidOperationException("Serval connection string is required"); builder.Services.AddScoped(); + + builder.Services.AddSingleton(); + + builder.Services.AddSingleton(); + builder .Services.AddGrpcClient(o => { @@ -334,6 +356,9 @@ public static IMachineBuilder AddServalTranslationEngineService( options.Interceptors.Add(); }); builder.AddServalPlatformService(connectionString); + + builder.Services.AddHostedService(); + engineTypes ??= builder.Configuration?.GetSection("TranslationEngines").Get() ?? 
[TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt]; diff --git a/src/SIL.Machine.AspNetCore/Configuration/IServiceCollectionExtensions.cs b/src/SIL.Machine.AspNetCore/Configuration/IServiceCollectionExtensions.cs index 21642d82d..0dd26f291 100644 --- a/src/SIL.Machine.AspNetCore/Configuration/IServiceCollectionExtensions.cs +++ b/src/SIL.Machine.AspNetCore/Configuration/IServiceCollectionExtensions.cs @@ -28,6 +28,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf builder.AddSmtTransferEngineOptions(o => { }); builder.AddClearMLOptions(o => { }); builder.AddBuildJobOptions(o => { }); + builder.AddMessageOutboxOptions(o => { }); } else { @@ -36,6 +37,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf builder.AddSmtTransferEngineOptions(configuration.GetSection(SmtTransferEngineOptions.Key)); builder.AddClearMLOptions(configuration.GetSection(ClearMLOptions.Key)); builder.AddBuildJobOptions(configuration.GetSection(BuildJobOptions.Key)); + builder.AddMessageOutboxOptions(configuration.GetSection(MessageOutboxOptions.Key)); } return builder; } diff --git a/src/SIL.Machine.AspNetCore/Configuration/MessageOutboxOptions.cs b/src/SIL.Machine.AspNetCore/Configuration/MessageOutboxOptions.cs new file mode 100644 index 000000000..0b306a900 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Configuration/MessageOutboxOptions.cs @@ -0,0 +1,8 @@ +namespace SIL.Machine.AspNetCore.Configuration; + +public class MessageOutboxOptions +{ + public const string Key = "MessageOutbox"; + + public int MessageExpirationInHours { get; set; } = 48; +} diff --git a/src/SIL.Machine.AspNetCore/Models/Outbox.cs b/src/SIL.Machine.AspNetCore/Models/Outbox.cs new file mode 100644 index 000000000..8b7acdfb3 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Models/Outbox.cs @@ -0,0 +1,28 @@ +namespace SIL.Machine.AspNetCore.Models; + +public record Outbox : IEntity +{ + public string Id { get; set; } = ""; + + public int 
Revision { get; set; } + + public string Name { get; set; } = null!; + public int CurrentIndex { get; set; } + + public static async Task GetOutboxNextIndexAsync( + IRepository indexRepository, + string outboxName, + CancellationToken cancellationToken + ) + { + Outbox outbox = ( + await indexRepository.UpdateAsync( + i => i.Name == outboxName, + i => i.Inc(b => b.CurrentIndex, 1), + upsert: true, + cancellationToken: cancellationToken + ) + )!; + return outbox; + } +} diff --git a/src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs b/src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs new file mode 100644 index 000000000..f13d3a082 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs @@ -0,0 +1,14 @@ +namespace SIL.Machine.AspNetCore.Models; + +public record OutboxMessage : IEntity +{ + public string Id { get; set; } = ""; + public int Revision { get; set; } = 1; + public required int Index { get; set; } + public required string OutboxName { get; set; } + public required string Method { get; set; } + public required string GroupId { get; set; } + public required string? 
RequestContent { get; set; } + public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow; + public int Attempts { get; set; } = 0; +} diff --git a/src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs b/src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs index 217a398e1..952a41254 100644 --- a/src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs +++ b/src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs @@ -4,6 +4,7 @@ public class ClearMLMonitorService( IServiceProvider services, IClearMLService clearMLService, ISharedFileService sharedFileService, + IDataAccessContext dataAccessContext, IOptionsMonitor clearMLOptions, IOptionsMonitor buildJobOptions, ILogger logger @@ -23,6 +24,7 @@ ILogger logger private readonly IClearMLService _clearMLService = clearMLService; private readonly ISharedFileService _sharedFileService = sharedFileService; + private readonly IDataAccessContext _dataAccessContext = dataAccessContext; private readonly ILogger _logger = logger; private readonly Dictionary _curBuildStatus = new(); @@ -225,17 +227,24 @@ private async Task TrainJobStartedAsync( CancellationToken cancellationToken = default ) { + bool success; IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken); await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken)) { - if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken)) - return false; + success = await _dataAccessContext.WithTransactionAsync( + async (ct) => + { + if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct)) + return false; + await platformService.BuildStartedAsync(buildId, CancellationToken.None); + return true; + }, + cancellationToken: cancellationToken + ); } - await platformService.BuildStartedAsync(buildId, CancellationToken.None); - await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken); _logger.LogInformation("Build 
started ({BuildId})", buildId); - return true; + return success; } private async Task TrainJobCompletedAsync( @@ -286,12 +295,18 @@ CancellationToken cancellationToken IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken); await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken)) { - await platformService.BuildFaultedAsync(buildId, message, cancellationToken); - await buildJobService.BuildJobFinishedAsync( - engineId, - buildId, - buildComplete: false, - CancellationToken.None + await _dataAccessContext.WithTransactionAsync( + async (ct) => + { + await platformService.BuildFaultedAsync(buildId, message, ct); + await buildJobService.BuildJobFinishedAsync( + engineId, + buildId, + buildComplete: false, + CancellationToken.None + ); + }, + cancellationToken: cancellationToken ); } _logger.LogError("Build faulted ({BuildId}). Error: {ErrorMessage}", buildId, message); @@ -316,12 +331,18 @@ CancellationToken cancellationToken IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken); await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken)) { - await platformService.BuildCanceledAsync(buildId, cancellationToken); - await buildJobService.BuildJobFinishedAsync( - engineId, - buildId, - buildComplete: false, - CancellationToken.None + await _dataAccessContext.WithTransactionAsync( + async (ct) => + { + await platformService.BuildCanceledAsync(buildId, ct); + await buildJobService.BuildJobFinishedAsync( + engineId, + buildId, + buildComplete: false, + CancellationToken.None + ); + }, + cancellationToken: cancellationToken ); } _logger.LogInformation("Build canceled ({BuildId})", buildId); diff --git a/src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs b/src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs index deb9219b6..515f88732 100644 --- a/src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs +++ 
b/src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs @@ -4,9 +4,10 @@ public abstract class HangfireBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, IBuildJobService buildJobService, ILogger logger -) : HangfireBuildJob(platformService, engines, lockFactory, buildJobService, logger) +) : HangfireBuildJob(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger) { public virtual Task RunAsync( string engineId, @@ -23,6 +24,7 @@ public abstract class HangfireBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, IBuildJobService buildJobService, ILogger> logger ) @@ -30,6 +32,7 @@ ILogger> logger protected IPlatformService PlatformService { get; } = platformService; protected IRepository Engines { get; } = engines; protected IDistributedReaderWriterLockFactory LockFactory { get; } = lockFactory; + protected IDataAccessContext DataAccessContext { get; } = dataAccessContext; protected IBuildJobService BuildJobService { get; } = buildJobService; protected ILogger> Logger { get; } = logger; @@ -69,12 +72,18 @@ CancellationToken cancellationToken completionStatus = JobCompletionStatus.Canceled; await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None)) { - await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None); - await BuildJobService.BuildJobFinishedAsync( - engineId, - buildId, - buildComplete: false, - CancellationToken.None + await DataAccessContext.WithTransactionAsync( + async (ct) => + { + await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None); + await BuildJobService.BuildJobFinishedAsync( + engineId, + buildId, + buildComplete: false, + CancellationToken.None + ); + }, + cancellationToken: CancellationToken.None ); } Logger.LogInformation("Build canceled ({0})", 
buildId); @@ -86,8 +95,14 @@ await BuildJobService.BuildJobFinishedAsync( completionStatus = JobCompletionStatus.Restarting; await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None)) { - await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None); - await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None); + await DataAccessContext.WithTransactionAsync( + async (ct) => + { + await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None); + await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None); + }, + cancellationToken: CancellationToken.None + ); } throw; } @@ -101,12 +116,18 @@ await BuildJobService.BuildJobFinishedAsync( completionStatus = JobCompletionStatus.Faulted; await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None)) { - await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None); - await BuildJobService.BuildJobFinishedAsync( - engineId, - buildId, - buildComplete: false, - CancellationToken.None + await DataAccessContext.WithTransactionAsync( + async (ct) => + { + await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None); + await BuildJobService.BuildJobFinishedAsync( + engineId, + buildId, + buildComplete: false, + CancellationToken.None + ); + }, + cancellationToken: CancellationToken.None ); } Logger.LogError(0, e, "Build faulted ({0})", buildId); diff --git a/src/SIL.Machine.AspNetCore/Services/IFileStorage.cs b/src/SIL.Machine.AspNetCore/Services/IFileStorage.cs index 3417cffae..e910a426f 100644 --- a/src/SIL.Machine.AspNetCore/Services/IFileStorage.cs +++ b/src/SIL.Machine.AspNetCore/Services/IFileStorage.cs @@ -16,5 +16,6 @@ Task> ListFilesAsync( Task GetDownloadUrlAsync(string path, DateTime expiresAt, CancellationToken cancellationToken = default); + Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default); 
Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default); } diff --git a/src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs b/src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs new file mode 100644 index 000000000..997f1efd5 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs @@ -0,0 +1,12 @@ +namespace SIL.Machine.AspNetCore.Services; + +public interface IMessageOutboxService +{ + public Task EnqueueMessageAsync( + T method, + string groupId, + string? requestContent = null, + string? requestContentPath = null, + CancellationToken cancellationToken = default + ); +} diff --git a/src/SIL.Machine.AspNetCore/Services/IOutboxMessageHandler.cs b/src/SIL.Machine.AspNetCore/Services/IOutboxMessageHandler.cs new file mode 100644 index 000000000..a27ade201 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Services/IOutboxMessageHandler.cs @@ -0,0 +1,9 @@ +namespace SIL.Machine.AspNetCore.Services; + +public interface IOutboxMessageHandler +{ + public string Name { get; } + + public Task SendMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default); + public Task CleanupMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default); +} diff --git a/src/SIL.Machine.AspNetCore/Services/IPlatformService.cs b/src/SIL.Machine.AspNetCore/Services/IPlatformService.cs index 163d11517..c27188597 100644 --- a/src/SIL.Machine.AspNetCore/Services/IPlatformService.cs +++ b/src/SIL.Machine.AspNetCore/Services/IPlatformService.cs @@ -22,9 +22,5 @@ Task BuildCompletedAsync( Task BuildFaultedAsync(string buildId, string message, CancellationToken cancellationToken = default); Task BuildRestartingAsync(string buildId, CancellationToken cancellationToken = default); - Task InsertPretranslationsAsync( - string engineId, - IAsyncEnumerable pretranslations, - CancellationToken cancellationToken = default - ); + Task InsertPretranslationsAsync(string engineId, 
string path, CancellationToken cancellationToken = default); } diff --git a/src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs b/src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs index f082a79c2..ea17ad113 100644 --- a/src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs +++ b/src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs @@ -19,6 +19,7 @@ Task> ListFilesAsync( Task OpenWriteAsync(string path, CancellationToken cancellationToken = default); Task ExistsAsync(string path, CancellationToken cancellationToken = default); + Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default); Task DeleteAsync(string path, CancellationToken cancellationToken = default); } diff --git a/src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs b/src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs index 7deccb6e9..3824bfe0e 100644 --- a/src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs +++ b/src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs @@ -133,6 +133,15 @@ public async Task DeleteAsync(string path, bool recurse, CancellationToken cance } } + public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default) + { + if (!_memoryStreams.TryGetValue(Normalize(sourcePath), out Entry? 
entry)) + throw new FileNotFoundException($"Unable to find file {sourcePath}"); + _memoryStreams[Normalize(destPath)] = entry; + _memoryStreams.Remove(Normalize(sourcePath), out _); + return Task.CompletedTask; + } + protected override void DisposeManagedResources() { foreach (Entry stream in _memoryStreams.Values) diff --git a/src/SIL.Machine.AspNetCore/Services/LocalStorage.cs b/src/SIL.Machine.AspNetCore/Services/LocalStorage.cs index 38e9049bd..42c573c38 100644 --- a/src/SIL.Machine.AspNetCore/Services/LocalStorage.cs +++ b/src/SIL.Machine.AspNetCore/Services/LocalStorage.cs @@ -59,6 +59,15 @@ public Task OpenWriteAsync(string path, CancellationToken cancellationTo return Task.FromResult(File.OpenWrite(pathUri.LocalPath)); } + public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default) + { + Uri sourcePathUri = new(_basePath, Normalize(sourcePath)); + Uri destPathUri = new(_basePath, Normalize(destPath)); + Directory.CreateDirectory(Path.GetDirectoryName(destPathUri.LocalPath)!); + File.Move(sourcePathUri.LocalPath, destPathUri.LocalPath); + return Task.CompletedTask; + } + public async Task DeleteAsync(string path, bool recurse, CancellationToken cancellationToken = default) { Uri pathUri = new(_basePath, Normalize(path)); diff --git a/src/SIL.Machine.AspNetCore/Services/MessageOutboxDeliveryService.cs b/src/SIL.Machine.AspNetCore/Services/MessageOutboxDeliveryService.cs new file mode 100644 index 000000000..08cd891b3 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Services/MessageOutboxDeliveryService.cs @@ -0,0 +1,142 @@ +namespace SIL.Machine.AspNetCore.Services; + +public class MessageOutboxDeliveryService( + IRepository messages, + IEnumerable outboxMessageHandlers, + MessageOutboxOptions options, + ILogger logger +) : BackgroundService +{ + private readonly IRepository _messages = messages; + private readonly Dictionary _outboxMessageHandlers = + outboxMessageHandlers.ToDictionary(o => o.Name); + + private 
readonly ILogger _logger = logger; + protected TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10); + protected TimeSpan MessageExpiration { get; set; } = TimeSpan.FromHours(options.MessageExpirationInHours); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + using ISubscription subscription = await _messages.SubscribeAsync(e => true); + while (true) + { + await subscription.WaitForChangeAsync(timeout: Timeout, cancellationToken: stoppingToken); + if (stoppingToken.IsCancellationRequested) + break; + await ProcessMessagesAsync(); + } + } + + protected async Task ProcessMessagesAsync(CancellationToken cancellationToken = default) + { + bool anyMessages = await _messages.ExistsAsync(m => true); + if (!anyMessages) + return; + + IReadOnlyList messages = await _messages.GetAllAsync(); + + IEnumerable> messageGroups = messages.GroupBy( + m => new { m.GroupId, m.OutboxName }, + m => m, + (key, element) => element.OrderBy(m => m.Index).ToList() + ); + + foreach (List messageGroup in messageGroups) + { + bool abortMessageGroup = false; + var outboxMessageHandler = _outboxMessageHandlers[messageGroup.First().OutboxName]; + foreach (OutboxMessage message in messageGroup) + { + try + { + await ProcessGroupMessagesAsync(message, outboxMessageHandler, cancellationToken); + } + catch (RpcException e) + { + switch (e.StatusCode) + { + case StatusCode.Unavailable: + case StatusCode.Unauthenticated: + case StatusCode.PermissionDenied: + case StatusCode.Cancelled: + _logger.LogWarning(e, "Platform Message sending failure: {statusCode}", e.StatusCode); + return; + case StatusCode.Aborted: + case StatusCode.DeadlineExceeded: + case StatusCode.Internal: + case StatusCode.ResourceExhausted: + case StatusCode.Unknown: + abortMessageGroup = !await CheckIfFinalMessageAttempt(message, e); + break; + case StatusCode.InvalidArgument: + default: + // log error + await PermanentlyFailedMessage(message, e); + break; + } + } + catch (Exception e) + { + 
await PermanentlyFailedMessage(message, e); + break; + } + if (abortMessageGroup) + break; + } + } + } + + async Task ProcessGroupMessagesAsync( + OutboxMessage message, + IOutboxMessageHandler outboxMessageHandler, + CancellationToken cancellationToken = default + ) + { + await outboxMessageHandler.SendMessageAsync(message, cancellationToken); + await _messages.DeleteAsync(message.Id); + await outboxMessageHandler.CleanupMessageAsync(message, cancellationToken); + } + + async Task CheckIfFinalMessageAttempt(OutboxMessage message, Exception e) + { + if (message.Created < DateTimeOffset.UtcNow.Subtract(MessageExpiration)) + { + await PermanentlyFailedMessage(message, e); + return true; + } + else + { + await LogFailedAttempt(message, e); + return false; + } + } + + async Task PermanentlyFailedMessage(OutboxMessage message, Exception e) + { + // log error + _logger.LogError( + e, + "Permanently failed to process message {message.Id}: {message.Method} with content {message.RequestContent} and error message: {e.Message}", + message.Id, + message.Method, + message.RequestContent, + e.Message + ); + await _messages.DeleteAsync(message.Id); + } + + async Task LogFailedAttempt(OutboxMessage message, Exception e) + { + // log error + await _messages.UpdateAsync(m => m.Id == message.Id, b => b.Inc(m => m.Attempts, 1)); + _logger.LogError( + e, + "Attempt {message.Attempts}. 
Failed to process message {message.Id}: {message.Method} with content {message.RequestContent} and error message: {e.Message}", + message.Attempts + 1, + message.Id, + message.Method, + message.RequestContent, + e.Message + ); + } +} diff --git a/src/SIL.Machine.AspNetCore/Services/MessageOutboxService.cs b/src/SIL.Machine.AspNetCore/Services/MessageOutboxService.cs new file mode 100644 index 000000000..158077af5 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Services/MessageOutboxService.cs @@ -0,0 +1,52 @@ +using MongoDB.Bson; + +namespace SIL.Machine.AspNetCore.Services; + +public class MessageOutboxService( + IRepository messageIndexes, + IRepository messages, + ISharedFileService sharedFileService +) : IMessageOutboxService +{ + private readonly IRepository _messageIndex = messageIndexes; + private readonly IRepository _messages = messages; + private readonly ISharedFileService _sharedFileService = sharedFileService; + protected int MaxDocumentSize { get; set; } = 1_000_000; + + public async Task EnqueueMessageAsync( + T method, + string groupId, + string? requestContent = null, + string? requestContentPath = null, + CancellationToken cancellationToken = default + ) + { + if (requestContent == null && requestContentPath == null) + { + throw new ArgumentException("Either requestContent or contentPath must be specified."); + } + if (requestContent is not null && requestContent.Length > MaxDocumentSize) + { + throw new ArgumentException( + $"The content is too large for request {method} with group ID {groupId}. " + + $"It is {requestContent.Length} bytes, but the maximum is {MaxDocumentSize} bytes." + ); + } + Outbox outbox = await Outbox.GetOutboxNextIndexAsync(_messageIndex, typeof(T).ToString(), cancellationToken); + OutboxMessage outboxMessage = new OutboxMessage + { + Id = ObjectId.GenerateNewId().ToString(), + Index = outbox.CurrentIndex, + OutboxName = typeof(T).ToString(), + Method = method?.ToString() ?? 
throw new ArgumentNullException(nameof(method)), + GroupId = groupId, + RequestContent = requestContent + }; + if (requestContentPath != null) + { + await _sharedFileService.MoveAsync(requestContentPath, $"outbox/{outboxMessage.Id}", cancellationToken); + } + await _messages.InsertAsync(outboxMessage, cancellationToken: cancellationToken); + return outboxMessage.Id; + } +} diff --git a/src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs b/src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs index 28af9ee9a..62d32e5d1 100644 --- a/src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs +++ b/src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs @@ -181,12 +181,16 @@ public bool IsLanguageNativeToModel(string language, out string internalCode) private async Task CancelBuildJobAsync(string engineId, CancellationToken cancellationToken) { - (string? buildId, BuildJobState jobState) = await _buildJobService.CancelBuildJobAsync( - engineId, - cancellationToken + string? buildId = null; + await _dataAccessContext.WithTransactionAsync( + async (ct) => + { + (buildId, BuildJobState jobState) = await _buildJobService.CancelBuildJobAsync(engineId, ct); + if (buildId is not null && jobState is BuildJobState.None) + await _platformService.BuildCanceledAsync(buildId, CancellationToken.None); + }, + cancellationToken: cancellationToken ); - if (buildId is not null && jobState is BuildJobState.None) - await _platformService.BuildCanceledAsync(buildId, CancellationToken.None); return buildId is not null; } diff --git a/src/SIL.Machine.AspNetCore/Services/NmtPreprocessBuildJob.cs b/src/SIL.Machine.AspNetCore/Services/NmtPreprocessBuildJob.cs index 5ba4d99d1..64c51589a 100644 --- a/src/SIL.Machine.AspNetCore/Services/NmtPreprocessBuildJob.cs +++ b/src/SIL.Machine.AspNetCore/Services/NmtPreprocessBuildJob.cs @@ -4,12 +4,23 @@ public class NmtPreprocessBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + 
IDataAccessContext dataAccessContext, ILogger logger, IBuildJobService buildJobService, ISharedFileService sharedFileService, ICorpusService corpusService, ILanguageTagService languageTagService -) : PreprocessBuildJob(platformService, engines, lockFactory, logger, buildJobService, sharedFileService, corpusService) +) + : PreprocessBuildJob( + platformService, + engines, + lockFactory, + dataAccessContext, + logger, + buildJobService, + sharedFileService, + corpusService + ) { private readonly ILanguageTagService _languageTagService = languageTagService; diff --git a/src/SIL.Machine.AspNetCore/Services/PostprocessBuildJob.cs b/src/SIL.Machine.AspNetCore/Services/PostprocessBuildJob.cs index 1bf7a4389..16e35d14b 100644 --- a/src/SIL.Machine.AspNetCore/Services/PostprocessBuildJob.cs +++ b/src/SIL.Machine.AspNetCore/Services/PostprocessBuildJob.cs @@ -4,14 +4,12 @@ public class PostprocessBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, IBuildJobService buildJobService, ILogger logger, ISharedFileService sharedFileService -) : HangfireBuildJob<(int, double)>(platformService, engines, lockFactory, buildJobService, logger) +) : HangfireBuildJob<(int, double)>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger) { - private static readonly JsonSerializerOptions JsonSerializerOptions = - new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; - protected ISharedFileService SharedFileService { get; } = sharedFileService; protected override async Task DoWorkAsync( @@ -25,19 +23,33 @@ CancellationToken cancellationToken { (int corpusSize, double confidence) = data; - // The MT job has successfully completed, so insert the generated pretranslations into the database. 
- await InsertPretranslationsAsync(engineId, buildId, cancellationToken); + await PlatformService.InsertPretranslationsAsync( + engineId, + $"builds/{buildId}/pretranslate.trg.json", + cancellationToken + ); await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None)) { - int additionalCorpusSize = await SaveModelAsync(engineId, buildId); - await PlatformService.BuildCompletedAsync( - buildId, - corpusSize + additionalCorpusSize, - Math.Round(confidence, 2, MidpointRounding.AwayFromZero), - CancellationToken.None + await DataAccessContext.WithTransactionAsync( + async (ct) => + { + int additionalCorpusSize = await SaveModelAsync(engineId, buildId); + await PlatformService.BuildCompletedAsync( + buildId, + corpusSize + additionalCorpusSize, + Math.Round(confidence, 2, MidpointRounding.AwayFromZero), + CancellationToken.None + ); + await BuildJobService.BuildJobFinishedAsync( + engineId, + buildId, + buildComplete: true, + CancellationToken.None + ); + }, + cancellationToken: CancellationToken.None ); - await BuildJobService.BuildJobFinishedAsync(engineId, buildId, buildComplete: true, CancellationToken.None); } Logger.LogInformation("Build completed ({0}).", buildId); @@ -69,26 +81,4 @@ JobCompletionStatus completionStatus Logger.LogWarning(e, "Unable to to delete job data for build {0}.", buildId); } } - - protected async Task InsertPretranslationsAsync( - string engineId, - string buildId, - CancellationToken cancellationToken - ) - { - await using Stream targetPretranslateStream = await SharedFileService.OpenReadAsync( - $"builds/{buildId}/pretranslate.trg.json", - cancellationToken - ); - - IAsyncEnumerable pretranslations = JsonSerializer - .DeserializeAsyncEnumerable( - targetPretranslateStream, - JsonSerializerOptions, - cancellationToken - ) - .OfType(); - - await PlatformService.InsertPretranslationsAsync(engineId, pretranslations, cancellationToken); - } } diff --git a/src/SIL.Machine.AspNetCore/Services/PreprocessBuildJob.cs 
b/src/SIL.Machine.AspNetCore/Services/PreprocessBuildJob.cs index 37d8535de..3f07ecbe1 100644 --- a/src/SIL.Machine.AspNetCore/Services/PreprocessBuildJob.cs +++ b/src/SIL.Machine.AspNetCore/Services/PreprocessBuildJob.cs @@ -15,12 +15,13 @@ public PreprocessBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, ILogger logger, IBuildJobService buildJobService, ISharedFileService sharedFileService, ICorpusService corpusService ) - : base(platformService, engines, lockFactory, buildJobService, logger) + : base(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger) { _sharedFileService = sharedFileService; _corpusService = corpusService; diff --git a/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs b/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs index a9d265e9c..7466fe5cc 100644 --- a/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs +++ b/src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs @@ -111,6 +111,27 @@ public async Task OpenWriteAsync(string path, CancellationToken cancella ); } + public async Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default) + { + CopyObjectRequest copyRequest = + new() + { + SourceBucket = _bucketName, + SourceKey = _basePath + Normalize(sourcePath), + DestinationBucket = _bucketName, + DestinationKey = _basePath + Normalize(destPath) + }; + CopyObjectResponse copyResponse = await _client.CopyObjectAsync(copyRequest, cancellationToken); + if (!copyResponse.HttpStatusCode.Equals(HttpStatusCode.OK)) + { + throw new HttpRequestException( + $"Received status code {copyResponse.HttpStatusCode} when attempting to copy {sourcePath} to {destPath}" + ); + } + + await DeleteAsync(sourcePath, cancellationToken: cancellationToken); + } + public async Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default) { 
DeleteObjectRequest request = new() { BucketName = _bucketName, Key = _basePath + Normalize(path) }; diff --git a/src/SIL.Machine.AspNetCore/Services/ServalPlatformOutboxHandler.cs b/src/SIL.Machine.AspNetCore/Services/ServalPlatformOutboxHandler.cs new file mode 100644 index 000000000..4d1b2636b --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Services/ServalPlatformOutboxHandler.cs @@ -0,0 +1,119 @@ +using Serval.Translation.V1; + +namespace SIL.Machine.AspNetCore.Services; + +public enum ServalPlatformMessageMethod +{ + BuildStarted, + BuildCompleted, + BuildCanceled, + BuildFaulted, + BuildRestarting, + InsertPretranslations, + IncrementTranslationEngineCorpusSize +} + +public class ServalPlatformOutboxHandler( + TranslationPlatformApi.TranslationPlatformApiClient client, + ISharedFileService sharedFileService, + ILogger logger +) : IOutboxMessageHandler +{ + private readonly TranslationPlatformApi.TranslationPlatformApiClient _client = client; + private readonly ISharedFileService _sharedFileService = sharedFileService; + private readonly ILogger _logger = logger; + private static readonly JsonSerializerOptions JsonSerializerOptions = + new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; + + private readonly string _name = typeof(ServalPlatformMessageMethod).ToString(); + public string Name => _name; + + public async Task SendMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default) + { + ServalPlatformMessageMethod messageType = Enum.Parse(message.Method); + switch (messageType) + { + case ServalPlatformMessageMethod.BuildStarted: + await _client.BuildStartedAsync( + JsonSerializer.Deserialize(message.RequestContent!), + cancellationToken: cancellationToken + ); + break; + case ServalPlatformMessageMethod.BuildCompleted: + await _client.BuildCompletedAsync( + JsonSerializer.Deserialize(message.RequestContent!), + cancellationToken: cancellationToken + ); + break; + case ServalPlatformMessageMethod.BuildCanceled: + await 
_client.BuildCanceledAsync( + JsonSerializer.Deserialize(message.RequestContent!), + cancellationToken: cancellationToken + ); + break; + case ServalPlatformMessageMethod.BuildFaulted: + await _client.BuildFaultedAsync( + JsonSerializer.Deserialize(message.RequestContent!), + cancellationToken: cancellationToken + ); + break; + case ServalPlatformMessageMethod.BuildRestarting: + await _client.BuildRestartingAsync( + JsonSerializer.Deserialize(message.RequestContent!), + cancellationToken: cancellationToken + ); + break; + case ServalPlatformMessageMethod.InsertPretranslations: + + { + Stream targetPretranslateStream = await _sharedFileService.OpenReadAsync( + $"outbox/{message.Id}", + cancellationToken + ); + IAsyncEnumerable pretranslations = JsonSerializer + .DeserializeAsyncEnumerable( + targetPretranslateStream, + JsonSerializerOptions, + cancellationToken + ) + .OfType(); + IAsyncEnumerable requests = pretranslations.Select( + p => new InsertPretranslationRequest + { + EngineId = message.RequestContent!, + CorpusId = p.CorpusId, + TextId = p.TextId, + Refs = { p.Refs }, + Translation = p.Translation + } + ); + + using var call = _client.InsertPretranslations(cancellationToken: cancellationToken); + await foreach (var request in requests) + { + await call.RequestStream.WriteAsync(request, cancellationToken: cancellationToken); + } + await call.RequestStream.CompleteAsync(); + } + break; + case ServalPlatformMessageMethod.IncrementTranslationEngineCorpusSize: + await _client.IncrementTranslationEngineCorpusSizeAsync( + JsonSerializer.Deserialize(message.RequestContent!), + cancellationToken: cancellationToken + ); + break; + default: + _logger.LogWarning( + "Unknown method: {message.Method}. 
Deleting the message from the list.", + message.Method.ToString() + ); + break; + } + } + + public async Task CleanupMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default) + { + if (await _sharedFileService.ExistsAsync($"outbox/{message.Id}", cancellationToken)) + await _sharedFileService.DeleteAsync($"outbox/{message.Id}", cancellationToken); + } +} diff --git a/src/SIL.Machine.AspNetCore/Services/ServalPlatformService.cs b/src/SIL.Machine.AspNetCore/Services/ServalPlatformService.cs index a882a234b..ad89471c6 100644 --- a/src/SIL.Machine.AspNetCore/Services/ServalPlatformService.cs +++ b/src/SIL.Machine.AspNetCore/Services/ServalPlatformService.cs @@ -2,14 +2,20 @@ namespace SIL.Machine.AspNetCore.Services; -public class ServalPlatformService(TranslationPlatformApi.TranslationPlatformApiClient client) : IPlatformService +public class ServalPlatformService( + TranslationPlatformApi.TranslationPlatformApiClient client, + IMessageOutboxService outboxService +) : IPlatformService { private readonly TranslationPlatformApi.TranslationPlatformApiClient _client = client; + private readonly IMessageOutboxService _outboxService = outboxService; public async Task BuildStartedAsync(string buildId, CancellationToken cancellationToken = default) { - await _client.BuildStartedAsync( - new BuildStartedRequest { BuildId = buildId }, + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.BuildStarted, + buildId, + JsonSerializer.Serialize(new BuildStartedRequest { BuildId = buildId }), cancellationToken: cancellationToken ); } @@ -21,37 +27,47 @@ public async Task BuildCompletedAsync( CancellationToken cancellationToken = default ) { - await _client.BuildCompletedAsync( - new BuildCompletedRequest - { - BuildId = buildId, - CorpusSize = trainSize, - Confidence = confidence - }, + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.BuildCompleted, + buildId, + JsonSerializer.Serialize( + new BuildCompletedRequest + 
{ + BuildId = buildId, + CorpusSize = trainSize, + Confidence = confidence + } + ), cancellationToken: cancellationToken ); } public async Task BuildCanceledAsync(string buildId, CancellationToken cancellationToken = default) { - await _client.BuildCanceledAsync( - new BuildCanceledRequest { BuildId = buildId }, + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.BuildCanceled, + buildId, + JsonSerializer.Serialize(new BuildCanceledRequest { BuildId = buildId }), cancellationToken: cancellationToken ); } public async Task BuildFaultedAsync(string buildId, string message, CancellationToken cancellationToken = default) { - await _client.BuildFaultedAsync( - new BuildFaultedRequest { BuildId = buildId, Message = message }, + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.BuildFaulted, + buildId, + JsonSerializer.Serialize(new BuildFaultedRequest { BuildId = buildId, Message = message }), cancellationToken: cancellationToken ); } public async Task BuildRestartingAsync(string buildId, CancellationToken cancellationToken = default) { - await _client.BuildRestartingAsync( - new BuildRestartingRequest { BuildId = buildId }, + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.BuildRestarting, + buildId, + JsonSerializer.Serialize(new BuildRestartingRequest { BuildId = buildId }), cancellationToken: cancellationToken ); } @@ -71,11 +87,13 @@ public async Task UpdateBuildStatusAsync( if (queueDepth is not null) request.QueueDepth = queueDepth.Value; + // just try to send it - if it fails, it fails. await _client.UpdateBuildStatusAsync(request, cancellationToken: cancellationToken); } public async Task UpdateBuildStatusAsync(string buildId, int step, CancellationToken cancellationToken = default) { + // just try to send it - if it fails, it fails. 
await _client.UpdateBuildStatusAsync( new UpdateBuildStatusRequest { BuildId = buildId, Step = step }, cancellationToken: cancellationToken @@ -84,27 +102,17 @@ await _client.UpdateBuildStatusAsync( public async Task InsertPretranslationsAsync( string engineId, - IAsyncEnumerable pretranslations, + string path, CancellationToken cancellationToken = default ) { - using var call = _client.InsertPretranslations(cancellationToken: cancellationToken); - await foreach (Pretranslation? pretranslation in pretranslations) - { - await call.RequestStream.WriteAsync( - new InsertPretranslationRequest - { - EngineId = engineId, - CorpusId = pretranslation.CorpusId, - TextId = pretranslation.TextId, - Refs = { pretranslation.Refs }, - Translation = pretranslation.Translation - }, - cancellationToken - ); - } - await call.RequestStream.CompleteAsync(); - await call; + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.InsertPretranslations, + engineId, + requestContent: engineId, + requestContentPath: path, + cancellationToken: cancellationToken + ); } public async Task IncrementTrainSizeAsync( @@ -113,8 +121,12 @@ public async Task IncrementTrainSizeAsync( CancellationToken cancellationToken = default ) { - await _client.IncrementTranslationEngineCorpusSizeAsync( - new IncrementTranslationEngineCorpusSizeRequest { EngineId = engineId, Count = count }, + await _outboxService.EnqueueMessageAsync( + ServalPlatformMessageMethod.IncrementTranslationEngineCorpusSize, + engineId, + JsonSerializer.Serialize( + new IncrementTranslationEngineCorpusSizeRequest { EngineId = engineId, Count = count } + ), cancellationToken: cancellationToken ); } diff --git a/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs b/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs index b4244211e..f09b4951c 100644 --- a/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs +++ b/src/SIL.Machine.AspNetCore/Services/SharedFileService.cs @@ -101,4 +101,9 @@ public Task 
ExistsAsync(string path, CancellationToken cancellationToken = { return _fileStorage.ExistsAsync(path, cancellationToken); } + + public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default) + { + return _fileStorage.MoveAsync(sourcePath, destPath, cancellationToken); + } } diff --git a/src/SIL.Machine.AspNetCore/Services/SmtTransferBuildJob.cs b/src/SIL.Machine.AspNetCore/Services/SmtTransferBuildJob.cs new file mode 100644 index 000000000..21f16b324 --- /dev/null +++ b/src/SIL.Machine.AspNetCore/Services/SmtTransferBuildJob.cs @@ -0,0 +1,157 @@ +namespace SIL.Machine.AspNetCore.Services; + +public class SmtTransferBuildJob( + IPlatformService platformService, + IRepository engines, + IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, + IBuildJobService buildJobService, + ILogger logger, + IRepository trainSegmentPairs, + ITruecaserFactory truecaserFactory, + ISmtModelFactory smtModelFactory, + ICorpusService corpusService +) + : HangfireBuildJob>( + platformService, + engines, + lockFactory, + dataAccessContext, + buildJobService, + logger + ) +{ + private readonly IRepository _trainSegmentPairs = trainSegmentPairs; + private readonly ITruecaserFactory _truecaserFactory = truecaserFactory; + private readonly ISmtModelFactory _smtModelFactory = smtModelFactory; + private readonly ICorpusService _corpusService = corpusService; + + protected override Task InitializeAsync( + string engineId, + string buildId, + IReadOnlyList data, + IDistributedReaderWriterLock @lock, + CancellationToken cancellationToken + ) + { + return _trainSegmentPairs.DeleteAllAsync(p => p.TranslationEngineRef == engineId, cancellationToken); + } + + protected override async Task DoWorkAsync( + string engineId, + string buildId, + IReadOnlyList data, + string? 
buildOptions, + IDistributedReaderWriterLock @lock, + CancellationToken cancellationToken + ) + { + await PlatformService.BuildStartedAsync(buildId, cancellationToken); + Logger.LogInformation("Build started ({0})", buildId); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + cancellationToken.ThrowIfCancellationRequested(); + + JsonObject? buildOptionsObject = null; + if (buildOptions is not null) + { + buildOptionsObject = JsonSerializer.Deserialize(buildOptions); + } + + var targetCorpora = new List(); + var parallelCorpora = new List(); + foreach (Corpus corpus in data) + { + ITextCorpus? sourceTextCorpus = _corpusService.CreateTextCorpora(corpus.SourceFiles).FirstOrDefault(); + ITextCorpus? targetTextCorpus = _corpusService.CreateTextCorpora(corpus.TargetFiles).FirstOrDefault(); + if (sourceTextCorpus is null || targetTextCorpus is null) + continue; + + targetCorpora.Add(targetTextCorpus); + parallelCorpora.Add(sourceTextCorpus.AlignRows(targetTextCorpus)); + + if ((bool?)buildOptionsObject?["use_key_terms"] ?? true) + { + ITextCorpus? sourceTermCorpus = _corpusService.CreateTermCorpora(corpus.SourceFiles).FirstOrDefault(); + ITextCorpus? 
targetTermCorpus = _corpusService.CreateTermCorpora(corpus.TargetFiles).FirstOrDefault(); + if (sourceTermCorpus is not null && targetTermCorpus is not null) + { + IParallelTextCorpus parallelKeyTermsCorpus = sourceTermCorpus.AlignRows(targetTermCorpus); + parallelCorpora.Add(parallelKeyTermsCorpus); + } + } + } + + IParallelTextCorpus parallelCorpus = parallelCorpora.Flatten(); + ITextCorpus targetCorpus = targetCorpora.Flatten(); + + var tokenizer = new LatinWordTokenizer(); + var detokenizer = new LatinWordDetokenizer(); + + using ITrainer smtModelTrainer = await _smtModelFactory.CreateTrainerAsync(engineId, tokenizer, parallelCorpus); + using ITrainer truecaseTrainer = await _truecaserFactory.CreateTrainerAsync(engineId, tokenizer, targetCorpus); + + cancellationToken.ThrowIfCancellationRequested(); + + var progress = new BuildProgress(PlatformService, buildId); + await smtModelTrainer.TrainAsync(progress, cancellationToken); + await truecaseTrainer.TrainAsync(cancellationToken: cancellationToken); + + TranslationEngine? 
engine = await Engines.GetAsync(e => e.EngineId == engineId, cancellationToken); + if (engine is null) + throw new OperationCanceledException(); + + await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + await smtModelTrainer.SaveAsync(CancellationToken.None); + await truecaseTrainer.SaveAsync(CancellationToken.None); + ITruecaser truecaser = await _truecaserFactory.CreateAsync(engineId); + IReadOnlyList segmentPairs = await _trainSegmentPairs.GetAllAsync( + p => p.TranslationEngineRef == engine.Id, + CancellationToken.None + ); + using ( + IInteractiveTranslationModel smtModel = await _smtModelFactory.CreateAsync( + engineId, + tokenizer, + detokenizer, + truecaser + ) + ) + { + foreach (TrainSegmentPair segmentPair in segmentPairs) + { + await smtModel.TrainSegmentAsync( + segmentPair.Source, + segmentPair.Target, + cancellationToken: CancellationToken.None + ); + } + } + + await DataAccessContext.WithTransactionAsync( + async (ct) => + { + await PlatformService.BuildCompletedAsync( + buildId, + smtModelTrainer.Stats.TrainCorpusSize + segmentPairs.Count, + smtModelTrainer.Stats.Metrics["bleu"] * 100.0, + CancellationToken.None + ); + await BuildJobService.BuildJobFinishedAsync( + engineId, + buildId, + buildComplete: true, + CancellationToken.None + ); + }, + cancellationToken: CancellationToken.None + ); + } + + stopwatch.Stop(); + Logger.LogInformation("Build completed in {0}s ({1})", stopwatch.Elapsed.TotalSeconds, buildId); + } +} diff --git a/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs b/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs index 941731cd8..c00910487 100644 --- a/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs +++ b/src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs @@ -150,10 +150,10 @@ async Task TrainSubroutineAsync(SmtTransferEngineState state, CancellationToken } SmtTransferEngineState state = 
_stateService.Get(engineId); - if (engine.CurrentBuild?.JobState is BuildJobState.Active) - { - await _dataAccessContext.WithTransactionAsync( - async (ct) => + await _dataAccessContext.WithTransactionAsync( + async (ct) => + { + if (engine.CurrentBuild?.JobState is BuildJobState.Active) { await _trainSegmentPairs.InsertAsync( new TrainSegmentPair @@ -163,17 +163,17 @@ await _trainSegmentPairs.InsertAsync( Target = targetSegment, SentenceStart = sentenceStart }, - ct + CancellationToken.None ); + await TrainSubroutineAsync(state, CancellationToken.None); + } + else + { await TrainSubroutineAsync(state, ct); - }, - cancellationToken: CancellationToken.None - ); - } - else - { - await TrainSubroutineAsync(state, cancellationToken); - } + } + }, + cancellationToken: cancellationToken + ); state.IsUpdated = true; state.LastUsedTime = DateTime.Now; @@ -233,12 +233,16 @@ public bool IsLanguageNativeToModel(string language, out string internalCode) private async Task CancelBuildJobAsync(string engineId, CancellationToken cancellationToken) { - (string? buildId, BuildJobState jobState) = await _buildJobService.CancelBuildJobAsync( - engineId, - cancellationToken + string? 
buildId = null; + await _dataAccessContext.WithTransactionAsync( + async (ct) => + { + (buildId, BuildJobState jobState) = await _buildJobService.CancelBuildJobAsync(engineId, ct); + if (buildId is not null && jobState is BuildJobState.None) + await _platformService.BuildCanceledAsync(buildId, CancellationToken.None); + }, + cancellationToken: cancellationToken ); - if (buildId is not null && jobState is BuildJobState.None) - await _platformService.BuildCanceledAsync(buildId, CancellationToken.None); return buildId is not null; } diff --git a/src/SIL.Machine.AspNetCore/Services/SmtTransferPostprocessBuildJob.cs b/src/SIL.Machine.AspNetCore/Services/SmtTransferPostprocessBuildJob.cs index 2065fe221..f2b6b438b 100644 --- a/src/SIL.Machine.AspNetCore/Services/SmtTransferPostprocessBuildJob.cs +++ b/src/SIL.Machine.AspNetCore/Services/SmtTransferPostprocessBuildJob.cs @@ -4,6 +4,7 @@ public class SmtTransferPostprocessBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, IBuildJobService buildJobService, ILogger logger, ISharedFileService sharedFileService, @@ -11,7 +12,16 @@ public class SmtTransferPostprocessBuildJob( ISmtModelFactory smtModelFactory, ITruecaserFactory truecaserFactory, IOptionsMonitor options -) : PostprocessBuildJob(platformService, engines, lockFactory, buildJobService, logger, sharedFileService) +) + : PostprocessBuildJob( + platformService, + engines, + lockFactory, + dataAccessContext, + buildJobService, + logger, + sharedFileService + ) { private readonly ISmtModelFactory _smtModelFactory = smtModelFactory; private readonly ITruecaserFactory _truecaserFactory = truecaserFactory; diff --git a/src/SIL.Machine.AspNetCore/Services/SmtTransferTrainBuildJob.cs b/src/SIL.Machine.AspNetCore/Services/SmtTransferTrainBuildJob.cs index 5946b50a4..8f2640c3b 100644 --- a/src/SIL.Machine.AspNetCore/Services/SmtTransferTrainBuildJob.cs +++ 
b/src/SIL.Machine.AspNetCore/Services/SmtTransferTrainBuildJob.cs @@ -4,13 +4,14 @@ public class SmtTransferTrainBuildJob( IPlatformService platformService, IRepository engines, IDistributedReaderWriterLockFactory lockFactory, + IDataAccessContext dataAccessContext, IBuildJobService buildJobService, ILogger logger, ISharedFileService sharedFileService, ITruecaserFactory truecaserFactory, ISmtModelFactory smtModelFactory, ITransferEngineFactory transferEngineFactory -) : HangfireBuildJob(platformService, engines, lockFactory, buildJobService, logger) +) : HangfireBuildJob(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger) { private static readonly JsonWriterOptions PretranslateWriterOptions = new() { Indented = true }; private static readonly JsonSerializerOptions JsonSerializerOptions = diff --git a/src/SIL.Machine.Serval.EngineServer/Program.cs b/src/SIL.Machine.Serval.EngineServer/Program.cs index 19fae81f9..e5f4d46bb 100644 --- a/src/SIL.Machine.Serval.EngineServer/Program.cs +++ b/src/SIL.Machine.Serval.EngineServer/Program.cs @@ -6,10 +6,10 @@ // Add services to the container. 
builder .Services.AddMachine(builder.Configuration) + .AddBuildJobService() .AddMongoDataAccess() .AddMongoHangfireJobClient() .AddServalTranslationEngineService() - .AddBuildJobService() .AddModelCleanupService() .AddClearMLService(); diff --git a/src/SIL.Machine.Serval.JobServer/Program.cs b/src/SIL.Machine.Serval.JobServer/Program.cs index 6328c45b2..d78bfed80 100644 --- a/src/SIL.Machine.Serval.JobServer/Program.cs +++ b/src/SIL.Machine.Serval.JobServer/Program.cs @@ -4,11 +4,11 @@ builder .Services.AddMachine(builder.Configuration) + .AddBuildJobService() .AddMongoDataAccess() .AddMongoHangfireJobClient() .AddHangfireJobServer() .AddServalPlatformService() - .AddBuildJobService() .AddClearMLService(); if (builder.Environment.IsDevelopment()) { diff --git a/tests/SIL.Machine.AspNetCore.Tests/SIL.Machine.AspNetCore.Tests.csproj b/tests/SIL.Machine.AspNetCore.Tests/SIL.Machine.AspNetCore.Tests.csproj index 5956eaa0d..f9f221847 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/SIL.Machine.AspNetCore.Tests.csproj +++ b/tests/SIL.Machine.AspNetCore.Tests/SIL.Machine.AspNetCore.Tests.csproj @@ -17,6 +17,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/SIL.Machine.AspNetCore.Tests/Services/InMemoryStorageTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/InMemoryStorageTests.cs index 3b5052865..61a0cf3e5 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/Services/InMemoryStorageTests.cs +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/InMemoryStorageTests.cs @@ -88,4 +88,20 @@ public async Task DeleteAsync() var files = await fs.ListFilesAsync("test", recurse: true); Assert.That(files, Is.Empty); } + + [Test] + public async Task MoveAsync() + { + using InMemoryStorage fs = new(); + using (StreamWriter sw = new(await fs.OpenWriteAsync("test1/file1"))) + { + string input = "Hello"; + sw.WriteLine(input); + } + await fs.MoveAsync("test1/file1", "test2/file1"); + 
var files = await fs.ListFilesAsync("test1", recurse: true); + Assert.That(files, Is.Empty); + files = await fs.ListFilesAsync("test2", recurse: true); + Assert.That(files, Is.EquivalentTo(new[] { "test2/file1" })); + } } diff --git a/tests/SIL.Machine.AspNetCore.Tests/Services/LocalStorageTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/LocalStorageTests.cs index 280a54bb1..8478c1ab6 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/Services/LocalStorageTests.cs +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/LocalStorageTests.cs @@ -93,4 +93,21 @@ public async Task DeleteFileAsync() IReadOnlyCollection files = await fs.ListFilesAsync("test", recurse: true); Assert.That(files, Is.Empty); } + + [Test] + public async Task MoveAsync() + { + using var tmpDir = new TempDirectory("test"); + using LocalStorage fs = new(tmpDir.Path); + using (StreamWriter sw = new(await fs.OpenWriteAsync("test1/file1"))) + { + string input = "Hello"; + sw.WriteLine(input); + } + await fs.MoveAsync("test1/file1", "test2/file1"); + var files = await fs.ListFilesAsync("test1", recurse: true); + Assert.That(files, Is.Empty); + files = await fs.ListFilesAsync("test2", recurse: true); + Assert.That(files, Is.EquivalentTo(new[] { "test2/file1" })); + } } diff --git a/tests/SIL.Machine.AspNetCore.Tests/Services/MessageOutboxDeliveryServiceTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/MessageOutboxDeliveryServiceTests.cs new file mode 100644 index 000000000..739b65699 --- /dev/null +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/MessageOutboxDeliveryServiceTests.cs @@ -0,0 +1,277 @@ +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using NSubstitute.ExceptionExtensions; +using Serval.Translation.V1; + +namespace SIL.Machine.AspNetCore.Services; + +[TestFixture] +public class MessageOutboxDeliveryServiceTests +{ + [Test] + public async Task SendMessages() + { + var env = new TestEnvironment(); + env.AddStandardMessages(); + await 
env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + Received.InOrder(() => + { + env.Client.BuildStartedAsync(new BuildStartedRequest { BuildId = "A" }); + env.Client.BuildCompletedAsync(Arg.Any()); + env.Client.BuildStartedAsync(new BuildStartedRequest { BuildId = "B" }); + }); + } + + [Test] + public async Task SendMessages_Timeout() + { + var env = new TestEnvironment(); + env.AddStandardMessages(); + + // Timeout is long enough where the message attempt will be incremented, but not deleted. + env.ClientInternalFailure(); + await Task.Delay(100); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + // Each group should try to send one message + Assert.That((await env.Messages.GetAsync(m => m.Id == "B"))!.Attempts, Is.EqualTo(1)); + Assert.That((await env.Messages.GetAsync(m => m.Id == "A"))!.Attempts, Is.EqualTo(0)); + Assert.That((await env.Messages.GetAsync(m => m.Id == "C"))!.Attempts, Is.EqualTo(1)); + + // with now shorter timeout, the messages will be deleted. 
+ // 4 start build attempts, and only one build completed attempt + env.MessageOutboxDeliveryService.SetMessageExpiration(TimeSpan.FromMilliseconds(1)); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + Assert.That(await env.Messages.ExistsAsync(m => true), Is.False); + var startCalls = env + .Client.ReceivedCalls() + .Count(x => x.GetMethodInfo().Name == nameof(env.Client.BuildStartedAsync)); + Assert.That(startCalls, Is.EqualTo(4)); + var completedCalls = env + .Client.ReceivedCalls() + .Count(x => x.GetMethodInfo().Name == nameof(env.Client.BuildCompletedAsync)); + Assert.That(completedCalls, Is.EqualTo(1)); + } + + [Test] + public async Task SendMessagesUnavailable_Failure() + { + var env = new TestEnvironment(); + env.AddStandardMessages(); + env.ClientUnavailableFailure(); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + // Only the first group should be attempted - but not recorded as attempted + Assert.That((await env.Messages.GetAsync(m => m.Id == "B"))!.Attempts, Is.EqualTo(0)); + Assert.That((await env.Messages.GetAsync(m => m.Id == "A"))!.Attempts, Is.EqualTo(0)); + Assert.That((await env.Messages.GetAsync(m => m.Id == "C"))!.Attempts, Is.EqualTo(0)); + env.ClientInternalFailure(); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + Assert.That((await env.Messages.GetAsync(m => m.Id == "B"))!.Attempts, Is.EqualTo(1)); + Assert.That((await env.Messages.GetAsync(m => m.Id == "A"))!.Attempts, Is.EqualTo(0)); + Assert.That((await env.Messages.GetAsync(m => m.Id == "C"))!.Attempts, Is.EqualTo(1)); + env.ClientNoFailure(); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + Assert.That(await env.Messages.ExistsAsync(m => true), Is.False); + // 1 (unavailable) + 2 (internal) + 3 (success) = 6 calls + Assert.That(env.Client.ReceivedCalls().Count(), Is.EqualTo(6)); + } + + [Test] + public async Task LargeMessageContent() + { + var env = new TestEnvironment(); + // large max document 
size - message not saved to file + var fileIdC = await env.OutboxService.EnqueueMessageAsync( + method: ServalPlatformMessageMethod.BuildStarted, + groupId: "C", + requestContent: JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "C" }), + cancellationToken: CancellationToken.None + ); + Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdC}"), Is.False); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + // small max document size - throws error + env.OutboxService.SetMaxDocumentSize(1); + Assert.ThrowsAsync( + () => + env.OutboxService.EnqueueMessageAsync( + method: ServalPlatformMessageMethod.BuildStarted, + groupId: "D", + requestContent: JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "D" }), + cancellationToken: CancellationToken.None + ) + ); + } + + [Test] + public async Task PretranslateSaveFile() + { + var env = new TestEnvironment(); + // large max document size - message not saved to file + string pretranslationsPath = "build/C/pretranslations.json"; + using (StreamWriter sw = new(await env.SharedFileService.OpenWriteAsync(pretranslationsPath))) + { + sw.WriteLine("[]"); + } + var fileIdC = await env.OutboxService.EnqueueMessageAsync( + method: ServalPlatformMessageMethod.InsertPretranslations, + groupId: "C", + requestContent: "engineId", + requestContentPath: pretranslationsPath, + cancellationToken: CancellationToken.None + ); + Assert.That(await env.SharedFileService.ExistsAsync(pretranslationsPath), Is.False); + Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdC}"), Is.True); + await env.MessageOutboxDeliveryService.ProcessMessagesOnceAsync(); + Assert.That(await env.SharedFileService.ExistsAsync($"outbox/{fileIdC}"), Is.False); + } + + public class TestMessageOutboxDeliveryService( + IRepository messages, + IEnumerable outboxMessageHandlers, + ILogger logger + ) : MessageOutboxDeliveryService(messages, outboxMessageHandlers, new MessageOutboxOptions(), logger) + { + 
public async Task ProcessMessagesOnceAsync() => await ProcessMessagesAsync(); + + public void SetMessageExpiration(TimeSpan messageExpiration) => MessageExpiration = messageExpiration; + } + + public class TestMessageOutboxService( + IRepository messageIndexes, + IRepository messages, + ISharedFileService sharedFileService + ) : MessageOutboxService(messageIndexes, messages, sharedFileService) + { + public void SetMaxDocumentSize(int maxDocumentSize) => MaxDocumentSize = maxDocumentSize; + } + + private class TestEnvironment : ObjectModel.DisposableBase + { + public MemoryRepository MessageIndexes { get; } + public MemoryRepository Messages { get; } + public TestMessageOutboxService OutboxService { get; } + public ISharedFileService SharedFileService { get; } + public TranslationPlatformApi.TranslationPlatformApiClient Client { get; } + public TestMessageOutboxDeliveryService MessageOutboxDeliveryService { get; } + public AsyncClientStreamingCall InsertPretranslationsCall { get; } + + public TestEnvironment() + { + MessageIndexes = new MemoryRepository(); + Messages = new MemoryRepository(); + SharedFileService = new SharedFileService(Substitute.For()); + OutboxService = new TestMessageOutboxService(MessageIndexes, Messages, SharedFileService); + + InsertPretranslationsCall = Grpc.Core.Testing.TestCalls.AsyncClientStreamingCall( + Substitute.For>(), + Task.FromResult(new Empty()), + Task.FromResult(new Metadata()), + () => Status.DefaultSuccess, + () => new Metadata(), + () => { } + ); + + Client = Substitute.For(); + ClientNoFailure(); + + MessageOutboxDeliveryService = new TestMessageOutboxDeliveryService( + Messages, + [ + new ServalPlatformOutboxHandler( + Client, + SharedFileService, + Substitute.For>() + ) + ], + Substitute.For>() + ); + } + + public static AsyncUnaryCall GetEmptyUnaryCall() => + new AsyncUnaryCall( + Task.FromResult(new Empty()), + Task.FromResult(new Metadata()), + () => Status.DefaultSuccess, + () => new Metadata(), + () => { } + ); + + 
/* Configures the substituted client so the five unary calls listed here return GetEmptyUnaryCall() and InsertPretranslations returns InsertPretranslationsCall. */ public void ClientNoFailure() + { + Client.BuildStartedAsync(Arg.Any()).Returns(GetEmptyUnaryCall()); + Client.BuildCanceledAsync(Arg.Any()).Returns(GetEmptyUnaryCall()); + Client.BuildFaultedAsync(Arg.Any()).Returns(GetEmptyUnaryCall()); + Client.BuildCompletedAsync(Arg.Any()).Returns(GetEmptyUnaryCall()); + Client + .IncrementTranslationEngineCorpusSizeAsync(Arg.Any()) + .Returns(GetEmptyUnaryCall()); + Client + .InsertPretranslations(cancellationToken: Arg.Any()) + .Returns(InsertPretranslationsCall); + } + + /* Makes BuildStartedAsync and BuildCompletedAsync throw an RpcException carrying StatusCode.Internal; no other client call is reconfigured here. */ public void ClientInternalFailure() + { + Client + .BuildStartedAsync(Arg.Any()) + .Throws(new RpcException(new Status(StatusCode.Internal, ""))); + Client + .BuildCompletedAsync(Arg.Any()) + .Throws(new RpcException(new Status(StatusCode.Internal, ""))); + } + + /* Makes BuildStartedAsync and BuildCompletedAsync throw an RpcException carrying StatusCode.Unavailable; no other client call is reconfigured here. */ public void ClientUnavailableFailure() + { + Client + .BuildStartedAsync(Arg.Any()) + .Throws(new RpcException(new Status(StatusCode.Unavailable, ""))); + Client + .BuildCompletedAsync(Arg.Any()) + .Throws(new RpcException(new Status(StatusCode.Unavailable, ""))); + } + + /* Seeds Messages with three OutboxMessage entries: Index 2 (BuildCompleted) and Index 1 (BuildStarted) share GroupId "A" and are added in that order, followed by Index 3 (BuildStarted) with GroupId "B". */ public void AddStandardMessages() + { + // messages out of order - will be fixed when retrieved + Messages.Add( + new OutboxMessage + { + Id = "A", + Index = 2, + Method = ServalPlatformMessageMethod.BuildCompleted.ToString(), + GroupId = "A", + OutboxName = typeof(ServalPlatformMessageMethod).ToString(), + RequestContent = JsonSerializer.Serialize( + new BuildCompletedRequest + { + BuildId = "A", + CorpusSize = 100, + Confidence = 0.5 + } + ) + } + ); + Messages.Add( + new OutboxMessage + { + Id = "B", + Index = 1, + Method = ServalPlatformMessageMethod.BuildStarted.ToString(), + OutboxName = typeof(ServalPlatformMessageMethod).ToString(), + GroupId = "A", + RequestContent = JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "A" }) + } + ); + Messages.Add( + new OutboxMessage + { + Id = "C", + Index = 3, + Method = ServalPlatformMessageMethod.BuildStarted.ToString(), + OutboxName =
typeof(ServalPlatformMessageMethod).ToString(), + GroupId = "B", + RequestContent = JsonSerializer.Serialize(new BuildStartedRequest { BuildId = "B" }) + } + ); + } + } +} diff --git a/tests/SIL.Machine.AspNetCore.Tests/Services/NmtEngineServiceTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/NmtEngineServiceTests.cs index 33d25da3b..ed6087634 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/Services/NmtEngineServiceTests.cs +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/NmtEngineServiceTests.cs @@ -169,6 +169,7 @@ public TestEnvironment() Substitute.For(), ClearMLService, SharedFileService, + new MemoryDataAccessContext(), clearMLOptions, buildJobOptions, Substitute.For>() @@ -297,6 +298,7 @@ public override object ActivateJob(Type jobType) _env.PlatformService, _env.Engines, _env._lockFactory, + new MemoryDataAccessContext(), Substitute.For>(), _env.BuildJobService, _env.SharedFileService, @@ -310,6 +312,7 @@ public override object ActivateJob(Type jobType) _env.PlatformService, _env.Engines, _env._lockFactory, + new MemoryDataAccessContext(), _env.BuildJobService, Substitute.For>(), _env.SharedFileService diff --git a/tests/SIL.Machine.AspNetCore.Tests/Services/PreprocessBuildJobTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/PreprocessBuildJobTests.cs index d082b8fa2..5150cc759 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/Services/PreprocessBuildJobTests.cs +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/PreprocessBuildJobTests.cs @@ -467,6 +467,7 @@ public PreprocessBuildJob GetBuildJob(TranslationEngineType engineType) PlatformService, Engines, LockFactory, + new MemoryDataAccessContext(), Substitute.For>(), BuildJobService, SharedFileService, @@ -483,6 +484,7 @@ public PreprocessBuildJob GetBuildJob(TranslationEngineType engineType) PlatformService, Engines, LockFactory, + new MemoryDataAccessContext(), Substitute.For>(), BuildJobService, SharedFileService, diff --git 
a/tests/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs b/tests/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs index 40dbed2f5..0d73fc101 100644 --- a/tests/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs +++ b/tests/SIL.Machine.AspNetCore.Tests/Services/SmtTransferEngineServiceTests.cs @@ -301,6 +301,7 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp Substitute.For(), ClearMLService, SharedFileService, + new MemoryDataAccessContext(), clearMLOptions, buildJobOptions, Substitute.For>() @@ -692,6 +693,7 @@ public override object ActivateJob(Type jobType) _env.PlatformService, _env.Engines, _env._lockFactory, + new MemoryDataAccessContext(), Substitute.For>(), _env.BuildJobService, _env.SharedFileService, @@ -709,6 +711,7 @@ public override object ActivateJob(Type jobType) _env.PlatformService, _env.Engines, _env._lockFactory, + new MemoryDataAccessContext(), _env.BuildJobService, Substitute.For>(), _env.SharedFileService, @@ -724,6 +727,7 @@ public override object ActivateJob(Type jobType) _env.PlatformService, _env.Engines, _env._lockFactory, + new MemoryDataAccessContext(), _env.BuildJobService, Substitute.For>(), _env.SharedFileService,