doc: streaming optimizations
ValdemarGr committed Nov 18, 2023
1 parent e3e16b7 commit dcfc570
Showing 2 changed files with 160 additions and 1 deletion.
60 changes: 60 additions & 0 deletions docs/server/schema/resolvers.md
@@ -385,6 +385,66 @@ That is, the interpreter cycles through the following two phases:
* Interpret for the current values.
* Await new values (and values that arrived during the previous step).
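
A minimal sketch of that cycle (not gql's actual implementation; `interpret` and `awaitNewValues` are hypothetical placeholders for the interpreter's internals):

```scala
// Schematic rendering of the two phases; all names are placeholders.
def cycle(current: List[StreamedValue]): IO[Unit] =
  for {
    _    <- interpret(current)      // interpret for the current values
    next <- awaitNewValues(current) // await new values (and any that arrived meanwhile)
    _    <- cycle(next)
  } yield ()
```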

:::tip
Since gql is free to ignore updates when a stream acts as a signal, prefer `evalMap` on a `Resolver` over a stream whenever possible.
:::
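
For instance, a field defined both ways (a sketch, assuming a field builder `b` as in the schema examples and a hypothetical effect `lookupValue: IO[Int]`):

```scala
// Stream-backed field: treated as a signal, gql may skip intermediate emissions.
"value" -> b(_.streamMap(_ => fs2.Stream.repeatEval(lookupValue)))

// Preferred when only the latest value matters: re-evaluate the effect on demand.
"value" -> b(_.evalMap(_ => lookupValue))
```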

:::warning
A given stream must keep all of its child resources alive (the child resources may themselves be streams that can emit).
As such, gql must await the next element of a stream before it can release the resource sub-tree that the stream currently holds.
In other words, gql must be able to pull a new element before closing the old one.
:::
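
The shape of this constraint can be illustrated with plain `fs2` (a sketch, not gql's implementation; `open` is a hypothetical resource constructor for an element's sub-tree):

```scala
import cats.effect.{IO, Resource}
import fs2.Stream

// Every update opens a resource for its sub-tree; `switchMap` releases the
// previous resource only once the next element has been pulled, mirroring the
// rule described above.
def resourcePerUpdate[A](updates: Stream[IO, A])(open: A => Resource[IO, Unit]): Stream[IO, A] =
  updates.switchMap(a => Stream.resource(open(a)).as(a) ++ Stream.never[IO])
```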

:::tip
If you have streams of updates where you are only interested in the fact that something changed (`Stream[F, Unit]`), there may be room for significant optimization.
In `fs2` you can merge streams with combinators such as `parJoin`, but those combinators must assume that there may be resources to account for.
If you are discarding the output of the stream, or you are absolutely sure that the output does not depend on a resource lifetime,
you can write more optimized combinators for this purpose.

<details>
<summary>Some examples of potentially more performant implementations</summary>

In crude benchmarks, these combinators can perform an order of magnitude faster than `parJoin` or `merge`.
```scala mdoc
import cats.effect.IO
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent._

// Listens to every inner stream in the background and emits a unit "tick"
// whenever any of them produces a value, so no inner resources have to be
// threaded through the merged output.
def parListen[A]: Pipe[IO, Stream[IO, A], Unit] =
  streams =>
    for {
      // completed with an error if any background stream fails
      d <- Stream.eval(IO.deferred[Either[Throwable, Unit]])
      // completed if any background stream is cancelled
      c <- Stream.eval(IO.deferred[Unit])
      // pulsed whenever any inner stream emits
      sigRef <- Stream.eval(SignallingRef[IO, Unit](()))

      // drain every inner stream in a supervised fiber, forwarding failure and cancellation
      bg = streams.flatMap { sub =>
        Stream.supervise {
          sub
            .evalMap(_ => sigRef.set(()))
            .compile
            .drain
            .onError(e => d.complete(Left(e)).void)
            .onCancel(c.complete(()).void)
        }.void
      }

      listenCancel = (c.get *> IO.canceled).as(Right(()): Either[Throwable, Unit])
      // the foreground is just the signal's updates, interrupted on error or cancellation
      fg = sigRef.discrete.interruptWhen(d).interruptWhen(listenCancel)

      _ <- fg.concurrently(bg)
    } yield ()

// Like parListen, but surfaces the latest value emitted by any of the inner streams.
def parListenSignal[A]: Pipe[IO, Stream[IO, A], A] =
  streams =>
    Stream.eval(SignallingRef.of[IO, Option[A]](None)).flatMap { sig =>
      sig.discrete.unNone.concurrently {
        streams.parEvalMapUnorderedUnbounded { x =>
          x.evalMap(x => sig.set(Some(x))).compile.drain
        }
      }
    }
```
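
A hypothetical usage sketch (assuming the imports above): a handful of tickers merged into a single stream of change notifications.

```scala
import scala.concurrent.duration._

// Three independent tickers collapsed into one "something changed" stream.
val ticks: Stream[IO, Stream[IO, Unit]] =
  Stream(100.millis, 250.millis, 1.second)
    .map(d => Stream.awakeEvery[IO](d).void)

val changed: Stream[IO, Unit] = ticks.through(parListen[Unit])
```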
</details>
:::

Here is an example of some streams in action:
```scala mdoc
import scala.concurrent.duration._
101 changes: 100 additions & 1 deletion modules/server/src/test/scala/gql/PerformanceTest.scala
@@ -15,20 +15,79 @@
*/
package gql

import fs2.Stream
import cats.implicits._
import munit.CatsEffectSuite
import gql._
import gql.ast._
import gql.dsl.all._
import cats.effect._
import scala.concurrent.duration._
import fs2.concurrent.SignallingRef

class PerformanceTest extends CatsEffectSuite {
  def parListen[A]: fs2.Pipe[IO, fs2.Stream[IO, A], Unit] =
    streams =>
      for {
        d <- Stream.eval(IO.deferred[Either[Throwable, Unit]])
        c <- Stream.eval(IO.deferred[Unit])
        sigRef <- Stream.eval(SignallingRef[IO, Unit](()))

        bg = streams.flatMap { sub =>
          Stream.supervise {
            sub
              .evalMap(_ => sigRef.set(()))
              .compile
              .drain
              .onError(e => d.complete(Left(e)).void)
              .onCancel(c.complete(()).void)
          }.void
        }

        listenCancel = (c.get *> IO.canceled).as(Right(()): Either[Throwable, Unit])
        fg = sigRef.discrete.interruptWhen(d).interruptWhen(listenCancel)

        _ <- fg.concurrently(bg)
      } yield ()

  def parListenSignal[A]: fs2.Pipe[IO, fs2.Stream[IO, A], A] =
    streams =>
      Stream.eval(SignallingRef.of[IO, Option[A]](None)).flatMap { sig =>
        sig.discrete.unNone.concurrently {
          streams.parEvalMapUnorderedUnbounded { x =>
            x.evalMap(x => sig.set(Some(x))).compile.drain
          }
        }
      }

  case object Data
  implicit lazy val data: Type[IO, Data.type] = builder[IO, Data.type] { b =>
    b.tpe(
      "Data",
      "value" -> b(_.streamMap(_ => (fs2.Stream(1) ++ fs2.Stream(2)))),
      "value2" -> b(_.evalMap(_ => IO(1))),
      "concurrent1" -> b(
        _.streamMap(_ =>
          fs2
            .Stream(0)
            .covary[IO]
            .repeatN(10)
            .meteredStartImmediately(100.millis)
            .switchMap(_ => fs2.Stream(0).covary[IO].repeatN(10).meteredStartImmediately(100.millis))
        )
      ),
      "concurrent2" -> b(
        _.streamMap(_ =>
          fs2
            .Stream(0)
            .covary[IO]
            .repeatN(10)
            .meteredStartImmediately(100.millis)
            .map(_ => fs2.Stream(0).covary[IO].repeatN(10).meteredStartImmediately(100.millis))
            .through(parListen)
            .as(0)
        )
      )
    )
  }

@@ -102,4 +161,44 @@ class PerformanceTest extends CatsEffectSuite {
      case _ => ???
    }
  }

  test("performance for concurrent streams") {
    Compiler[IO]
      .compile(
        schema,
        """
          subscription {
            data {
              concurrent1
            }
          }
        """
      )
      .toOption
      .get match {
      case Application.Subscription(run) =>
        run.take(10).compile.drain.timed.map { case (dur, _) => fail(dur.toMillis.toString() + " ms"): Unit }
      case _ => ???
    }
  }

  test("performance for concurrent streams 2") {
    Compiler[IO]
      .compile(
        schema,
        """
          subscription {
            data {
              concurrent2
            }
          }
        """
      )
      .toOption
      .get match {
      case Application.Subscription(run) =>
        run.take(10).compile.drain.timed.map { case (dur, _) => fail(dur.toMillis.toString() + " ms"): Unit }
      case _ => ???
    }
  }
}
