Skip to content

Commit

Permalink
DRAFT: Only use locking when accessing SMT model
Browse files (browse the repository at this point in the history)
  • Loading branch information
ddaspit committed Sep 7, 2024
1 parent 1d4032d commit 72f753e
Show file tree
Hide file tree
Showing 38 changed files with 704 additions and 845 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ public class BuildJobOptions
public const string Key = "BuildJob";

public IList<ClearMLBuildQueue> ClearML { get; set; } = new List<ClearMLBuildQueue>();
public TimeSpan PostProcessLockLifetime { get; set; } = TimeSpan.FromSeconds(120);
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ public static IMachineBuilder AddServalTranslationEngineService(
{
options.Interceptors.Add<CancellationInterceptor>();
options.Interceptors.Add<UnimplementedInterceptor>();
options.Interceptors.Add<TimeoutInterceptor>();
});
builder.AddServalPlatformService(connectionString);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public class SmtTransferEngineOptions
public string EnginesDir { get; set; } = "translation_engines";
public TimeSpan EngineCommitFrequency { get; set; } = TimeSpan.FromMinutes(5);
public TimeSpan InactiveEngineTimeout { get; set; } = TimeSpan.FromMinutes(10);
public TimeSpan SaveModelTimeout { get; set; } = TimeSpan.FromMinutes(5);
public TimeSpan EngineCommitTimeout { get; set; } = TimeSpan.FromMinutes(2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public record TranslationEngine : IEntity
public required bool IsModelPersisted { get; init; }
public int BuildRevision { get; init; }
public Build? CurrentBuild { get; init; }
public bool? CollectTrainSegmentPairs { get; init; }
}
91 changes: 45 additions & 46 deletions src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public async Task DeleteEngineAsync(string engineId, CancellationToken cancellat

public async Task<bool> StartBuildJobAsync(
BuildJobRunnerType runnerType,
TranslationEngineType engineType,
string engineId,
string buildId,
BuildStage stage,
Expand All @@ -67,18 +68,9 @@ public async Task<bool> StartBuildJobAsync(
CancellationToken cancellationToken = default
)
{
TranslationEngine? engine = await _engines.GetAsync(
e =>
e.EngineId == engineId
&& (e.CurrentBuild == null || e.CurrentBuild.JobState != BuildJobState.Canceling),
cancellationToken
);
if (engine is null)
return false;

IBuildJobRunner runner = _runners[runnerType];
string jobId = await runner.CreateJobAsync(
engine.Type,
engineType,
engineId,
buildId,
stage,
Expand All @@ -88,8 +80,17 @@ public async Task<bool> StartBuildJobAsync(
);
try
{
await _engines.UpdateAsync(
e => e.EngineId == engineId,
TranslationEngine? engine = await _engines.UpdateAsync(
e =>
e.EngineId == engineId
&& (
(stage == BuildStage.Preprocess && e.CurrentBuild == null)
|| (
stage != BuildStage.Preprocess
&& e.CurrentBuild != null
&& e.CurrentBuild.JobState != BuildJobState.Canceling
)
),
u =>
u.Set(
e => e.CurrentBuild,
Expand All @@ -105,6 +106,11 @@ await _engines.UpdateAsync(
),
cancellationToken: cancellationToken
);
if (engine is null)
{
await runner.DeleteJobAsync(jobId, CancellationToken.None);
return false;
}
await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken);
return true;
}
Expand All @@ -120,44 +126,36 @@ await _engines.UpdateAsync(
CancellationToken cancellationToken = default
)
{
TranslationEngine? engine = await _engines.GetAsync(
e => e.EngineId == engineId && e.CurrentBuild != null,
cancellationToken
// cancel a job that hasn't started yet
TranslationEngine? engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Pending,
u =>
{
u.Unset(b => b.CurrentBuild);
u.Set(e => e.CollectTrainSegmentPairs, false);
},
returnOriginal: true,
cancellationToken: cancellationToken
);
if (engine is null || engine.CurrentBuild is null)
return (null, BuildJobState.None);

IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];

if (engine.CurrentBuild.JobState is BuildJobState.Pending)
if (engine is not null && engine.CurrentBuild is not null)
{
// cancel a job that hasn't started yet
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null,
u => u.Unset(b => b.CurrentBuild),
returnOriginal: true,
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
// job will be deleted from the queue
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.None);
}
// job will be deleted from the queue
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.None);
}
else if (engine.CurrentBuild.JobState is BuildJobState.Active)

// cancel a job that is already running
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Active,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling),
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
// cancel a job that is already running
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling),
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.Canceling);
}
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.Canceling);
}

return (null, BuildJobState.None);
Expand Down Expand Up @@ -193,6 +191,7 @@ public Task BuildJobFinishedAsync(
u =>
{
u.Unset(e => e.CurrentBuild);
u.Set(e => e.CollectTrainSegmentPairs, false);
if (buildComplete)
u.Inc(e => e.BuildRevision);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ await _clearMLService.GetTasksForQueueAsync(_queuePerEngineType[engineType], can

var dataAccessContext = scope.ServiceProvider.GetRequiredService<IDataAccessContext>();
var platformService = scope.ServiceProvider.GetRequiredService<IPlatformService>();
var lockFactory = scope.ServiceProvider.GetRequiredService<IDistributedReaderWriterLockFactory>();
foreach (TranslationEngine engine in trainingEngines)
{
if (engine.CurrentBuild is null || !tasks.TryGetValue(engine.CurrentBuild.JobId, out ClearMLTask? task))
Expand Down Expand Up @@ -119,7 +118,6 @@ or ClearMLTaskStatus.Completed
{
bool canceled = !await TrainJobStartedAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
engine.EngineId,
Expand Down Expand Up @@ -159,8 +157,8 @@ await UpdateTrainJobStatus(
cancellationToken
);
bool canceling = !await TrainJobCompletedAsync(
lockFactory,
buildJobService,
engine.Type,
engine.EngineId,
engine.CurrentBuild.BuildId,
(int)GetMetric(task, SummaryMetric, TrainCorpusSizeVariant),
Expand All @@ -172,7 +170,6 @@ await UpdateTrainJobStatus(
{
await TrainJobCanceledAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
engine.EngineId,
Expand All @@ -187,7 +184,6 @@ await TrainJobCanceledAsync(
{
await TrainJobCanceledAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
engine.EngineId,
Expand All @@ -201,7 +197,6 @@ await TrainJobCanceledAsync(
{
await TrainJobFaultedAsync(
dataAccessContext,
lockFactory,
buildJobService,
platformService,
engine.EngineId,
Expand All @@ -223,37 +218,31 @@ await TrainJobFaultedAsync(

private async Task<bool> TrainJobStartedAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
string engineId,
string buildId,
CancellationToken cancellationToken = default
)
{
bool success;
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
success = await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return true;
},
cancellationToken: cancellationToken
);
}
bool success = await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return true;
},
cancellationToken: cancellationToken
);
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({BuildId})", buildId);
return success;
}

private async Task<bool> TrainJobCompletedAsync(
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
TranslationEngineType engineType,
string engineId,
string buildId,
int corpusSize,
Expand All @@ -264,19 +253,16 @@ CancellationToken cancellationToken
{
try
{
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
return await buildJobService.StartBuildJobAsync(
BuildJobRunnerType.Hangfire,
engineId,
buildId,
BuildStage.Postprocess,
(corpusSize, confidence),
buildOptions,
cancellationToken
);
}
return await buildJobService.StartBuildJobAsync(
BuildJobRunnerType.Hangfire,
engineType,
engineId,
buildId,
BuildStage.Postprocess,
(corpusSize, confidence),
buildOptions,
cancellationToken
);
}
finally
{
Expand All @@ -286,7 +272,6 @@ CancellationToken cancellationToken

private async Task TrainJobFaultedAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
string engineId,
Expand All @@ -297,23 +282,19 @@ CancellationToken cancellationToken
{
try
{
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
_logger.LogError("Build faulted ({BuildId}). Error: {ErrorMessage}", buildId, message);
}
finally
Expand All @@ -324,7 +305,6 @@ await buildJobService.BuildJobFinishedAsync(

private async Task TrainJobCanceledAsync(
IDataAccessContext dataAccessContext,
IDistributedReaderWriterLockFactory lockFactory,
IBuildJobService buildJobService,
IPlatformService platformService,
string engineId,
Expand All @@ -334,23 +314,19 @@ CancellationToken cancellationToken
{
try
{
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
await dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
_logger.LogInformation("Build canceled ({BuildId})", buildId);
}
finally
Expand Down
Loading

0 comments on commit 72f753e

Please sign in to comment.