Skip to content

Commit

Permalink
Merge branch 'main' into build_summary_gprc
Browse files Browse the repository at this point in the history
  • Loading branch information
mudiagaobrikisil authored Dec 5, 2024
2 parents 264dfb1 + 6d22621 commit 6551cf6
Show file tree
Hide file tree
Showing 15 changed files with 299 additions and 29 deletions.
2 changes: 1 addition & 1 deletion deploy/serval/templates/alert-manager-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
from: {{ .Values.alertEmail }}
requireTLS: true
sendResolved: true
smarthost: mail.languagetechnology.org:587
smarthost: smtppro.zoho.com:587
tlsConfig: {}
to: {{ .Values.emailsToAlert }}
name: alert-serval
Expand Down
7 changes: 7 additions & 0 deletions src/Serval/src/Serval.Shared/Models/IInitializableEntity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Shared.Models;

public interface IInitializableEntity : IEntity
{
bool? IsInitialized { get; set; }
DateTime? DateCreated { get; set; }
}
2 changes: 1 addition & 1 deletion src/Serval/src/Serval.Shared/Services/EntityServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public abstract class EntityServiceBase<T>(IRepository<T> entities)
{
protected IRepository<T> Entities { get; } = entities;

public async Task<T> GetAsync(string id, CancellationToken cancellationToken = default)
public virtual async Task<T> GetAsync(string id, CancellationToken cancellationToken = default)
{
T? entity = await Entities.GetAsync(id, cancellationToken);
if (entity is null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Microsoft.Extensions.DependencyInjection;
using SIL.ServiceToolkit.Services;

namespace Serval.Shared.Services;

public abstract class UninitializedCleanupService<T>(
IServiceProvider services,
ILogger<UninitializedCleanupService<T>> logger,
TimeSpan? timeout = null
) : RecurrentTask($"{typeof(T)} Cleanup Service", services, RefreshPeriod, logger)
where T : IInitializableEntity
{
private readonly ILogger<UninitializedCleanupService<T>> _logger = logger;
private readonly TimeSpan _timeout = timeout ?? TimeSpan.FromMinutes(2);
private static readonly TimeSpan RefreshPeriod = TimeSpan.FromDays(1);

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
_logger.LogInformation("Running build cleanup job");
var entities = scope.ServiceProvider.GetRequiredService<IRepository<T>>();
await CheckEntitiesAsync(entities, cancellationToken);
}

public async Task CheckEntitiesAsync(IRepository<T> entities, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
IEnumerable<T> uninitializedEntities = await entities.GetAllAsync(
e =>
e.DateCreated != null
&& e.DateCreated < now - _timeout
&& e.IsInitialized != null
&& !e.IsInitialized.Value,
cancellationToken
);

foreach (T entity in uninitializedEntities)
{
_logger.LogInformation(
"Deleting {type} {id} because it was never successfully initialized.",
typeof(T),
entity.Id
);
await DeleteEntityAsync(entities, entity, cancellationToken);
}
}

protected virtual async Task DeleteEntityAsync(
IRepository<T> entities,
T entity,
CancellationToken cancellationToken
)
{
await entities.DeleteAsync(e => e.Id == entity.Id, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ this IMongoDataAccessConfigurator configurator
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Engine>(Builders<Engine>.IndexKeys.Ascending(e => e.Owner))
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Engine>(Builders<Engine>.IndexKeys.Ascending(e => e.DateCreated))
);
}
);
configurator.AddRepository<Build>(
"translation.builds",
init: c =>
c.Indexes.CreateOrUpdateAsync(
init: async c =>
{
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Build>(Builders<Build>.IndexKeys.Ascending(b => b.EngineRef))
)
);
await c.Indexes.CreateOrUpdateAsync(
new CreateIndexModel<Build>(Builders<Build>.IndexKeys.Ascending(b => b.DateCreated))
);
}
);
configurator.AddRepository<Pretranslation>(
"translation.pretranslations",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public static IServalBuilder AddTranslation(this IServalBuilder builder)
builder.Services.AddScoped<IPretranslationService, PretranslationService>();
builder.Services.AddScoped<IEngineService, EngineService>();

builder.Services.AddSingleton<EngineCleanupService>();
builder.Services.AddSingleton<BuildCleanupService>();

var translationOptions = new TranslationOptions();
builder.Configuration.GetSection(TranslationOptions.Key).Bind(translationOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,8 @@ private Engine Map(TranslationEngineConfigDto source)
Type = source.Type.ToPascalCase(),
Owner = Owner,
Corpora = [],
IsModelPersisted = source.IsModelPersisted
IsModelPersisted = source.IsModelPersisted,
IsInitialized = false
};
}

Expand All @@ -1333,7 +1334,8 @@ private static Build Map(Engine engine, TranslationBuildConfigDto source, string
Pretranslate = Map(engine, source.Pretranslate),
TrainOn = Map(engine, source.TrainOn),
Options = Map(source.Options),
DeploymentVersion = deploymentVersion
DeploymentVersion = deploymentVersion,
IsInitialized = false
};
}

Expand Down
5 changes: 3 additions & 2 deletions src/Serval/src/Serval.Translation/Models/Build.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Serval.Translation.Models;

public record Build : IEntity
public record Build : IInitializableEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
Expand All @@ -13,9 +13,10 @@ public record Build : IEntity
public string? Message { get; init; }
public int? QueueDepth { get; init; }
public JobState State { get; init; } = JobState.Pending;
public DateTime DateCreated { get; init; } = DateTime.UtcNow;
public DateTime? DateFinished { get; init; }
public IReadOnlyDictionary<string, object>? Options { get; init; }
public string? DeploymentVersion { get; init; }
public IReadOnlyDictionary<string, string> ExecutionData { get; init; } = new Dictionary<string, string>();
public bool? IsInitialized { get; set; }
public DateTime? DateCreated { get; set; }
}
4 changes: 3 additions & 1 deletion src/Serval/src/Serval.Translation/Models/Engine.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Serval.Translation.Models;

public record Engine : IOwnedEntity
public record Engine : IOwnedEntity, IInitializableEntity
{
public string Id { get; set; } = "";
public int Revision { get; set; } = 1;
Expand All @@ -16,4 +16,6 @@ public record Engine : IOwnedEntity
public int ModelRevision { get; init; }
public double Confidence { get; init; }
public int CorpusSize { get; init; }
public bool? IsInitialized { get; set; }
public DateTime? DateCreated { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Services;

public class BuildCleanupService(
IServiceProvider services,
ILogger<BuildCleanupService> logger,
TimeSpan? timeout = null
) : UninitializedCleanupService<Build>(services, logger, timeout) { }
32 changes: 28 additions & 4 deletions src/Serval/src/Serval.Translation/Services/BuildService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,30 @@ public class BuildService(IRepository<Build> builds) : EntityServiceBase<Build>(
{
public async Task<IEnumerable<Build>> GetAllAsync(string parentId, CancellationToken cancellationToken = default)
{
return await Entities.GetAllAsync(e => e.EngineRef == parentId, cancellationToken);
return await Entities.GetAllAsync(
e => e.EngineRef == parentId && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
}

public override async Task<Build> GetAsync(string id, CancellationToken cancellationToken = default)
{
Build? build = await Entities.GetAsync(
e => e.Id == id && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
if (build == null)
throw new EntityNotFoundException($"Could not find the {typeof(Build).Name} '{id}'.");
return build;
}

public Task<Build?> GetActiveAsync(string parentId, CancellationToken cancellationToken = default)
{
return Entities.GetAsync(
b => b.EngineRef == parentId && (b.State == JobState.Active || b.State == JobState.Pending),
b =>
b.EngineRef == parentId
&& (b.IsInitialized == null || b.IsInitialized.Value)
&& (b.State == JobState.Active || b.State == JobState.Pending),
cancellationToken
);
}
Expand All @@ -21,7 +38,11 @@ public Task<EntityChange<Build>> GetNewerRevisionAsync(
CancellationToken cancellationToken = default
)
{
return GetNewerRevisionAsync(e => e.Id == id, minRevision, cancellationToken);
return GetNewerRevisionAsync(
e => e.Id == id && (e.IsInitialized == null || e.IsInitialized.Value),
minRevision,
cancellationToken
);
}

public Task<EntityChange<Build>> GetActiveNewerRevisionAsync(
Expand All @@ -31,7 +52,10 @@ public Task<EntityChange<Build>> GetActiveNewerRevisionAsync(
)
{
return GetNewerRevisionAsync(
b => b.EngineRef == parentId && (b.State == JobState.Active || b.State == JobState.Pending),
b =>
b.EngineRef == parentId
&& (b.IsInitialized == null || b.IsInitialized.Value)
&& (b.State == JobState.Active || b.State == JobState.Pending),
minRevision,
cancellationToken
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Serval.Translation.Services;

public class EngineCleanupService(
IServiceProvider services,
ILogger<EngineCleanupService> logger,
TimeSpan? timeout = null
) : UninitializedCleanupService<Engine>(services, logger, timeout) { }
66 changes: 51 additions & 15 deletions src/Serval/src/Serval.Translation/Services/EngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ IScriptureDataFileService scriptureDataFileService
private readonly ILogger<EngineService> _logger = loggerFactory.CreateLogger<EngineService>();
private readonly IScriptureDataFileService _scriptureDataFileService = scriptureDataFileService;

public override async Task<Engine> GetAsync(string id, CancellationToken cancellationToken = default)
{
Engine engine = await base.GetAsync(id, cancellationToken);
if (!(engine.IsInitialized ?? true))
throw new EntityNotFoundException($"Could not find the {typeof(Engine).Name} '{id}'.");
return engine;
}

public override async Task<IEnumerable<Engine>> GetAllAsync(
string owner,
CancellationToken cancellationToken = default
)
{
return await Entities.GetAllAsync(
e => e.Owner == owner && (e.IsInitialized == null || e.IsInitialized.Value),
cancellationToken
);
}

public async Task<Models.TranslationResult> TranslateAsync(
string engineId,
string segment,
Expand Down Expand Up @@ -120,9 +139,9 @@ await client.TrainSegmentPairAsync(

public override async Task<Engine> CreateAsync(Engine engine, CancellationToken cancellationToken = default)
{
bool updateIsModelPersisted = engine.IsModelPersisted is null;
try
{
engine.DateCreated = DateTime.UtcNow;
await Entities.InsertAsync(engine, cancellationToken);
TranslationEngineApi.TranslationEngineApiClient? client =
_grpcClientFactory.CreateClient<TranslationEngineApi.TranslationEngineApiClient>(engine.Type);
Expand All @@ -146,6 +165,15 @@ public override async Task<Engine> CreateAsync(Engine engine, CancellationToken
{
IsModelPersisted = createResponse.IsModelPersisted
};
await Entities.UpdateAsync(
engine,
u =>
{
u.Set(e => e.IsInitialized, true);
u.Set(e => e.IsModelPersisted, engine.IsModelPersisted);
},
cancellationToken: CancellationToken.None
);
}
catch (RpcException rpcex)
{
Expand All @@ -164,14 +192,6 @@ public override async Task<Engine> CreateAsync(Engine engine, CancellationToken
await Entities.DeleteAsync(engine, CancellationToken.None);
throw;
}
if (updateIsModelPersisted)
{
await Entities.UpdateAsync(
engine,
u => u.Set(e => e.IsModelPersisted, engine.IsModelPersisted),
cancellationToken: cancellationToken
);
}
return engine;
}

Expand Down Expand Up @@ -216,6 +236,7 @@ private Dictionary<string, List<int>> GetChapters(string fileLocation, string sc

public async Task StartBuildAsync(Build build, CancellationToken cancellationToken = default)
{
build.DateCreated = DateTime.UtcNow;
Engine engine = await GetAsync(build.EngineRef, cancellationToken);
await _builds.InsertAsync(build, cancellationToken);

Expand Down Expand Up @@ -325,6 +346,11 @@ pretranslate is null
_logger.LogInformation("{request}", JsonSerializer.Serialize(request));
}
await client.StartBuildAsync(request, cancellationToken: cancellationToken);
await _builds.UpdateAsync(
b => b.Id == build.Id,
u => u.Set(e => e.IsInitialized, true),
cancellationToken: CancellationToken.None
);
}
catch
{
Expand Down Expand Up @@ -382,7 +408,11 @@ public async Task<ModelDownloadUrl> GetModelDownloadUrlAsync(

public Task AddCorpusAsync(string engineId, Models.Corpus corpus, CancellationToken cancellationToken = default)
{
return Entities.UpdateAsync(engineId, u => u.Add(e => e.Corpora, corpus), cancellationToken: cancellationToken);
return Entities.UpdateAsync(
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.Add(e => e.Corpora, corpus),
cancellationToken: cancellationToken
);
}

public async Task<Models.Corpus> UpdateCorpusAsync(
Expand All @@ -394,7 +424,10 @@ public Task AddCorpusAsync(string engineId, Models.Corpus corpus, CancellationTo
)
{
Engine? engine = await Entities.UpdateAsync(
e => e.Id == engineId && e.Corpora.Any(c => c.Id == corpusId),
e =>
e.Id == engineId
&& (e.IsInitialized == null || e.IsInitialized.Value)
&& e.Corpora.Any(c => c.Id == corpusId),
u =>
{
if (sourceFiles is not null)
Expand All @@ -421,7 +454,7 @@ await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
originalEngine = await Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.RemoveAll(e => e.Corpora, c => c.Id == corpusId),
returnOriginal: true,
cancellationToken: ct
Expand Down Expand Up @@ -456,7 +489,7 @@ public Task AddParallelCorpusAsync(
)
{
return Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.Add(e => e.ParallelCorpora, corpus),
cancellationToken: cancellationToken
);
Expand All @@ -471,7 +504,10 @@ public Task AddParallelCorpusAsync(
)
{
Engine? engine = await Entities.UpdateAsync(
e => e.Id == engineId && e.ParallelCorpora.Any(c => c.Id == parallelCorpusId),
e =>
e.Id == engineId
&& (e.IsInitialized == null || e.IsInitialized.Value)
&& e.ParallelCorpora.Any(c => c.Id == parallelCorpusId),
u =>
{
if (sourceCorpora is not null)
Expand Down Expand Up @@ -502,7 +538,7 @@ await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
originalEngine = await Entities.UpdateAsync(
engineId,
e => e.Id == engineId && (e.IsInitialized == null || e.IsInitialized.Value),
u => u.RemoveAll(e => e.ParallelCorpora, c => c.Id == parallelCorpusId),
returnOriginal: true,
cancellationToken: ct
Expand Down
Loading

0 comments on commit 6551cf6

Please sign in to comment.