Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry hangfire jobs #197

Merged
merged 9 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public static IMachineBuilder AddClearMLOptions(this IMachineBuilder builder, IC
return builder;
}

public static IMachineBuilder AddMessageOutboxOptions(
this IMachineBuilder builder,
Action<MessageOutboxOptions> configureOptions
)
{
builder.Services.Configure(configureOptions);
return builder;
}

public static IMachineBuilder AddMessageOutboxOptions(this IMachineBuilder builder, IConfiguration config)
{
builder.Services.Configure<MessageOutboxOptions>(config);
return builder;
}

public static IMachineBuilder AddSharedFileOptions(
this IMachineBuilder builder,
Action<SharedFileOptions> configureOptions
Expand Down Expand Up @@ -219,6 +234,8 @@ public static IMachineBuilder AddMemoryDataAccess(this IMachineBuilder builder)
o.AddRepository<TranslationEngine>();
o.AddRepository<RWLock>();
o.AddRepository<TrainSegmentPair>();
o.AddRepository<OutboxMessage>();
o.AddRepository<Outbox>();
});

return builder;
Expand Down Expand Up @@ -263,6 +280,14 @@ await c.Indexes.CreateOrUpdateAsync(
)
)
);
o.AddRepository<OutboxMessage>(
"outbox_messages",
mapSetup: m => m.MapProperty(m => m.OutboxRef).SetSerializer(new StringSerializer())
);
o.AddRepository<Outbox>(
"outboxes",
mapSetup: m => m.MapIdProperty(o => o.Id).SetSerializer(new StringSerializer())
);
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -280,6 +305,11 @@ public static IMachineBuilder AddServalPlatformService(
throw new InvalidOperationException("Serval connection string is required");

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IOutboxMessageHandler, ServalPlatformOutboxMessageHandler>();

builder.Services.AddScoped<IMessageOutboxService, MessageOutboxService>();

builder
.Services.AddGrpcClient<TranslationPlatformApi.TranslationPlatformApiClient>(o =>
{
Expand Down Expand Up @@ -334,6 +364,7 @@ public static IMachineBuilder AddServalTranslationEngineService(
options.Interceptors.Add<UnimplementedInterceptor>();
});
builder.AddServalPlatformService(connectionString);

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
Expand Down Expand Up @@ -397,4 +428,10 @@ public static IMachineBuilder AddModelCleanupService(this IMachineBuilder builde
builder.Services.AddHostedService<ModelCleanupService>();
return builder;
}

public static IMachineBuilder AddMessageOutboxDeliveryService(this IMachineBuilder builder)
{
builder.Services.AddHostedService<MessageOutboxDeliveryService>();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
services.AddHealthChecks().AddCheck<S3HealthCheck>("S3 Bucket");

services.AddSingleton<ILanguageTagService, LanguageTagService>();
services.AddTransient<IFileSystem, FileSystem>();

services.AddScoped<IDistributedReaderWriterLockFactory, DistributedReaderWriterLockFactory>();
services.AddSingleton<ICorpusService, CorpusService>();
Expand All @@ -28,6 +29,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSmtTransferEngineOptions(o => { });
builder.AddClearMLOptions(o => { });
builder.AddBuildJobOptions(o => { });
builder.AddMessageOutboxOptions(o => { });
}
else
{
Expand All @@ -36,6 +38,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSmtTransferEngineOptions(configuration.GetSection(SmtTransferEngineOptions.Key));
builder.AddClearMLOptions(configuration.GetSection(ClearMLOptions.Key));
builder.AddBuildJobOptions(configuration.GetSection(BuildJobOptions.Key));
builder.AddMessageOutboxOptions(configuration.GetSection(MessageOutboxOptions.Key));
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace SIL.Machine.AspNetCore.Configuration;

public class MessageOutboxOptions
{
public const string Key = "MessageOutbox";

public string OutboxDir { get; set; } = "outbox";
public TimeSpan MessageExpirationTimeout { get; set; } = TimeSpan.FromHours(48);
}
10 changes: 10 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/Outbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace SIL.Machine.AspNetCore.Models;

public record Outbox : IEntity
{
public string Id { get; set; } = "";

public int Revision { get; set; }

public int CurrentIndex { get; init; }
}
15 changes: 15 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace SIL.Machine.AspNetCore.Models;

public record OutboxMessage : IEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required int Index { get; init; }
public required string OutboxRef { get; init; }
public required string Method { get; init; }
public required string GroupId { get; init; }
public string? Content { get; init; }
public required bool HasContentStream { get; init; }
public DateTimeOffset Created { get; init; } = DateTimeOffset.UtcNow;
public int Attempts { get; init; }
}
61 changes: 44 additions & 17 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ await _clearMLService.GetTasksForQueueAsync(_queuePerEngineType[engineType], can
_queueSizePerEngineType[engineType] = queuePositionsPerEngineType.Count;
}

var dataAccessContext = scope.ServiceProvider.GetRequiredService<IDataAccessContext>();
var platformService = scope.ServiceProvider.GetRequiredService<IPlatformService>();
var lockFactory = scope.ServiceProvider.GetRequiredService<IDistributedReaderWriterLockFactory>();
foreach (TranslationEngine engine in trainingEngines)
Expand Down Expand Up @@ -117,6 +118,7 @@ or ClearMLTaskStatus.Completed
)
{
bool canceled = !await TrainJobStartedAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand Down Expand Up @@ -169,6 +171,7 @@ await UpdateTrainJobStatus(
if (canceling)
{
await TrainJobCanceledAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand All @@ -183,6 +186,7 @@ await TrainJobCanceledAsync(
case ClearMLTaskStatus.Stopped:
{
await TrainJobCanceledAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand All @@ -196,6 +200,7 @@ await TrainJobCanceledAsync(
case ClearMLTaskStatus.Failed:
{
await TrainJobFaultedAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
Expand All @@ -217,6 +222,7 @@ await TrainJobFaultedAsync(
}

private async Task<bool> TrainJobStartedAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
Expand All @@ -225,17 +231,24 @@ private async Task<bool> TrainJobStartedAsync(
CancellationToken cancellationToken = default
)
{
bool success;
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken))
return false;
success = await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return true;
},
cancellationToken: cancellationToken
);
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);

await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({BuildId})", buildId);
return true;
return success;
}

private async Task<bool> TrainJobCompletedAsync(
Expand Down Expand Up @@ -272,6 +285,7 @@ CancellationToken cancellationToken
}

private async Task TrainJobFaultedAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
Expand All @@ -286,12 +300,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildFaultedAsync(buildId, message, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogError("Build faulted ({BuildId}). Error: {ErrorMessage}", buildId, message);
Expand All @@ -303,6 +323,7 @@ await buildJobService.BuildJobFinishedAsync(
}

private async Task TrainJobCanceledAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
Expand All @@ -316,12 +337,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildCanceledAsync(buildId, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogInformation("Build canceled ({BuildId})", buildId);
Expand Down
25 changes: 25 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/FileSystem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace SIL.Machine.AspNetCore.Services;

public class FileSystem : IFileSystem
{
public void CreateDirectory(string path)
{
Directory.CreateDirectory(path);
}

public void DeleteFile(string path)
{
if (File.Exists(path))
File.Delete(path);
}

public Stream OpenWrite(string path)
{
return File.OpenWrite(path);
}

public Stream OpenRead(string path)
{
return File.OpenRead(path);
}
}
Loading
Loading