Skip to content

Commit

Permalink
retry hangfire jobs - #158
Browse files Browse the repository at this point in the history
* Fixed auto-retry as per this forum post: https://discuss.hangfire.io/t/recurring-jobs-do-not-automatically-get-retried-after-application-crash-net-core-service/9160
* MongoDB can't handle documents greater than 16MB
* Treat messages from one id as a group
* Kill failing messages over 4 days old
* Universal message incrementer
* Add testing
  • Loading branch information
johnml1135 committed May 20, 2024
1 parent f4b27e5 commit 2aa8fe8
Show file tree
Hide file tree
Showing 22 changed files with 789 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ await c.Indexes.CreateOrUpdateAsync(
)
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
o.AddRepository<Sequence>("outbox_message_index");
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -284,6 +286,9 @@ public static IMachineBuilder AddServalPlatformService(
throw new InvalidOperationException("Serval connection string is required");

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IMessageOutboxService, MessageOutboxService>();

builder
.Services.AddGrpcClient<TranslationPlatformApi.TranslationPlatformApiClient>(o =>
{
Expand Down Expand Up @@ -338,6 +343,9 @@ public static IMachineBuilder AddServalTranslationEngineService(
options.Interceptors.Add<UnimplementedInterceptor>();
});
builder.AddServalPlatformService(connectionString);

builder.Services.AddHostedService<MessageOutboxHandlerService>();

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
Expand Down
23 changes: 23 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace SIL.Machine.AspNetCore.Models;

/// <summary>
/// Identifies which operation a queued <c>OutboxMessage</c> represents.
/// </summary>
/// <remarks>
/// The numeric values are spelled out explicitly and equal the ordinals the
/// compiler would assign implicitly, so accidental reordering of the members
/// cannot silently change them.
/// </remarks>
public enum OutboxMessageMethod
{
    /// <summary>A build has started.</summary>
    BuildStarted = 0,

    /// <summary>A build has completed.</summary>
    BuildCompleted = 1,

    /// <summary>A build was canceled.</summary>
    BuildCanceled = 2,

    /// <summary>A build faulted.</summary>
    BuildFaulted = 3,

    /// <summary>A build is restarting.</summary>
    BuildRestarting = 4,

    /// <summary>Pretranslations are to be inserted.</summary>
    InsertPretranslations = 5,

    /// <summary>A translation engine's corpus size is to be incremented.</summary>
    IncrementTranslationEngineCorpusSize = 6
}

/// <summary>
/// A single queued outbox entry: the operation to perform, the group it
/// belongs to, and its (optional) serialized request payload.
/// </summary>
public record OutboxMessage : IEntity
{
    /// <summary>Entity identifier; empty until one is assigned.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Entity revision number; a new message starts at 1.</summary>
    public int Revision { get; set; } = 1;

    /// <summary>The operation this message represents. Must be set on construction.</summary>
    public required OutboxMessageMethod Method { get; set; }

    /// <summary>
    /// Identifier of the group this message belongs to. Must be set on construction.
    /// </summary>
    public required string GroupId { get; set; }

    /// <summary>
    /// The request payload as a string. Must be set explicitly on construction,
    /// but <c>null</c> is an accepted value.
    /// </summary>
    public required string? RequestContent { get; set; }

    /// <summary>Creation timestamp; defaults to the UTC time at which the instance is constructed.</summary>
    public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;

    /// <summary>
    /// Attempt counter; starts at zero.
    /// NOTE(review): presumably incremented by the outbox handler on each delivery attempt — confirm.
    /// </summary>
    public int Attempts { get; set; }
}
16 changes: 16 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/Sequence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace SIL.Machine.AspNetCore.Models;

/// <summary>
/// A named integer counter entity: a <see cref="Context"/> string paired with
/// its <see cref="CurrentIndex"/>.
/// </summary>
public record Sequence : IEntity
{
    /// <summary>Entity identifier; empty until one is assigned.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Entity revision number.</summary>
    public int Revision { get; set; }

    /// <summary>Name of the context this counter belongs to; empty by default.</summary>
    public string Context { get; set; } = string.Empty;

    /// <summary>The current value of the counter.</summary>
    public int CurrentIndex { get; set; }

    /// <summary>
    /// Formats <paramref name="value"/> as lowercase hexadecimal, left-padded
    /// with zeros to at least 24 digits.
    /// </summary>
    /// <param name="value">The index to format.</param>
    /// <returns>
    /// The zero-padded hexadecimal string, e.g. <c>1</c> becomes
    /// <c>000000000000000000000001</c>.
    /// NOTE(review): 24 hex digits matches the textual length of a MongoDB
    /// ObjectId, which the method name suggests is the intent — confirm with callers.
    /// </returns>
    public static string IndexToObjectIdString(int value) => value.ToString("x24");
}
62 changes: 41 additions & 21 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public class ClearMLMonitorService(
IServiceProvider services,
IClearMLService clearMLService,
ISharedFileService sharedFileService,
IDataAccessContext dataAccessContext,
IOptions<ClearMLOptions> options,
ILogger<ClearMLMonitorService> logger
)
Expand All @@ -24,6 +25,7 @@ ILogger<ClearMLMonitorService> logger

private readonly IClearMLService _clearMLService = clearMLService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly ILogger<ClearMLMonitorService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

Expand Down Expand Up @@ -191,16 +193,22 @@ private async Task<bool> TrainJobStartedAsync(
)
{
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken))
return false;
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await using (await @lock.WriterLockAsync(cancellationToken: ct))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);

await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({0})", buildId);
return true;
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, ct);
_logger.LogInformation("Build started ({0})", buildId);
return true;
},
cancellationToken: cancellationToken
);
}

private async Task<bool> TrainJobCompletedAsync(
Expand Down Expand Up @@ -252,12 +260,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildFaultedAsync(buildId, message, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogError("Build faulted ({0}). Error: {1}", buildId, message);
Expand All @@ -282,12 +296,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildCanceledAsync(buildId, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogInformation("Build canceled ({0})", buildId);
Expand Down
51 changes: 36 additions & 15 deletions src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ public abstract class HangfireBuildJob(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob> logger
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, buildJobService, logger)
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger)
{
public virtual Task RunAsync(
string engineId,
Expand All @@ -23,13 +24,15 @@ public abstract class HangfireBuildJob<T>(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob<T>> logger
)
{
protected IPlatformService PlatformService { get; } = platformService;
protected IRepository<TranslationEngine> Engines { get; } = engines;
protected IDistributedReaderWriterLockFactory LockFactory { get; } = lockFactory;
protected IDataAccessContext DataAccessContext { get; } = dataAccessContext;
protected IBuildJobService BuildJobService { get; } = buildJobService;
protected ILogger<HangfireBuildJob<T>> Logger { get; } = logger;

Expand Down Expand Up @@ -69,12 +72,18 @@ CancellationToken cancellationToken
completionStatus = JobCompletionStatus.Canceled;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogInformation("Build canceled ({0})", buildId);
Expand All @@ -86,8 +95,14 @@ await BuildJobService.BuildJobFinishedAsync(
completionStatus = JobCompletionStatus.Restarting;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
},
cancellationToken: CancellationToken.None
);
}
throw;
}
Expand All @@ -101,12 +116,18 @@ await BuildJobService.BuildJobFinishedAsync(
completionStatus = JobCompletionStatus.Faulted;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogError(0, e, "Build faulted ({0})", buildId);
Expand Down
11 changes: 11 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace SIL.Machine.AspNetCore.Services;

/// <summary>
/// Contract for adding messages to the message outbox.
/// </summary>
public interface IMessageOutboxService
{
    /// <summary>
    /// Enqueues a message in the outbox.
    /// </summary>
    /// <param name="method">The operation the message represents.</param>
    /// <param name="groupId">Identifier of the group the message belongs to.</param>
    /// <param name="requestContent">The request payload as a string.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>
    /// A task producing a string.
    /// NOTE(review): presumably the identifier of the enqueued message — confirm against the implementation.
    /// </returns>
    Task<string> EnqueueMessageAsync(
        OutboxMessageMethod method,
        string groupId,
        string requestContent,
        CancellationToken cancellationToken
    );
}
Loading

0 comments on commit 2aa8fe8

Please sign in to comment.