Skip to content

Commit

Permalink
Preprocess should be async
Browse files (browse the repository at this point in the history)
  • Loading branch information
johnml1135 committed Nov 26, 2024
1 parent: 0e96e57; commit: 3c8bd73
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 35 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -82,9 +82,9 @@ await client.BuildStartedAsync(
try
{
List<InsertPretranslationsRequest> pretranslationsRequests = [];
_parallelCorpusPreprocessingService.Preprocess(
await _parallelCorpusPreprocessingService.Preprocess(
request.Corpora.Select(Map).ToList(),
row => { },
row => Task.CompletedTask,
(row, corpus) =>
{
pretranslationsRequests.Add(
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -94,25 +94,27 @@ CancellationToken cancellationToken
if (buildOptions is not null)
buildOptionsObject = JsonSerializer.Deserialize<JsonObject>(buildOptions);

using MemoryStream sourceStream = new();
using MemoryStream targetStream = new();
using MemoryStream pretranslationStream = new();

using StreamWriter targetTrainWriter = new(targetStream, Encoding.Default);
using StreamWriter sourceTrainWriter = new(sourceStream, Encoding.Default);
await using Utf8JsonWriter pretranslateWriter = new(pretranslationStream, PretranslateWriterOptions);
await using StreamWriter sourceTrainWriter =
new(await _sharedFileService.OpenWriteAsync($"builds/{buildId}/train.src.txt", cancellationToken));
await using StreamWriter targetTrainWriter =
new(await _sharedFileService.OpenWriteAsync($"builds/{buildId}/train.trg.txt", cancellationToken));
await using Utf8JsonWriter pretranslateWriter =
new(
await _sharedFileService.OpenWriteAsync($"builds/{buildId}/pretranslate.src.json", cancellationToken),
PretranslateWriterOptions
);

int trainCount = 0;
int pretranslateCount = 0;
pretranslateWriter.WriteStartArray();
_parallelCorpusPreprocessingService.Preprocess(
await _parallelCorpusPreprocessingService.Preprocess(
corpora,
row =>
async row =>
{
if (row.SourceSegment.Length > 0 || row.TargetSegment.Length > 0)
{
sourceTrainWriter.WriteLine(row.SourceSegment);
targetTrainWriter.WriteLine(row.TargetSegment);
await sourceTrainWriter.WriteLineAsync(row.SourceSegment);
await targetTrainWriter.WriteLineAsync(row.TargetSegment);
}
if (row.SourceSegment.Length > 0 && row.TargetSegment.Length > 0)
trainCount++;
Expand All @@ -138,22 +140,6 @@ CancellationToken cancellationToken

pretranslateWriter.WriteEndArray();

await sourceTrainWriter.FlushAsync(cancellationToken);
await targetTrainWriter.FlushAsync(cancellationToken);
await pretranslateWriter.FlushAsync(cancellationToken);

async Task WriteStreamAsync(MemoryStream stream, string path)
{
stream.Position = 0;
await using StreamWriter writer = new(await _sharedFileService.OpenWriteAsync(path, cancellationToken));
await writer.WriteAsync(Encoding.Default.GetString(stream.ToArray()));
await writer.FlushAsync(cancellationToken);
}

await WriteStreamAsync(sourceStream, $"builds/{buildId}/train.src.txt");
await WriteStreamAsync(targetStream, $"builds/{buildId}/train.trg.txt");
await WriteStreamAsync(pretranslationStream, $"builds/{buildId}/pretranslate.src.json");

return (trainCount, pretranslateCount);
}

Expand Down
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,12 @@
using Nito.AsyncEx;

namespace SIL.ServiceToolkit.Utils;

public interface IParallelCorpusPreprocessingService
{
void Preprocess(
Task Preprocess(
IReadOnlyList<ParallelCorpus> corpora,
Action<Row> train,
Func<Row, Task> train,
Action<Row, ParallelCorpus> pretranslate,
bool useKeyTerms = false
);
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -25,9 +25,9 @@ internal int Seed
}
}

public void Preprocess(
public async Task Preprocess(
IReadOnlyList<ParallelCorpus> corpora,
Action<Row> train,
Func<Row, Task> train,
Action<Row, ParallelCorpus> pretranslate,
bool useKeyTerms = false
)
Expand Down Expand Up @@ -77,7 +77,7 @@ public void Preprocess(

foreach (Row row in CollapseRanges(trainingRows))
{
train(row);
await train(row);
}

if (useKeyTerms)
Expand All @@ -93,7 +93,7 @@ public void Preprocess(
IParallelTextCorpus parallelKeyTermsCorpus = sourceTermCorpus.AlignRows(targetTermCorpus);
foreach (ParallelTextRow row in parallelKeyTermsCorpus)
{
train(new Row(row.TextId, row.Refs, row.SourceText, row.TargetText, 1));
await train(new Row(row.TextId, row.Refs, row.SourceText, row.TargetText, 1));
}
}
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -79,6 +79,7 @@ public void TestParallelCorpusPreprocessor()
{
if (row.SourceSegment.Length > 0 && row.TargetSegment.Length > 0)
trainCount++;
return Task.CompletedTask;
},
(row, _) =>
{
Expand Down

0 comments on commit 3c8bd73

Please sign in to comment.