Skip to content

Commit

Permalink
limited par join
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Aug 23, 2024
1 parent 2566465 commit c8e7ec8
Showing 1 changed file with 24 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ object SignalScopes {
F.deferred[(Scope[F], A)].flatMap { head =>
val stream0 = if (takeOne) stream.take(1) else stream

def size(s: Scope[F]): F[Long] =
s.children.flatMap(_.foldMapA(size)).map(_ + 1)

scope
.openChild { parentScope =>
val si = StreamInfo(parentScope, signal, cursor)
Expand All @@ -195,27 +198,29 @@ object SignalScopes {
}
}

stream0.zipWithIndex
.evalMap { case (a, i) =>
F.deferred[Unit].flatMap { d =>
parentScope
.openChild { scope =>
Resource.onFinalize(d.complete(()).void) >>
reserveShowMapping(scope.id, s"resource-$i")
}
.flatMap { o =>
o match {
// This is totally legal, maybe someone shut us down while we are emitting
case None => F.pure(fs2.Stream[F, Unit]())
case Some((childScope, _)) => publish1(i, a, childScope).as(fs2.Stream.eval(d.get))
stream0.zipWithIndex
.evalMap { case (a, i) =>
F.deferred[Unit].flatMap { d =>
parentScope
.openChild { scope =>
Resource.onFinalize(d.complete(()).void) >>
reserveShowMapping(scope.id, s"resource-$i")
}
}
.flatMap { o =>
o match {
// This is totally legal, maybe someone shut us down while we are emitting
case None => F.pure(fs2.Stream[F, Unit]())
case Some((childScope, _)) => publish1(i, a, childScope).as(fs2.Stream.eval(d.get))
}
}
}
}
}
.parJoinUnbounded
.compile
.drain
.background <*
// .flatten
.parJoin(2)
// .parJoinUnbounded
.compile
.drain
.background <*
reserveShowMapping(
parentScope.id,
s"${cursor.show} (signal = $signal)"
Expand Down

0 comments on commit c8e7ec8

Please sign in to comment.