Skip to content

Commit

Permalink
broken
Browse files Browse the repository at this point in the history
  • Loading branch information
johnml1135 committed Oct 29, 2024
1 parent da20548 commit 3a06a6b
Show file tree
Hide file tree
Showing 35 changed files with 244 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,23 +216,23 @@ public static IMachineBuilder AddMongoHangfireJobClient(

public static IMachineBuilder AddHangfireJobServer(
this IMachineBuilder builder,
IEnumerable<TranslationEngineType>? engineTypes = null
IEnumerable<EngineType>? engineTypes = null
)
{
engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
builder.Configuration?.GetSection("TranslationEngines").Get<EngineType[]?>()
?? [EngineType.SmtTransfer, EngineType.Nmt];
var queues = new List<string>();
foreach (TranslationEngineType engineType in engineTypes.Distinct())
foreach (EngineType engineType in engineTypes.Distinct())
{
switch (engineType)
{
case TranslationEngineType.SmtTransfer:
case EngineType.SmtTransfer:
builder.Services.AddSingleton<SmtTransferEngineStateService>();
builder.AddThotSmtModel().AddTransferEngine().AddUnigramTruecaser();
queues.Add("smt_transfer");
break;
case TranslationEngineType.Nmt:
case EngineType.Nmt:
queues.Add("nmt");
break;
}
Expand Down Expand Up @@ -371,7 +371,7 @@ public static IMachineBuilder AddServalPlatformService(
public static IMachineBuilder AddServalTranslationEngineService(
this IMachineBuilder builder,
string? connectionString = null,
IEnumerable<TranslationEngineType>? engineTypes = null
IEnumerable<EngineType>? engineTypes = null
)
{
builder.Services.AddGrpc(options =>
Expand All @@ -383,19 +383,19 @@ public static IMachineBuilder AddServalTranslationEngineService(
builder.AddServalPlatformService(connectionString);

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
foreach (TranslationEngineType engineType in engineTypes.Distinct())
builder.Configuration?.GetSection("TranslationEngines").Get<EngineType[]?>()
?? [EngineType.SmtTransfer, EngineType.Nmt];
foreach (EngineType engineType in engineTypes.Distinct())
{
switch (engineType)
{
case TranslationEngineType.SmtTransfer:
case EngineType.SmtTransfer:
builder.Services.AddSingleton<SmtTransferEngineStateService>();
builder.Services.AddHostedService<SmtTransferEngineCommitService>();
builder.AddThotSmtModel().AddTransferEngine().AddUnigramTruecaser();
builder.Services.AddScoped<ITranslationEngineService, SmtTransferEngineService>();
break;
case TranslationEngineType.Nmt:
case EngineType.Nmt:
builder.Services.AddScoped<ITranslationEngineService, NmtEngineService>();
break;
}
Expand All @@ -406,7 +406,8 @@ public static IMachineBuilder AddServalTranslationEngineService(

public static IMachineBuilder AddBuildJobService(this IMachineBuilder builder, string? smtTransferEngineDir = null)
{
builder.Services.AddScoped<IBuildJobService, BuildJobService>();
builder.Services.AddScoped<IBuildJobService<TranslationEngine>, TranslationBuildJobService>();
builder.Services.AddScoped<IBuildJobService<WordAlignmentEngine>, BuildJobService<WordAlignmentEngine>>();

builder.Services.AddScoped<IBuildJobRunner, ClearMLBuildJobRunner>();
builder.Services.AddScoped<IClearMLBuildJobFactory, NmtClearMLBuildJobFactory>();
Expand Down
11 changes: 11 additions & 0 deletions src/Machine/src/Serval.Machine.Shared/Models/ITrainingEngine.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Serval.Machine.Shared.Models;

public interface ITrainingEngine : IEntity
{
public string EngineId { get; init; }
public EngineType Type { get; init; }
public string SourceLanguage { get; init; }
public string TargetLanguage { get; init; }
public int BuildRevision { get; init; }
public Build? CurrentBuild { get; init; }
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
namespace Serval.Machine.Shared.Models;

public record TranslationEngine : IEntity
public record TranslationEngine : ITrainingEngine
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required string EngineId { get; init; }
public required TranslationEngineType Type { get; init; }
public required EngineType Type { get; init; }
public required string SourceLanguage { get; init; }
public required string TargetLanguage { get; init; }
public required bool IsModelPersisted { get; init; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
namespace Serval.Machine.Shared.Models;

public record WordAlignmentEngine : IEntity
public record WordAlignmentEngine : ITrainingEngine
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required string EngineId { get; init; }
public required WordAlignmentEngineType Type { get; init; }
public required EngineType Type { get; init; }
public required string SourceLanguage { get; init; }
public required string TargetLanguage { get; init; }
public int BuildRevision { get; init; }
Expand Down
52 changes: 26 additions & 26 deletions src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
namespace Serval.Machine.Shared.Services;

public class BuildJobService(IEnumerable<IBuildJobRunner> runners, IRepository<TranslationEngine> engines)
: IBuildJobService
public class BuildJobService<TEngine>(IEnumerable<IBuildJobRunner> runners, IRepository<TEngine> engines)
: IBuildJobService<TEngine>

Check failure on line 4 in src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs

View workflow job for this annotation

GitHub Actions / Build

'BuildJobService<TEngine>' does not implement interface member 'IBuildJobService<TEngine>.GetBuildingEnginesAsync(BuildJobRunnerType, CancellationToken)'. 'BuildJobService<TEngine>.GetBuildingEnginesAsync(BuildJobRunnerType, CancellationToken)' cannot implement 'IBuildJobService<TEngine>.GetBuildingEnginesAsync(BuildJobRunnerType, CancellationToken)' because it does not have the matching return type of 'Task<IReadOnlyList<ITrainingEngine>>'.
where TEngine : ITrainingEngine
{
private readonly Dictionary<BuildJobRunnerType, IBuildJobRunner> _runners = runners.ToDictionary(r => r.Type);
private readonly IRepository<TranslationEngine> _engines = engines;
// TODO: make some sort of service to get the engine repos.
protected readonly Dictionary<BuildJobRunnerType, IBuildJobRunner> Runners = runners.ToDictionary(r => r.Type);
protected readonly IRepository<TEngine> Engines = engines;

public Task<bool> IsEngineBuilding(string engineId, CancellationToken cancellationToken = default)
{
return _engines.ExistsAsync(e => e.EngineId == engineId && e.CurrentBuild != null, cancellationToken);
return Engines.ExistsAsync(e => e.EngineId == engineId && e.CurrentBuild != null, cancellationToken);
}

public Task<IReadOnlyList<TranslationEngine>> GetBuildingEnginesAsync(
public Task<IReadOnlyList<TEngine>> GetBuildingEnginesAsync(
BuildJobRunnerType runner,
CancellationToken cancellationToken = default
)
{
return _engines.GetAllAsync(
return Engines.GetAllAsync(
e => e.CurrentBuild != null && e.CurrentBuild.BuildJobRunner == runner,
cancellationToken
);
Expand All @@ -28,7 +30,7 @@ public Task<IReadOnlyList<TranslationEngine>> GetBuildingEnginesAsync(
CancellationToken cancellationToken = default
)
{
TranslationEngine? engine = await _engines.GetAsync(
TEngine? engine = await Engines.GetAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.BuildId == buildId,
cancellationToken
);
Expand All @@ -41,25 +43,25 @@ public async Task CreateEngineAsync(
CancellationToken cancellationToken = default
)
{
foreach (BuildJobRunnerType runnerType in _runners.Keys)
foreach (BuildJobRunnerType runnerType in Runners.Keys)
{
IBuildJobRunner runner = _runners[runnerType];
IBuildJobRunner runner = Runners[runnerType];
await runner.CreateEngineAsync(engineId, name, cancellationToken);
}
}

public async Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default)
{
foreach (BuildJobRunnerType runnerType in _runners.Keys)
foreach (BuildJobRunnerType runnerType in Runners.Keys)
{
IBuildJobRunner runner = _runners[runnerType];
IBuildJobRunner runner = Runners[runnerType];
await runner.DeleteEngineAsync(engineId, cancellationToken);
}
}

public async Task<bool> StartBuildJobAsync(
BuildJobRunnerType runnerType,
TranslationEngineType engineType,
EngineType engineType,
string engineId,
string buildId,
BuildStage stage,
Expand All @@ -68,7 +70,7 @@ public async Task<bool> StartBuildJobAsync(
CancellationToken cancellationToken = default
)
{
IBuildJobRunner runner = _runners[runnerType];
IBuildJobRunner runner = Runners[runnerType];
string jobId = await runner.CreateJobAsync(
engineType,
engineId,
Expand All @@ -80,7 +82,7 @@ public async Task<bool> StartBuildJobAsync(
);
try
{
TranslationEngine? engine = await _engines.UpdateAsync(
TEngine? engine = await Engines.UpdateAsync(
e =>
e.EngineId == engineId
&& (
Expand Down Expand Up @@ -121,39 +123,38 @@ public async Task<bool> StartBuildJobAsync(
}
}

public async Task<(string? BuildId, BuildJobState State)> CancelBuildJobAsync(
public virtual async Task<(string? BuildId, BuildJobState State)> CancelBuildJobAsync(
string engineId,
CancellationToken cancellationToken = default
)
{
// cancel a job that hasn't started yet
TranslationEngine? engine = await _engines.UpdateAsync(
TEngine? engine = await Engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Pending,
u =>
{
u.Unset(b => b.CurrentBuild);
u.Set(e => e.CollectTrainSegmentPairs, false);
},
returnOriginal: true,
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
// job will be deleted from the queue
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];
IBuildJobRunner runner = Runners[engine.CurrentBuild.BuildJobRunner];
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.None);
}

// cancel a job that is already running
engine = await _engines.UpdateAsync(
engine = await Engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Active,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling),
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];
IBuildJobRunner runner = Runners[engine.CurrentBuild.BuildJobRunner];
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.Canceling);
}
Expand All @@ -167,7 +168,7 @@ public async Task<bool> BuildJobStartedAsync(
CancellationToken cancellationToken = default
)
{
TranslationEngine? engine = await _engines.UpdateAsync(
TEngine? engine = await Engines.UpdateAsync(
e =>
e.EngineId == engineId
&& e.CurrentBuild != null
Expand All @@ -179,19 +180,18 @@ public async Task<bool> BuildJobStartedAsync(
return engine is not null;
}

public Task BuildJobFinishedAsync(
public virtual Task BuildJobFinishedAsync(
string engineId,
string buildId,
bool buildComplete,
CancellationToken cancellationToken = default
)
{
return _engines.UpdateAsync(
return Engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.BuildId == buildId,
u =>
{
u.Unset(e => e.CurrentBuild);
u.Set(e => e.CollectTrainSegmentPairs, false);
if (buildComplete)
u.Inc(e => e.BuildRevision);
},
Expand All @@ -201,7 +201,7 @@ public Task BuildJobFinishedAsync(

public Task BuildJobRestartingAsync(string engineId, string buildId, CancellationToken cancellationToken = default)
{
return _engines.UpdateAsync(
return Engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.BuildId == buildId,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending),
cancellationToken: cancellationToken
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ IOptionsMonitor<BuildJobOptions> options
) : IBuildJobRunner
{
private readonly IClearMLService _clearMLService = clearMLService;
private readonly Dictionary<TranslationEngineType, IClearMLBuildJobFactory> _buildJobFactories =
private readonly Dictionary<EngineType, IClearMLBuildJobFactory> _buildJobFactories =
buildJobFactories.ToDictionary(f => f.EngineType);

private readonly Dictionary<TranslationEngineType, ClearMLBuildQueue> _options =
options.CurrentValue.ClearML.ToDictionary(o => o.EngineType);
private readonly Dictionary<EngineType, ClearMLBuildQueue> _options = options.CurrentValue.ClearML.ToDictionary(o =>
o.EngineType
);

public BuildJobRunnerType Type => BuildJobRunnerType.ClearML;

Expand All @@ -32,7 +33,7 @@ public async Task DeleteEngineAsync(string engineId, CancellationToken cancellat
}

public async Task<string> CreateJobAsync(
TranslationEngineType engineType,
EngineType engineType,
string engineId,
string buildId,
BuildStage stage,
Expand Down Expand Up @@ -74,7 +75,7 @@ public Task<bool> DeleteJobAsync(string jobId, CancellationToken cancellationTok

public Task<bool> EnqueueJobAsync(
string jobId,
TranslationEngineType engineType,
EngineType engineType,
CancellationToken cancellationToken = default
)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Serval.Machine.Shared.Services;
using System.Linq;

namespace Serval.Machine.Shared.Services;

public class ClearMLMonitorService(
IServiceProvider services,
Expand Down Expand Up @@ -33,27 +35,37 @@ ILogger<ClearMLMonitorService> logger
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.EngineType, x => 0)
);

public int GetQueueSize<TEnum>(TEnum engineType)
where TEnum : Enum
public int GetQueueSize(EngineType engineType)
{
return _queueSizePerEngineType[engineType.ToString()];
}

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
await MonitorClearMLTasksPerDomain<TranslationEngineType>(scope, cancellationToken);
await MonitorClearMLTasksPerDomain<WordAlignmentEngineType>(scope, cancellationToken);
await MonitorClearMLTasksPerDomain<EngineType>(scope, cancellationToken);
}

private async Task MonitorClearMLTasksPerDomain<TEngine>(IServiceScope scope, CancellationToken cancellationToken)
where TEngine : Enum
{
try
{
var buildJobService = scope.ServiceProvider.GetRequiredService<IBuildJobService>();
IReadOnlyList<TranslationEngine> trainingEngines = await buildJobService.GetBuildingEnginesAsync(
BuildJobRunnerType.ClearML,
cancellationToken
var translationBuildJobService = scope.ServiceProvider.GetRequiredService<
IBuildJobService<TranslationEngine>
>();
var wordAlignmentBuildJobService = scope.ServiceProvider.GetRequiredService<
IBuildJobService<WordAlignmentEngine>
>();

Dictionary<ITrainingEngine, IBuildJobService<ITrainingEngine>> trainingEngines = (
await translationBuildJobService.GetBuildingEnginesAsync(BuildJobRunnerType.ClearML, cancellationToken)
).ToDictionary(e => e, e => translationBuildJobService as IBuildJobService<ITrainingEngine>);

trainingEngines.AddRange(
await wordAlignmentBuildJobService.GetBuildingEnginesAsync(
BuildJobRunnerType.ClearML,
cancellationToken
)
);
if (trainingEngines.Count == 0)
return;
Expand Down Expand Up @@ -245,7 +257,7 @@ private async Task<bool> TrainJobStartedAsync(

private async Task<bool> TrainJobCompletedAsync(
IBuildJobService buildJobService,
TranslationEngineType engineType,
EngineType engineType,
string engineId,
string buildId,
int corpusSize,
Expand Down
Loading

0 comments on commit 3a06a6b

Please sign in to comment.