Skip to content

Commit

Permalink
response streaming working
Browse files Browse the repository at this point in the history
  • Loading branch information
coronabytes committed Sep 14, 2024
1 parent 12be938 commit 7ed987e
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions Core.ServiceMesh/Internal/ServiceMeshWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Core.ServiceMesh.Internal;

Expand Down Expand Up @@ -443,8 +444,11 @@ private async ValueTask AsyncEnumerableWrapper<T>(string sub, IAsyncEnumerable<T
{
await foreach (var val in stream)
{
await nats.PublishAsync(sub, options.Serialize(val!, true));
var data = options.Serialize(val!, true);
await nats.PublishAsync(sub, data);
}

await nats.PublishAsync(sub, Array.Empty<byte>());
}

private async Task DurableWorker(CancellationToken stoppingToken)
Expand Down Expand Up @@ -609,11 +613,13 @@ public async IAsyncEnumerable<T> StreamAsync<T>(string subject, object[] args, T

var subId = Guid.NewGuid().ToString("N");

headers["reply-sub-id"] = subId;
headers["return-sub-id"] = subId;
await nats.PublishAsync(subject, body, headers: headers);

await foreach (var msg in nats.SubscribeAsync<byte[]>(subId))
{
if (msg.Data == null)
yield break;
yield return (T)options.Deserialize(msg.Data!, typeof(T), true)!;
}
}
Expand Down

0 comments on commit 7ed987e

Please sign in to comment.