From c8e7ec80c79919006c41afc191abd2f5516721b1 Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Fri, 23 Aug 2024 04:09:56 +0200 Subject: [PATCH] limited par join --- .../gql/server/interpreter/SignalScopes.scala | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/modules/server/src/main/scala/gql/server/interpreter/SignalScopes.scala b/modules/server/src/main/scala/gql/server/interpreter/SignalScopes.scala index 4d46ecd6e..ff916bb2b 100644 --- a/modules/server/src/main/scala/gql/server/interpreter/SignalScopes.scala +++ b/modules/server/src/main/scala/gql/server/interpreter/SignalScopes.scala @@ -180,6 +180,9 @@ object SignalScopes { F.deferred[(Scope[F], A)].flatMap { head => val stream0 = if (takeOne) stream.take(1) else stream + def size(s: Scope[F]): F[Long] = + s.children.flatMap(_.foldMapA(size)).map(_ + 1) + scope .openChild { parentScope => val si = StreamInfo(parentScope, signal, cursor) @@ -195,27 +198,29 @@ object SignalScopes { } } - stream0.zipWithIndex - .evalMap { case (a, i) => - F.deferred[Unit].flatMap { d => - parentScope - .openChild { scope => - Resource.onFinalize(d.complete(()).void) >> - reserveShowMapping(scope.id, s"resource-$i") - } - .flatMap { o => - o match { - // This is totally legal, maybe someone shut us down while we are emitting - case None => F.pure(fs2.Stream[F, Unit]()) - case Some((childScope, _)) => publish1(i, a, childScope).as(fs2.Stream.eval(d.get)) + stream0.zipWithIndex + .evalMap { case (a, i) => + F.deferred[Unit].flatMap { d => + parentScope + .openChild { scope => + Resource.onFinalize(d.complete(()).void) >> + reserveShowMapping(scope.id, s"resource-$i") } - } + .flatMap { o => + o match { + // This is totally legal, maybe someone shut us down while we are emitting + case None => F.pure(fs2.Stream[F, Unit]()) + case Some((childScope, _)) => publish1(i, a, childScope).as(fs2.Stream.eval(d.get)) + } + } + } } - } - .parJoinUnbounded - .compile - .drain - .background <* + // .flatten + .parJoin(2) + // .parJoinUnbounded + .compile + .drain + .background <* reserveShowMapping( parentScope.id, s"${cursor.show} (signal = $signal)"