Skip to content

Commit

Permalink
More intermediate work
Browse files Browse the repository at this point in the history
  • Loading branch information
johnml1135 committed Oct 8, 2024
1 parent 12c97c9 commit 12a40bd
Show file tree
Hide file tree
Showing 27 changed files with 232 additions and 394 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Serval.Assessment.V1;
using Serval.Health.V1;
using Serval.Health.V1;

namespace Microsoft.Extensions.DependencyInjection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ CancellationToken cancellationToken
[ProducesResponseType(typeof(void), StatusCodes.Status403Forbidden)]
[ProducesResponseType(typeof(void), StatusCodes.Status404NotFound)]
[ProducesResponseType(typeof(void), StatusCodes.Status503ServiceUnavailable)]
public async Task<ActionResult<AssessmentJobDto>> StartJobAsync(
public async Task<ActionResult<AssessmentJobDto>> StartBuildAsync(
[NotNull] string id,
[FromBody] AssessmentJobConfigDto jobConfig,
CancellationToken cancellationToken
Expand All @@ -359,7 +359,7 @@ CancellationToken cancellationToken
AssessmentEngine engine = await _engineService.GetAsync(id, cancellationToken);
await AuthorizeAsync(engine);
AssessmentBuild job = Map(engine, jobConfig);
await _engineService.StartJobAsync(job, cancellationToken);
await _engineService.StartBuildAsync(job, cancellationToken);

AssessmentJobDto dto = Map(job);
return Created(dto.Url, dto);
Expand Down Expand Up @@ -417,14 +417,14 @@ CancellationToken cancellationToken
[ProducesResponseType(typeof(void), StatusCodes.Status404NotFound)]
[ProducesResponseType(typeof(void), StatusCodes.Status405MethodNotAllowed)]
[ProducesResponseType(typeof(void), StatusCodes.Status503ServiceUnavailable)]
public async Task<ActionResult> CancelJobAsync(
public async Task<ActionResult> CancelBuildAsync(
[NotNull] string id,
[NotNull] string jobId,
CancellationToken cancellationToken
)
{
await AuthorizeAsync(id, cancellationToken);
if (!await _engineService.CancelJobAsync(id, jobId, cancellationToken))
if (!await _engineService.CancelBuildAsync(id, jobId, cancellationToken))
return NoContent();
return Ok();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public record AssessmentEngine : IEngine
public required string Owner { get; init; }
public string? Name { get; init; }
public required string Type { get; init; }
public bool IsBuildRunning { get; init; }
public bool IsBuilding { get; init; }
public int BuildRevision { get; init; }
public required Corpus Corpus { get; init; }
public Corpus? ReferenceCorpus { get; init; }
Expand Down
206 changes: 51 additions & 155 deletions src/Serval/src/Serval.Assessment/Services/AssessmentEngineService.cs
Original file line number Diff line number Diff line change
@@ -1,71 +1,74 @@
using Serval.Engine.V1;
using Serval.Assessment.V1;
using Serval.Engine.V1;

namespace Serval.Assessment.Services;

public class AssessmentEngineService(
IRepository<AssessmentEngine> engines,
IRepository<AssessmentBuild> jobs,
IRepository<AssessmentBuild> builds,
IRepository<AssessmentResult> results,
GrpcClientFactory grpcClientFactory,
IOptionsMonitor<DataFileOptions> dataFileOptions,
IDataAccessContext dataAccessContext,
ILoggerFactory loggerFactory,
IScriptureDataFileService scriptureDataFileService
) : OwnedEntityServiceBase<AssessmentEngine>(engines), IAssessmentEngineService
)
: EngineServiceBase<AssessmentEngine, AssessmentBuild>(
engines,
builds,
grpcClientFactory,
dataAccessContext,
loggerFactory
),
IAssessmentEngineService
{
private readonly IRepository<AssessmentBuild> _jobs = jobs;
private readonly IRepository<AssessmentResult> _results = results;
private readonly GrpcClientFactory _grpcClientFactory = grpcClientFactory;
private readonly IOptionsMonitor<DataFileOptions> _dataFileOptions = dataFileOptions;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly ILogger<AssessmentEngineService> _logger = loggerFactory.CreateLogger<AssessmentEngineService>();
private readonly IScriptureDataFileService _scriptureDataFileService = scriptureDataFileService;

public override async Task<AssessmentEngine> CreateAsync(
AssessmentEngine engine,
CancellationToken cancellationToken = default
)
{
try
{
await Entities.InsertAsync(engine, cancellationToken);
var client = _grpcClientFactory.CreateClient<AssessmentEngineApi.AssessmentEngineApiClient>(engine.Type);
if (client is null)
throw new InvalidOperationException($"'{engine.Type}' is an invalid engine type.");
var request = new CreateRequest { EngineType = engine.Type, EngineId = engine.Id, };
if (engine.Name is not null)
request.EngineName = engine.Name;
await client.CreateAsync(request, cancellationToken: cancellationToken);
}
catch
{
await Entities.DeleteAsync(engine, CancellationToken.None);
throw;
}
return engine;
}

public override async Task DeleteAsync(string id, CancellationToken cancellationToken = default)
public async Task StartBuildAsync(AssessmentBuild build, CancellationToken cancellationToken = default)
{
AssessmentEngine? engine = await Entities.GetAsync(id, cancellationToken);
AssessmentEngine engine = await GetAsync(build.EngineRef, cancellationToken);
if (engine is null)
throw new EntityNotFoundException($"Could not find the AssessmentEngine '{id}'.");
throw new EntityNotFoundException($"Could not find the Engine '{build.EngineRef}'.");

var client = _grpcClientFactory.CreateClient<AssessmentEngineApi.AssessmentEngineApiClient>(engine.Type);
await client.DeleteAsync(
new DeleteRequest { EngineType = engine.Type, EngineId = engine.Id },
cancellationToken: cancellationToken
);
IReadOnlyList<string> includeTextIds = build.TextIds ?? new List<string>();
IList<Engine.V1.CorpusFile> files = engine.Corpus.Files.Select(Map).ToList();
IDictionary<string, ScriptureChapters> includeChapters;

await _dataAccessContext.WithTransactionAsync(
async (ct) =>
if (build.ScriptureRange is not null)
{
if (engine.Corpus.Files.Count > 1 || engine.Corpus.Files[0].Format != Shared.Contracts.FileFormat.Paratext)
{
await Entities.DeleteAsync(id, ct);
await _jobs.DeleteAllAsync(b => b.EngineRef == id, ct);
await _results.DeleteAllAsync(r => r.EngineRef == id, ct);
},
CancellationToken.None
);
throw new InvalidOperationException($"The engine is not compatible with using a scripture range.");
}

try
{
ScrVers versification = _scriptureDataFileService
.GetParatextProjectSettings(files[0].Location)
.Versification;
includeChapters = ScriptureRangeParser
.GetChapters(build.ScriptureRange, versification)
.ToDictionary(kvp => kvp.Key, kvp => new ScriptureChapters { Chapters = { kvp.Value } });
}
catch (ArgumentException ae)
{
throw new InvalidOperationException(
$"The scripture range {build.ScriptureRange} is not valid: {ae.Message}"
);
}
}

AssessmentCorpus corpus = new AssessmentCorpus
{
Language = engine.Corpus.Language,
IncludeAll = build.TextIds is null || build.TextIds.Count == 0,
IncludeTextIds = includeTextIds,
IncludeChapters = includeChapters,
Files = { engine.Corpus.Files.Select(Map) },
};

await StartBuildAsync(build, JsonSerializer.Serialize(corpus), build.Options, cancellationToken);
}

public async Task<Shared.Models.Corpus> ReplaceCorpusAsync(
Expand Down Expand Up @@ -100,113 +103,6 @@ await _dataAccessContext.WithTransactionAsync(
return engine.ReferenceCorpus!;
}

public async Task StartJobAsync(AssessmentBuild job, CancellationToken cancellationToken = default)
{
AssessmentEngine engine = await GetAsync(job.EngineRef, cancellationToken);
await _jobs.InsertAsync(job, cancellationToken);

try
{
AssessmentEngineApi.AssessmentEngineApiClient client =
_grpcClientFactory.CreateClient<AssessmentEngineApi.AssessmentEngineApiClient>(engine.Type);
var request = new StartJobRequest
{
EngineType = engine.Type,
EngineId = engine.Id,
JobId = job.Id,
Options = JsonSerializer.Serialize(job.Options),
Corpus = Map(engine.Corpus),
IncludeAll = job.TextIds is null || job.TextIds.Count == 0
};
if (engine.ReferenceCorpus is not null)
request.ReferenceCorpus = Map(engine.ReferenceCorpus);
if (job.TextIds is not null)
request.IncludeTextIds.Add(job.TextIds);
if (job.ScriptureRange is not null)
{
if (
engine.Corpus.Files.Count > 1
|| engine.Corpus.Files[0].Format != Shared.Contracts.FileFormat.Paratext
)
{
throw new InvalidOperationException($"The engine is not compatible with using a scripture range.");
}

try
{
ScrVers versification = _scriptureDataFileService
.GetParatextProjectSettings(request.Corpus.Files[0].Location)
.Versification;
Dictionary<string, ScriptureChapters> chapters = ScriptureRangeParser
.GetChapters(job.ScriptureRange, versification)
.ToDictionary(kvp => kvp.Key, kvp => new ScriptureChapters { Chapters = { kvp.Value } });
request.IncludeChapters.Add(chapters);
}
catch (ArgumentException ae)
{
throw new InvalidOperationException(
$"The scripture range {job.ScriptureRange} is not valid: {ae.Message}"
);
}
}

// Log the job request summary
try
{
var jobRequestSummary = (JsonObject)JsonNode.Parse(JsonSerializer.Serialize(request))!;
// correct job options parsing
jobRequestSummary.Remove("Options");
try
{
jobRequestSummary.Add("Options", JsonNode.Parse(request.Options));
}
catch (JsonException)
{
jobRequestSummary.Add("Options", "Job \"Options\" failed parsing: " + (request.Options ?? "null"));
}
jobRequestSummary.Add("Event", "JobRequest");
jobRequestSummary.Add("ClientId", engine.Owner);
_logger.LogInformation("{request}", jobRequestSummary.ToJsonString());
}
catch (JsonException)
{
_logger.LogInformation("Error parsing job request summary.");
_logger.LogInformation("{request}", JsonSerializer.Serialize(request));
}

await client.StartJobAsync(request, cancellationToken: cancellationToken);
}
catch
{
await _jobs.DeleteAsync(job, CancellationToken.None);
throw;
}
}

public async Task<bool> CancelJobAsync(string id, string jobId, CancellationToken cancellationToken = default)
{
AssessmentEngine? engine = await GetAsync(id, cancellationToken);
if (engine is null)
throw new EntityNotFoundException($"Could not find the AssessmentEngine '{id}'.");

AssessmentEngineApi.AssessmentEngineApiClient client =
_grpcClientFactory.CreateClient<AssessmentEngineApi.AssessmentEngineApiClient>(engine.Type);
try
{
await client.CancelJobAsync(
new CancelJobRequest { EngineType = engine.Type, EngineId = engine.Id, },
cancellationToken: cancellationToken
);
}
catch (RpcException re)
{
if (re.StatusCode is StatusCode.Aborted)
return false;
throw;
}
return true;
}

public Task RemoveDataFileFromAllCorporaAsync(string dataFileId, CancellationToken cancellationToken = default)
{
return Entities.UpdateAllAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,42 @@
namespace Serval.Assessment.Services;

public class AssessmentPlatformServiceV1(
IRepository<AssessmentBuild> jobs,
IRepository<AssessmentBuild> builds,
IRepository<AssessmentEngine> engines,
IRepository<AssessmentResult> results,
IDataAccessContext dataAccessContext,
IPublishEndpoint publishEndpoint
)
: EnginePlatformServiceBaseV1<AssessmentBuild, AssessmentEngine, AssessmentResult>(
jobs,
builds,
engines,
results,
dataAccessContext,
publishEndpoint
)
{
protected override async Task<AssessmentEngine?> UpdateEngineAfterJobCompleted(
protected override async Task<AssessmentEngine?> UpdateEngineAfterBuildCompleted(
AssessmentBuild build,
string engineId,
JobCompletedRequest request,
BuildCompletedRequest request,
CancellationToken ct
)
{
var parameters = JsonSerializer.Deserialize<AssessmentEngineCompletedStatistics>(request.StatisticsSerialized)!;
return await Engines.UpdateAsync(
engineId,
u => u.Set(e => e.IsBuildRunning, false).Inc(e => e.BuildRevision),
u => u.Set(e => e.IsBuilding, false).Inc(e => e.BuildRevision),
cancellationToken: ct
);
}

protected override AssessmentResult CreateResultFromRequest(InsertResultsRequest request, int nextJobRevision)
protected override AssessmentResult CreateResultFromRequest(InsertResultsRequest request, int nextBuildRevision)
{
var content = JsonSerializer.Deserialize<AssessmentResultContent>(request.ContentSerialized)!;
return new AssessmentResult
{
EngineRef = request.EngineId,
BuildRevision = nextJobRevision,
BuildRevision = nextBuildRevision,
TextId = content.TextId,
Ref = content.Ref,
Score = content.Score,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ public interface IAssessmentEngineService : IEngineServiceBase

Task<AssessmentEngine> CreateAsync(AssessmentEngine engine, CancellationToken cancellationToken = default);
Task DeleteAsync(string engineId, CancellationToken cancellationToken = default);
Task StartJobAsync(AssessmentBuild build, CancellationToken cancellationToken = default);
Task<bool> CancelJobAsync(string engineId, string jobId, CancellationToken cancellationToken = default);
Task StartBuildAsync(AssessmentBuild build, CancellationToken cancellationToken = default);
Task<bool> CancelBuildAsync(string engineId, CancellationToken cancellationToken = default);
Task<Corpus> ReplaceCorpusAsync(string id, Corpus corpus, CancellationToken cancellationToken = default);
Task<Corpus> ReplaceReferenceCorpusAsync(
string id,
Expand Down
2 changes: 1 addition & 1 deletion src/Serval/src/Serval.Assessment/Services/ResultService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public async Task<IEnumerable<AssessmentResult>> GetAllAsync(
)
{
return await Entities.GetAllAsync(
r => r.EngineRef == engineId && r.JobRef == jobId && (textId == null || r.TextId == textId),
r => r.EngineRef == engineId && r.BuildRevision == jobId && (textId == null || r.TextId == textId),
cancellationToken
);
}
Expand Down
Loading

0 comments on commit 12a40bd

Please sign in to comment.