Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 6, 2024
1 parent 66470a8 commit 1a39f00
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 15 deletions.
23 changes: 23 additions & 0 deletions modules/server/src/main/scala/gql/server/interpreter/Finally.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package gql.server.interpreter

import cats.implicits._
import cats.effect._

trait Finally[F[_]] {
def rememberResource[A](fa: Resource[F, A]): F[Unit]

def remember[A](fa: F[A]): F[Unit] =
rememberResource(Resource.eval(fa))
}

object Finally {
def make[F[_]](implicit F: Concurrent[F]): Resource[F, Finally[F]] =
Resource.make(F.ref(List.empty[F[Unit]]))(_.get.flatMap(_.sequence_)).map { state =>
new Finally[F] {
def rememberResource[A](fa: Resource[F, A]): F[Unit] =
F.uncancelable { poll =>
poll(fa.allocated).flatMap { case (_, release) => state.update(release :: _).void }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gql.server.interpreter

import cats._
import cats.implicits._
import cats.effect._
import fs2.Stream
Expand All @@ -10,6 +11,7 @@ import cats.effect
import cats.data.Chain
import fs2.Chunk
import fs2.concurrent.Channel
import fs2.UnsafeFs2Access

object NewDesign {
object Pass1 {
Expand Down Expand Up @@ -101,36 +103,87 @@ object NewDesign {
final case class Hierarchy[A](children: List[(A, Hierarchy[A])])

trait Node[F[_], A] {
def eval(a: A, plan: QueryPlan): PlanEvalEffect[F]
def eval(a: A, plan: QueryPlan, lease: Resource[F, Unit]): PlanEvalEffect[F]
}

def evalStreamNode[A](
stream: Stream[IO, A],
child: Node[IO, A],
plan: QueryPlan,
nodeInfo: NodeInfo
nodeInfo: NodeInfo,
parentLease: Resource[IO, Unit]
): PlanEvalEffect[IO] =
stream.pull.uncons1.flatMap {
case None => Pull.eval(IO.never)
case Some((hd, updates)) =>
val tl = updates.map(a => PlanEval(nodeInfo, child.eval(a, _)))
child.eval(hd, plan).map { case (jsons, childUpdates) =>
jsons -> Hierarchy(List(tl -> childUpdates))
val tl = updates.map(a => PlanEval(nodeInfo, child.eval(a, _, parentLease)))
UnsafeFs2Access.leaseScope[IO].flatMap { thislease =>
child.eval(hd, plan, thislease).map { case (jsons, childUpdates) =>
jsons -> Hierarchy(List(tl -> childUpdates))
}
}
}

/*
*
*/
final case class StateEntry[F[_]](
token: Unique.Token,
parent: Set[Unique.Token],
collect: Collect[F],
release: F[Unit]
)

final case class State[F[_]](
// None if we are currently not consuming
values: Option[List[StateEntry[F]]],
// The next state, if we cannot publish values, await next state
consumed: F[State[F]]
)

def registerHierarchy(
path: Set[Unique.Token],
h: Hierarchy[Stream[IO, Collect[IO]]],
chan: IO[Channel[IO, Collect[IO]]]
path: Set[Unique.Token],
h: Hierarchy[Stream[IO, Collect[IO]]],
state: Ref[IO, State[IO]]
): IO[List[Fiber[IO, Throwable, Unit]]] = {
h.children.map{ case (col, child) =>
IO.unique.map{ tok =>
h.children.map { case (col, child) =>
IO.unique.map { tok =>
val publishThisNode: IO[Unit] = {
def rec(s: Stream[IO, Collect[IO]]): Pull[IO, Nothing, Unit] =
s.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource =>
// keep trying to submit hd
// every attempt we reserve the resource
def tryUpdate(poll: IO ~> IO): IO[Unit] =
poll(leaseResource.allocated).flatMap { case (_, release) =>
state.modify { s =>
s.values match {
// If we are not consuming, we await next and try again
case None =>
(s, release >> poll((s.consumed >> tryUpdate(poll))))
// If we are consuming, publish the value and await next consumption
case Some(xs) =>
(
s.copy(values = Some(StateEntry(tok, path, hd, release) :: xs)),
poll(s.consumed.void)
)
}
}.flatten
}

Pull.eval(IO.uncancelable(tryUpdate)) >> rec(tl)
}
}

rec(col).stream.compile.drain
}

val newPath = path + tok
registerHierarchy(newPath, child, chan).map{ children =>

registerHierarchy(
newPath,
child,
state
).map { children =>
val killAll = children.parTraverse(_.cancel)
}
}
Expand Down Expand Up @@ -391,5 +444,23 @@ object NewDesign {
}
.compile
.drain

Stream
.bracket(IO.println("open"))(_ => IO.println("close"))
.repeatN(4)
.zipWithIndex
.map { case (_, i) => i }
.pull
.uncons1
.flatMap(Pull.output1(_))
.streamNoScope
.flatMap {
case None => Stream.empty
case Some((hd, tl)) =>
Stream.eval(IO.println(s"unconsed $hd")) >> tl
}
.evalMap { i => IO.println(s"use $i") }
.compile
.drain
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import scala.concurrent.duration.FiniteDuration
import gql.resolver._
import io.circe.syntax._
import org.typelevel.scalaccompat.annotation._

import cats.effect.implicits._

final case class StreamData[F[_], I](
cont: Continuation[F, I],
value: Either[Throwable, I]
Expand Down
13 changes: 13 additions & 0 deletions modules/server/src/main/scala/gql/server/interpreter/Unsafe.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package fs2

import fs2.internal.Scope
import cats.effect.Resource
import cats._
import cats.implicits._

object UnsafeFs2Access {
def getScope[F[_]]: Pull[F, Nothing, Scope[F]] = Pull.getScope[F]

def leaseScope[F[_]: MonadThrow]: Pull[F, Nothing, Resource[F, Unit]] =
getScope[F].map(scope => Resource.make(scope.lease)(x => x.cancel.rethrow).void)
}

0 comments on commit 1a39f00

Please sign in to comment.