From 31a31c3e501386e7091f72247823caab01b84e60 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Wed, 16 Oct 2024 09:41:25 -0400 Subject: [PATCH] Fix async stream issue --- .../TranslationEngineServiceV1.cs | 11 +++++++---- .../Services/PreprocessBuildJob.cs | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/Echo/src/EchoTranslationEngine/TranslationEngineServiceV1.cs b/src/Echo/src/EchoTranslationEngine/TranslationEngineServiceV1.cs index c366639f..948467dc 100644 --- a/src/Echo/src/EchoTranslationEngine/TranslationEngineServiceV1.cs +++ b/src/Echo/src/EchoTranslationEngine/TranslationEngineServiceV1.cs @@ -1,4 +1,6 @@ -namespace EchoTranslationEngine; +using Nito.AsyncEx.Synchronous; + +namespace EchoTranslationEngine; public class TranslationEngineServiceV1(BackgroundTaskQueue taskQueue) : TranslationEngineApi.TranslationEngineApiBase { @@ -83,11 +85,11 @@ await client.BuildStartedAsync( ParallelCorpusPreprocessor.PreprocessCorpora( request.Corpora.Select(Map).ToList(), row => { }, - async (row, corpus) => + (row, corpus) => { if (row.SourceSegment.Length > 0 && row.TargetSegment.Length == 0) { - await call.RequestStream.WriteAsync( + call.RequestStream.WriteAsync( new InsertPretranslationsRequest { EngineId = request.EngineId, @@ -97,7 +99,8 @@ await call.RequestStream.WriteAsync( Translation = row.SourceSegment }, cancellationToken - ); + ) + .WaitAndUnwrapException(); } }, true diff --git a/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs index 0694cbc6..11a52c9a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs @@ -107,10 +107,10 @@ CancellationToken cancellationToken pretranslateWriter.WriteStartArray(); ParallelCorpusPreprocessor.PreprocessCorpora( corpora, - async row => + row => { - await sourceTrainWriter.WriteAsync($"{row.SourceSegment}\n"); - await targetTrainWriter.WriteAsync($"{row.TargetSegment}\n"); + sourceTrainWriter.Write($"{row.SourceSegment}\n"); + targetTrainWriter.Write($"{row.TargetSegment}\n"); if (row.SourceSegment.Length > 0 && row.TargetSegment.Length > 0) trainCount++; },