diff --git a/docs/server/schema/resolvers.md b/docs/server/schema/resolvers.md index 4f5217250..7f6ceb82b 100644 --- a/docs/server/schema/resolvers.md +++ b/docs/server/schema/resolvers.md @@ -385,6 +385,66 @@ That is, the interpreter cycles through the following two phases: * Interpret for the current values. * Await new values (and values that arrived during the previous step). +:::tip +Since gql is free to ignore updates when a stream is a signal, one should prefer `evalMap` on a `Resolver` instead of a stream if possible. +::: + +:::warning +For a given stream it must hold all child resources alive (maybe the child resources are also streams that may emit). +As such, for a given stream, gql must await a next element from the stream before releasing any currently held resources sub-tree. +This means that gql must be able to pull one element before closing the old one. +::: + +:::tip +If you have streams of updates where you are only interested in that something changed (`Stream[F, Unit]`) there may be room for significant optimization. +In `fs2` you can merge streams with combinators such as `parJoin`, but they have to assume that there may be resources to account for. +If you are discarding the output of the stream or you are absolutely sure that the output does not depend on a resource lifetime, +one can write more optimized versions functions for this purpose. + +
+Some examples of potentially more performant implementations + +In a crude benchmarks, these combinators may perform an order of magnitude faster than `parJoin` or `merge`. +```scala mdoc +import fs2.{Pipe, Stream} +import fs2.concurrent._ +def parListen[A]: Pipe[IO, Stream[IO, A], Unit] = + streams => + for { + d <- Stream.eval(IO.deferred[Either[Throwable, Unit]]) + c <- Stream.eval(IO.deferred[Unit]) + sigRef <- Stream.eval(SignallingRef[IO, Unit](())) + + bg = streams.flatMap { sub => + Stream.supervise { + sub + .evalMap(_ => sigRef.set(())) + .compile + .drain + .onError(e => d.complete(Left(e)).void) + .onCancel(c.complete(()).void) + }.void + } + + listenCancel = (c.get *> IO.canceled).as(Right(()): Either[Throwable, Unit]) + fg = sigRef.discrete.interruptWhen(d).interruptWhen(listenCancel) + + _ <- fg.concurrently(bg) + } yield () + +def parListenSignal[A]: Pipe[IO, Stream[IO, A], A] = + streams => + Stream.eval(SignallingRef.of[IO, Option[A]](None)).flatMap { sig => + sig.discrete.unNone.concurrently { + streams.parEvalMapUnorderedUnbounded { x => + x.evalMap(x => sig.set(Some(x))).compile.drain + } + } + } +``` +
+::: + Here is an example of some streams in action: ```scala mdoc import scala.concurrent.duration._ diff --git a/modules/server/src/test/scala/gql/PerformanceTest.scala b/modules/server/src/test/scala/gql/PerformanceTest.scala index b1ef1faa8..de4091451 100644 --- a/modules/server/src/test/scala/gql/PerformanceTest.scala +++ b/modules/server/src/test/scala/gql/PerformanceTest.scala @@ -15,20 +15,79 @@ */ package gql +import fs2.Stream import cats.implicits._ import munit.CatsEffectSuite import gql._ import gql.ast._ import gql.dsl.all._ import cats.effect._ +import scala.concurrent.duration._ +import fs2.concurrent.SignallingRef class PerformanceTest extends CatsEffectSuite { + def parListen[A]: fs2.Pipe[IO, fs2.Stream[IO, A], Unit] = + streams => + for { + d <- Stream.eval(IO.deferred[Either[Throwable, Unit]]) + c <- Stream.eval(IO.deferred[Unit]) + sigRef <- Stream.eval(SignallingRef[IO, Unit](())) + + bg = streams.flatMap { sub => + Stream.supervise { + sub + .evalMap(_ => sigRef.set(())) + .compile + .drain + .onError(e => d.complete(Left(e)).void) + .onCancel(c.complete(()).void) + }.void + } + + listenCancel = (c.get *> IO.canceled).as(Right(()): Either[Throwable, Unit]) + fg = sigRef.discrete.interruptWhen(d).interruptWhen(listenCancel) + + _ <- fg.concurrently(bg) + } yield () + + def parListenSignal[A]: fs2.Pipe[IO, fs2.Stream[IO, A], A] = + streams => + Stream.eval(SignallingRef.of[IO, Option[A]](None)).flatMap { sig => + sig.discrete.unNone.concurrently { + streams.parEvalMapUnorderedUnbounded { x => + x.evalMap(x => sig.set(Some(x))).compile.drain + } + } + } + case object Data implicit lazy val data: Type[IO, Data.type] = builder[IO, Data.type] { b => b.tpe( "Data", "value" -> b(_.streamMap(_ => (fs2.Stream(1) ++ fs2.Stream(2)))), - "value2" -> b(_.evalMap(_ => IO(1))) + "value2" -> b(_.evalMap(_ => IO(1))), + "concurrent1" -> b( + _.streamMap(_ => + fs2 + .Stream(0) + .covary[IO] + .repeatN(10) + .meteredStartImmediately(100.millis) + .switchMap(_ => fs2.Stream(0).covary[IO].repeatN(10).meteredStartImmediately(100.millis)) + ) + ), + "concurrent2" -> b( + _.streamMap(_ => + fs2 + .Stream(0) + .covary[IO] + .repeatN(10) + .meteredStartImmediately(100.millis) + .map(_ => fs2.Stream(0).covary[IO].repeatN(10).meteredStartImmediately(100.millis)) + .through(parListen) + .as(0) + ) + ) ) } @@ -102,4 +161,44 @@ class PerformanceTest extends CatsEffectSuite { case _ => ??? } } + + test("performance for concurrent streams") { + Compiler[IO] + .compile( + schema, + """ + subscription { + data { + concurrent1 + } + } + """ + ) + .toOption + .get match { + case Application.Subscription(run) => + run.take(10).compile.drain.timed.map { case (dur, _) => fail(dur.toMillis.toString() + " ms"): Unit } + case _ => ??? + } + } + + test("performance for concurrent streams 2") { + Compiler[IO] + .compile( + schema, + """ + subscription { + data { + concurrent2 + } + } + """ + ) + .toOption + .get match { + case Application.Subscription(run) => + run.take(10).compile.drain.timed.map { case (dur, _) => fail(dur.toMillis.toString() + " ms"): Unit } + case _ => ??? + } + } }