Skip to content

Commit

Permalink
retry hangfire jobs - #158
Browse files Browse the repository at this point in the history
* Fixed auto-retry as per this forum post: https://discuss.hangfire.io/t/recurring-jobs-do-not-automatically-get-retried-after-application-crash-net-core-service/9160
* MongoDB can't handle documents greater than 16MB
* Treat messages from one id as a group
* Kill failing messages over 4 days old
* Make outbox truly generic, handling multiple queues
* Ensure globally ordered outbox messages
* Add "MoveAsync" to SharedStorage
* Refactor saving pretranslations file
  • Loading branch information
johnml1135 committed Jun 27, 2024
1 parent 1011777 commit 6362191
Show file tree
Hide file tree
Showing 37 changed files with 1,142 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public static IMachineBuilder AddClearMLOptions(this IMachineBuilder builder, IC
return builder;
}

public static IMachineBuilder AddMessageOutboxOptions(
this IMachineBuilder builder,
Action<MessageOutboxOptions> configureOptions
)
{
builder.Services.Configure(configureOptions);
return builder;
}

public static IMachineBuilder AddMessageOutboxOptions(this IMachineBuilder builder, IConfiguration config)
{
builder.Services.Configure<MessageOutboxOptions>(config);
return builder;
}

public static IMachineBuilder AddSharedFileOptions(
this IMachineBuilder builder,
Action<SharedFileOptions> configureOptions
Expand Down Expand Up @@ -263,6 +278,8 @@ await c.Indexes.CreateOrUpdateAsync(
)
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
o.AddRepository<Outbox>("outboxes");
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -280,6 +297,11 @@ public static IMachineBuilder AddServalPlatformService(
throw new InvalidOperationException("Serval connection string is required");

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IOutboxMessageHandler, ServalPlatformOutboxHandler>();

builder.Services.AddSingleton<IMessageOutboxService, MessageOutboxService>();

builder
.Services.AddGrpcClient<TranslationPlatformApi.TranslationPlatformApiClient>(o =>
{
Expand Down Expand Up @@ -334,6 +356,9 @@ public static IMachineBuilder AddServalTranslationEngineService(
options.Interceptors.Add<UnimplementedInterceptor>();
});
builder.AddServalPlatformService(connectionString);

builder.Services.AddHostedService<MessageOutboxDeliveryService>();

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSmtTransferEngineOptions(o => { });
builder.AddClearMLOptions(o => { });
builder.AddBuildJobOptions(o => { });
builder.AddMessageOutboxOptions(o => { });
}
else
{
Expand All @@ -36,6 +37,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSmtTransferEngineOptions(configuration.GetSection(SmtTransferEngineOptions.Key));
builder.AddClearMLOptions(configuration.GetSection(ClearMLOptions.Key));
builder.AddBuildJobOptions(configuration.GetSection(BuildJobOptions.Key));
builder.AddMessageOutboxOptions(configuration.GetSection(MessageOutboxOptions.Key));
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace SIL.Machine.AspNetCore.Configuration;

public class MessageOutboxOptions
{
public const string Key = "MessageOutbox";

public int MessageExpirationInHours { get; set; } = 48;
}
28 changes: 28 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/Outbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace SIL.Machine.AspNetCore.Models;

public record Outbox : IEntity
{
public string Id { get; set; } = "";

public int Revision { get; set; }

public string Name { get; set; } = null!;
public int CurrentIndex { get; set; }

public static async Task<Outbox> GetOutboxNextIndexAsync(
IRepository<Outbox> indexRepository,
string outboxName,
CancellationToken cancellationToken
)
{
Outbox outbox = (
await indexRepository.UpdateAsync(
i => i.Name == outboxName,
i => i.Inc(b => b.CurrentIndex, 1),
upsert: true,
cancellationToken: cancellationToken
)
)!;
return outbox;
}
}
14 changes: 14 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace SIL.Machine.AspNetCore.Models;

public record OutboxMessage : IEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required int Index { get; set; }
public required string OutboxName { get; set; }
public required string Method { get; set; }
public required string GroupId { get; set; }
public required string? RequestContent { get; set; }
public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;
public int Attempts { get; set; } = 0;
}
55 changes: 38 additions & 17 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public class ClearMLMonitorService(
IServiceProvider services,
IClearMLService clearMLService,
ISharedFileService sharedFileService,
IDataAccessContext dataAccessContext,
IOptionsMonitor<ClearMLOptions> clearMLOptions,
IOptionsMonitor<BuildJobOptions> buildJobOptions,
ILogger<ClearMLMonitorService> logger
Expand All @@ -23,6 +24,7 @@ ILogger<ClearMLMonitorService> logger

private readonly IClearMLService _clearMLService = clearMLService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly ILogger<IClearMLQueueService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

Expand Down Expand Up @@ -225,17 +227,24 @@ private async Task<bool> TrainJobStartedAsync(
CancellationToken cancellationToken = default
)
{
bool success;
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken))
return false;
success = await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return true;
},
cancellationToken: cancellationToken
);
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);

await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({BuildId})", buildId);
return true;
return success;
}

private async Task<bool> TrainJobCompletedAsync(
Expand Down Expand Up @@ -286,12 +295,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildFaultedAsync(buildId, message, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogError("Build faulted ({BuildId}). Error: {ErrorMessage}", buildId, message);
Expand All @@ -316,12 +331,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildCanceledAsync(buildId, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogInformation("Build canceled ({BuildId})", buildId);
Expand Down
51 changes: 36 additions & 15 deletions src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ public abstract class HangfireBuildJob(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob> logger
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, buildJobService, logger)
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger)
{
public virtual Task RunAsync(
string engineId,
Expand All @@ -23,13 +24,15 @@ public abstract class HangfireBuildJob<T>(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob<T>> logger
)
{
protected IPlatformService PlatformService { get; } = platformService;
protected IRepository<TranslationEngine> Engines { get; } = engines;
protected IDistributedReaderWriterLockFactory LockFactory { get; } = lockFactory;
protected IDataAccessContext DataAccessContext { get; } = dataAccessContext;
protected IBuildJobService BuildJobService { get; } = buildJobService;
protected ILogger<HangfireBuildJob<T>> Logger { get; } = logger;

Expand Down Expand Up @@ -69,12 +72,18 @@ CancellationToken cancellationToken
completionStatus = JobCompletionStatus.Canceled;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogInformation("Build canceled ({0})", buildId);
Expand All @@ -86,8 +95,14 @@ await BuildJobService.BuildJobFinishedAsync(
completionStatus = JobCompletionStatus.Restarting;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
},
cancellationToken: CancellationToken.None
);
}
throw;
}
Expand All @@ -101,12 +116,18 @@ await BuildJobService.BuildJobFinishedAsync(
completionStatus = JobCompletionStatus.Faulted;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogError(0, e, "Build faulted ({0})", buildId);
Expand Down
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Services/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(

Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt, CancellationToken cancellationToken = default);

Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default);
Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default);
}
12 changes: 12 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace SIL.Machine.AspNetCore.Services;

public interface IMessageOutboxService
{
public Task<string> EnqueueMessageAsync<T>(
T method,
string groupId,
string? requestContent = null,
string? requestContentPath = null,
CancellationToken cancellationToken = default
);
}
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IOutboxMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace SIL.Machine.AspNetCore.Services;

public interface IOutboxMessageHandler
{
public string Name { get; }

public Task SendMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default);
public Task CleanupMessageAsync(OutboxMessage message, CancellationToken cancellationToken = default);
}
6 changes: 1 addition & 5 deletions src/SIL.Machine.AspNetCore/Services/IPlatformService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,5 @@ Task BuildCompletedAsync(
Task BuildFaultedAsync(string buildId, string message, CancellationToken cancellationToken = default);
Task BuildRestartingAsync(string buildId, CancellationToken cancellationToken = default);

Task InsertPretranslationsAsync(
string engineId,
IAsyncEnumerable<Pretranslation> pretranslations,
CancellationToken cancellationToken = default
);
Task InsertPretranslationsAsync(string engineId, string path, CancellationToken cancellationToken = default);
}
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(
Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);

Task<bool> ExistsAsync(string path, CancellationToken cancellationToken = default);
Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default);

Task DeleteAsync(string path, CancellationToken cancellationToken = default);
}
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ public async Task DeleteAsync(string path, bool recurse, CancellationToken cance
}
}

public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default)
{
if (!_memoryStreams.TryGetValue(Normalize(sourcePath), out Entry? entry))
throw new FileNotFoundException($"Unable to find file {sourcePath}");
_memoryStreams[Normalize(destPath)] = entry;
_memoryStreams.Remove(Normalize(sourcePath), out _);
return Task.CompletedTask;
}

protected override void DisposeManagedResources()
{
foreach (Entry stream in _memoryStreams.Values)
Expand Down
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/LocalStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationTo
return Task.FromResult<Stream>(File.OpenWrite(pathUri.LocalPath));
}

public Task MoveAsync(string sourcePath, string destPath, CancellationToken cancellationToken = default)
{
Uri sourcePathUri = new(_basePath, Normalize(sourcePath));
Uri destPathUri = new(_basePath, Normalize(destPath));
Directory.CreateDirectory(Path.GetDirectoryName(destPathUri.LocalPath)!);
File.Move(sourcePathUri.LocalPath, destPathUri.LocalPath);
return Task.CompletedTask;
}

public async Task DeleteAsync(string path, bool recurse, CancellationToken cancellationToken = default)
{
Uri pathUri = new(_basePath, Normalize(path));
Expand Down
Loading

0 comments on commit 6362191

Please sign in to comment.