Skip to content

Commit

Permalink
a machine start
Browse files Browse the repository at this point in the history
  • Loading branch information
johnml1135 committed Oct 15, 2024
1 parent f72b568 commit 82c30fe
Show file tree
Hide file tree
Showing 14 changed files with 493 additions and 26 deletions.
15 changes: 12 additions & 3 deletions src/Machine/src/Serval.Machine.EngineServer/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,27 @@
"SmtTransfer",
"Nmt"
],
"WordAlignmentEngines": [
"Statistical"
],
"BuildJob": {
"ClearML": [
{
"TranslationEngineType": "Nmt",
"EngineType": "Nmt",
"ModelType": "huggingface",
"Queue": "jobs_backlog",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
},
{
"TranslationEngineType": "SmtTransfer",
"EngineType": "SmtTransfer",
"ModelType": "thot",
"Queue": "jobs_backlog.cpu_only",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
},
{
"EngineType": "Statistical",
"ModelType": "thot",
"Queue": "cpu_only",
"Queue": "jobs_backlog.cpu_only",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
}
]
Expand Down
13 changes: 11 additions & 2 deletions src/Machine/src/Serval.Machine.JobServer/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,25 @@
"SmtTransfer",
"Nmt"
],
"WordAlignmentEngines": [
"Statistical"
],
"BuildJob": {
"ClearML": [
{
"TranslationEngineType": "Nmt",
"EngineType": "Nmt",
"ModelType": "huggingface",
"Queue": "jobs_backlog",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
},
{
"TranslationEngineType": "SmtTransfer",
"EngineType": "SmtTransfer",
"ModelType": "thot",
"Queue": "jobs_backlog.cpu_only",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
},
{
"EngineType": "Statistical",
"ModelType": "thot",
"Queue": "jobs_backlog.cpu_only",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class ClearMLBuildQueue
{
public TranslationEngineType TranslationEngineType { get; set; }
public string EngineType { get; set; } = "";
public string ModelType { get; set; } = "";
public string Queue { get; set; } = "default";
public string DockerImage { get; set; } = "";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Serval.Machine.Shared.Models;

public record WordAlignmentEngine : IEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
public required string EngineId { get; init; }
public required WordAlignmentEngineType Type { get; init; }
public required string SourceLanguage { get; init; }
public required string TargetLanguage { get; init; }
public int BuildRevision { get; init; }
public Build? CurrentBuild { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ IOptionsMonitor<BuildJobOptions> options
buildJobFactories.ToDictionary(f => f.EngineType);

private readonly Dictionary<TranslationEngineType, ClearMLBuildQueue> _options =
options.CurrentValue.ClearML.ToDictionary(o => o.TranslationEngineType);
options.CurrentValue.ClearML.ToDictionary(o => o.EngineType);

public BuildJobRunnerType Type => BuildJobRunnerType.ClearML;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,27 @@ ILogger<ClearMLMonitorService> logger
private readonly ILogger<IClearMLQueueService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

private readonly IReadOnlyDictionary<TranslationEngineType, string> _queuePerEngineType =
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.TranslationEngineType, x => x.Queue);
private readonly IReadOnlyDictionary<string, string> _queuePerEngineType =
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.EngineType, x => x.Queue);

private readonly IDictionary<TranslationEngineType, int> _queueSizePerEngineType = new ConcurrentDictionary<
TranslationEngineType,
int
>(buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.TranslationEngineType, x => 0));
private readonly IDictionary<string, int> _queueSizePerEngineType = new ConcurrentDictionary<string, int>(
buildJobOptions.CurrentValue.ClearML.ToDictionary(x => x.EngineType, x => 0)
);

public int GetQueueSize(TranslationEngineType engineType)
public int GetQueueSize<TEnum>(TEnum engineType)
where TEnum : Enum
{
return _queueSizePerEngineType[engineType];
return _queueSizePerEngineType[engineType.ToString()];
}

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
await MonitorClearMLTasksPerDomain<TranslationEngineType>(scope, cancellationToken);
await MonitorClearMLTasksPerDomain<WordAlignmentEngineType>(scope, cancellationToken);
}

private async Task MonitorClearMLTasksPerDomain<TEngine>(IServiceScope scope, CancellationToken cancellationToken)
where TEngine : Enum
{
try
{
Expand All @@ -57,13 +64,13 @@ await _clearMLService.GetTasksByIdAsync(
cancellationToken
)
).ToDictionary(t => t.Id);
Dictionary<TranslationEngineType, Dictionary<string, int>> queuePositionsPerEngineType = new();
Dictionary<string, Dictionary<string, int>> queuePositionsPerEngineType = new();

foreach ((TranslationEngineType engineType, string queueName) in _queuePerEngineType)
foreach ((string engineType, string queueName) in _queuePerEngineType)
{
var tasksPerEngineType = tasks
.Where(kvp =>
trainingEngines.Where(te => te.CurrentBuild?.JobId == kvp.Key).FirstOrDefault()?.Type
trainingEngines.Where(te => te.CurrentBuild?.JobId == kvp.Key).FirstOrDefault()?.Type.ToString()
== engineType
)
.Select(kvp => kvp.Value)
Expand Down Expand Up @@ -96,7 +103,7 @@ await UpdateTrainJobStatus(
engine.CurrentBuild.BuildId,
new ProgressStatus(step: 0, percentCompleted: 0.0),
//CurrentBuild.BuildId should always equal the corresponding task.Name
queuePositionsPerEngineType[engine.Type][engine.CurrentBuild.BuildId] + 1,
queuePositionsPerEngineType[engine.Type.ToString()][engine.CurrentBuild.BuildId] + 1,
cancellationToken
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Serval.Machine.Shared.Services;

public interface IWordAlignmentEngineService
{
WordAlignmentEngineType WordAlignmentEngine { get; }

Task<WordAlignmentEngine> CreateAsync(
string engineId,
string? engineName,
string sourceLanguage,
string targetLanguage,
bool? isModelPersisted = null,
CancellationToken cancellationToken = default
);
Task DeleteAsync(string engineId, CancellationToken cancellationToken = default);

Task<TranslationResult> GetBestPhraseAlignmentAsync(
string sourceSegment,
string targetSegment,
CancellationToken cancellationToken = default
);

Task StartBuildAsync(
string engineId,
string buildId,
string? buildOptions,
IReadOnlyList<ParallelCorpus> corpora,
CancellationToken cancellationToken = default
);

Task CancelBuildAsync(string engineId, CancellationToken cancellationToken = default);

int GetQueueSize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
using Google.Protobuf.WellKnownTypes;
using Serval.WordAlignment.V1;

namespace Serval.Machine.Shared.Services;

public class ServalWordAlignmentEngineServiceV1(IEnumerable<IWordAlignmentEngineService> engineServices)
: WordAlignmentEngineApi.WordAlignmentEngineApiBase
{
private static readonly Empty Empty = new();

private readonly Dictionary<WordAlignmentEngineType, IWordAlignmentEngineService> _engineServices =
engineServices.ToDictionary(es => es.WordAlignmentEngine);

public override async Task<Empty> Create(CreateRequest request, ServerCallContext context)
{
IWordAlignmentEngineService engineService = GetEngineService(request.EngineType);
await engineService.CreateAsync(
request.EngineId,
request.HasEngineName ? request.EngineName : null,
request.SourceLanguage,
request.TargetLanguage,
isModelPersisted: true,
cancellationToken: context.CancellationToken
);
return Empty;
}

public override async Task<Empty> Delete(DeleteRequest request, ServerCallContext context)
{
IWordAlignmentEngineService engineService = GetEngineService(request.EngineType);
await engineService.DeleteAsync(request.EngineId, context.CancellationToken);
return Empty;
}

public override async Task<GetWordAlignmentResponse> GetWordAlignment(
GetWordAlignmentRequest request,
ServerCallContext context
)
{
IWordAlignmentEngineService engineService = GetEngineService(request.EngineType);
TranslationResult result;
try
{
result = await engineService.GetBestPhraseAlignmentAsync(
request.SourceSegment,
request.TargetSegment,
context.CancellationToken
);
}
catch (EngineNotBuiltException e)
{
throw new RpcException(new Status(StatusCode.Aborted, e.Message, e));
}

return new GetWordAlignmentResponse { Result = Map(result) };
}

public override async Task<Empty> StartBuild(StartBuildRequest request, ServerCallContext context)
{
IWordAlignmentEngineService engineService = GetEngineService(request.EngineType);
Models.ParallelCorpus[] corpora = request.Corpora.Select(Map).ToArray();
try
{
await engineService.StartBuildAsync(
request.EngineId,
request.BuildId,
request.HasOptions ? request.Options : null,
corpora,
context.CancellationToken
);
}
catch (InvalidOperationException e)
{
throw new RpcException(new Status(StatusCode.Aborted, e.Message, e));
}
return Empty;
}

public override async Task<Empty> CancelBuild(CancelBuildRequest request, ServerCallContext context)
{
IWordAlignmentEngineService engineService = GetEngineService(request.EngineType);
try
{
await engineService.CancelBuildAsync(request.EngineId, context.CancellationToken);
}
catch (InvalidOperationException e)
{
throw new RpcException(new Status(StatusCode.Aborted, e.Message, e));
}
return Empty;
}

public override Task<GetQueueSizeResponse> GetQueueSize(GetQueueSizeRequest request, ServerCallContext context)
{
IWordAlignmentEngineService engineService = GetEngineService(request.EngineType);
return Task.FromResult(new GetQueueSizeResponse { Size = engineService.GetQueueSize() });
}

private IWordAlignmentEngineService GetEngineService(string engineTypeStr)
{
if (_engineServices.TryGetValue(GetEngineType(engineTypeStr), out IWordAlignmentEngineService? service))
return service;
throw new RpcException(new Status(StatusCode.InvalidArgument, "The engine type is invalid."));
}

private static WordAlignmentEngineType GetEngineType(string engineTypeStr)
{
engineTypeStr = engineTypeStr[0].ToString().ToUpperInvariant() + engineTypeStr[1..];
if (System.Enum.TryParse(engineTypeStr, out WordAlignmentEngineType engineType))
return engineType;
throw new RpcException(new Status(StatusCode.InvalidArgument, "The engine type is invalid."));
}

private static WordAlignmentResult Map(TranslationResult source)
{
return new WordAlignmentResult
{
SourceTokens = { source.SourceTokens },
TargetTokens = { source.TargetTokens },
Confidences = { source.Confidences },
Alignment = { Map(source.Alignment) },
};
}

private static IEnumerable<WordAlignment.V1.AlignedWordPair> Map(WordAlignmentMatrix source)
{
for (int i = 0; i < source.RowCount; i++)
{
for (int j = 0; j < source.ColumnCount; j++)
{
if (source[i, j])
yield return new WordAlignment.V1.AlignedWordPair { SourceIndex = i, TargetIndex = j };
}
}
}

private static Models.ParallelCorpus Map(WordAlignment.V1.ParallelCorpus source)
{
return new Models.ParallelCorpus
{
Id = source.Id,
SourceCorpora = source.SourceCorpora.Select(Map).ToList(),
TargetCorpora = source.TargetCorpora.Select(Map).ToList()
};
}

private static Models.MonolingualCorpus Map(WordAlignment.V1.MonolingualCorpus source)
{
var trainOnChapters = source.TrainOnChapters.ToDictionary(
kvp => kvp.Key,
kvp => kvp.Value.Chapters.ToHashSet()
);
var trainOnTextIds = source.TrainOnTextIds.ToHashSet();
FilterChoice trainingFilter = GetFilterChoice(trainOnChapters, trainOnTextIds);

var pretranslateChapters = source.WordAlignOnChapters.ToDictionary(
kvp => kvp.Key,
kvp => kvp.Value.Chapters.ToHashSet()
);
var pretranslateTextIds = source.WordAlignOnTextIds.ToHashSet();
FilterChoice pretranslateFilter = GetFilterChoice(pretranslateChapters, pretranslateTextIds);

return new Models.MonolingualCorpus
{
Id = source.Id,
Language = source.Language,
Files = source.Files.Select(Map).ToList(),
TrainOnChapters = trainingFilter == FilterChoice.Chapters ? trainOnChapters : null,
TrainOnTextIds = trainingFilter == FilterChoice.TextIds ? trainOnTextIds : null,
PretranslateChapters = pretranslateFilter == FilterChoice.Chapters ? pretranslateChapters : null,
PretranslateTextIds = pretranslateFilter == FilterChoice.TextIds ? pretranslateTextIds : null
};
}

private static Models.CorpusFile Map(WordAlignment.V1.CorpusFile source)
{
return new Models.CorpusFile
{
Location = source.Location,
Format = (Models.FileFormat)source.Format,
TextId = source.TextId
};
}

private enum FilterChoice
{
Chapters,
TextIds,
None
}

private static FilterChoice GetFilterChoice(
IReadOnlyDictionary<string, HashSet<int>> chapters,
HashSet<string> textIds
)
{
// Only either textIds or Scripture Range will be used at a time
// TextIds may be an empty array, so prefer that if both are empty (which applies to both scripture and text)
if (chapters is null && textIds is null)
return FilterChoice.None;
if (chapters is null || chapters.Count == 0)
return FilterChoice.TextIds;
return FilterChoice.Chapters;
}
}
Loading

0 comments on commit 82c30fe

Please sign in to comment.