Skip to content

Commit

Permalink
Fix replyStream interruption (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Mar 6, 2024
1 parent 7390aa1 commit 8333e6a
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ private[shardcake] object ReplyChannel {
def replyStream(stream: ZStream[Any, Throwable, A]): UIO[Unit] =
(stream
.runForeachChunk(chunk => queue.offer(Take.chunk(chunk)))
.foldCauseZIO(cause => queue.offer(Take.failCause(cause)), _ => queue.offer(Take.end)) race await).fork.unit
.onExit(e => queue.offer(e.foldExit(Take.failCause, _ => Take.end)))
.ignore race await).fork.unit
val output: ZStream[Any, Throwable, A] = ZStream.fromQueueWithShutdown(queue).flattenTake.onError(fail)
}

Expand Down

0 comments on commit 8333e6a

Please sign in to comment.