Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 7, 2024
1 parent 08f952a commit 9249dbd
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 65 deletions.
23 changes: 17 additions & 6 deletions modules/server/src/main/scala/gql/server/interpreter/Finally.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import cats.implicits._
import cats.effect._

trait Finally[F[_]] {
def rememberResource[A](fa: Resource[F, A]): F[Unit]
def rememberResource[A](fa: Resource[F, A]): F[Unique.Token]

def remember[A](fa: F[A]): F[Unit] =
def remember[A](fa: F[A]): F[Unique.Token] =
rememberResource(Resource.eval(fa))

def cancel(token: Unique.Token): F[Unit]

// def leased(fa: Resource[F, Unit]): F[SharedResource[F]]
}

object Finally {
def make[F[_]](implicit F: Concurrent[F]): Resource[F, Finally[F]] =
Resource.make(F.ref(List.empty[F[Unit]]))(_.get.flatMap(_.sequence_)).map { state =>
Resource.make(F.ref(Map.empty[Unique.Token, F[Unit]]))(_.get.flatMap(_.values.toList.sequence_)).map { state =>
new Finally[F] {
def rememberResource[A](fa: Resource[F, A]): F[Unit] =
F.uncancelable { poll =>
poll(fa.allocated).flatMap { case (_, release) => state.update(release :: _).void }
def rememberResource[A](fa: Resource[F, A]): F[Unique.Token] =
F.unique.flatTap { tok =>
F.uncancelable { poll =>
poll(fa.allocated).flatMap { case (_, release) => state.update(_ + ((tok, release))) }
}
}

def cancel(token: Unique.Token): F[Unit] =
F.uncancelable { _ =>
state.modify(m => (m - token, m.get(token))).flatMap(_.sequence_)
}
}
}
Expand Down
178 changes: 119 additions & 59 deletions modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import cats.data.Chain
import fs2.Chunk
import fs2.concurrent.Channel
import fs2.UnsafeFs2Access
import java.lang
import scala.collection.immutable

object NewDesign {
object Pass1 {
Expand Down Expand Up @@ -97,7 +99,7 @@ object NewDesign {
)

type Effect[F[_], A] = Pull[F, Nothing, (Json, List[Fiber[F, Throwable, Unit]])]
type Collect[F[_]] = PlanEval[Effect[F, *], (F[Unit], NodeInfo), QueryPlan]
type Collect[F[_]] = PlanEval[Effect[F, *], NodeInfo, QueryPlan]
type PlanEvalEffect[F[_]] = Effect[F, Collect[F]]

final case class Hierarchy[A](children: List[(A, Hierarchy[A])])
Expand Down Expand Up @@ -130,10 +132,8 @@ object NewDesign {
parents: Set[Unique.Token],
// the next evaluation informaiton
collect: Collect[F],
// release all resources currently allocated on this node
release: F[Unit],
// to allow children to lease this node
lease: Resource[F, Unit]
lease: Resource[F, Option[Int]]
)

final case class State[F[_]](
Expand Down Expand Up @@ -203,14 +203,17 @@ object NewDesign {
final case class Context[F[_]](
path: Set[Unique.Token],
state: Ref[F, State[F]],
parentLease: Resource[F, Unit],
plan: QueryPlan
parentLease: Resource[F, Option[Int]],
plan: QueryPlan,
interruptContext: F[Unit]
) {
def addPath(tok: Unique.Token): Context[F] = copy(path = path + tok)

def setLease(lease: Resource[F, Unit]): Context[F] = copy(parentLease = lease)
def setLease(lease: Resource[F, Option[Int]]): Context[F] = copy(parentLease = lease)

def setPlan(qp: QueryPlan): Context[F] = copy(plan = qp)

def setInterruptContext(interrupt: F[Unit]): Context[F] = copy(interruptContext = interrupt)
}

trait Cont[F[_], A] {
Expand All @@ -222,61 +225,77 @@ object NewDesign {
child: Cont[IO, A],
nodeInfo: NodeInfo,
ctx: Context[IO]
) = {
): Pull[IO, Nothing, (Json, List[Fiber[IO, Throwable, Unit]])] = {
Pull.eval(IO.unique).flatMap { tok =>
val withCtx = ctx.addPath(tok)
stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource =>
val withLease = withCtx.setLease(leaseResource)
// on head pull we evaluate
child.eval(hd, withLease).flatMap { case (result, fibs) =>
val cancelAll: IO[Unit] = fibs.parTraverse_(_.cancel)

// on subsequent pulls we submit to main queue
def repeatUncons(
stream: Stream[IO, A],
interruptPrevious: IO[Unit]
): Pull[IO, Nothing, Unit] =
stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
Pull.eval(interruptPrevious) >>
Pull.eval(IO.deferred[Unit]).flatMap { killThis =>
val c: Collect[IO] = PlanEval[Effect[IO, *], (IO[Unit], NodeInfo), QueryPlan](
plan = killThis.get -> nodeInfo,
eval = qp => child.eval(hd, withLease.setPlan(qp))
)

def tryUpdate(poll: IO ~> IO): IO[Unit] =
poll(leaseResource.allocated).flatMap { case (_, release) =>
ctx.state.modify { s =>
s.values match {
// If we are not consuming, we await next and try again
case None =>
(s, release >> poll((s.consumed >> tryUpdate(poll))))
// If we are consuming, publish the value and await next consumption
case Some(xs) =>
(
s.copy(values = Some(StateEntry(tok, ctx.path, c, release, leaseResource) :: xs)),
poll(s.consumed.void)
)
}
}.flatten
}

val upd: IO[Unit] = IO.uncancelable(tryUpdate)

Pull.eval(upd) >> repeatUncons(tl, killThis.complete(()).void)
}
}

Pull.eval(repeatUncons(tl, cancelAll).stream.compile.drain.start).map { fib =>
result -> List(fib)
}
def mkCtx(isKilled: IO[Unit], leaseThis: Resource[IO, Option[Int]]): Context[IO] =
ctx.addPath(tok).setInterruptContext(isKilled).setLease(leaseThis)

val leaseTree = UnsafeFs2Access.leaseScope[IO].map[Resource[IO, Resource[IO, Option[Int]]]] { r =>
ctx.parentLease.flatMap {
case Some(_) => SharedResource.make[IO](r)
case None =>
Resource.raiseError[IO, Resource[IO, Option[Int]], Throwable] {
new RuntimeException("impossible, no parent lease")
}
}
}

def resourcePull: Pull[IO, Nothing, Resource[IO, Option[Int]]] =
leaseTree.flatMap(r => Stream.resource(r).pull.uncons1.map(_.get._1))

def tryUpdate(lease: Resource[IO, Option[Int]], c: Collect[IO]): IO[Unit] =
ctx.state.modify { s =>
s.values match {
// If we are not consuming, we await next and try again
case None =>
(s, (s.consumed >> tryUpdate(lease, c)))
// If we are consuming, publish the value and await next consumption
case Some(xs) =>
(
s.copy(values = Some(StateEntry(tok, ctx.path, c, lease) :: xs)),
s.consumed.void
)
}
}.flatten

def repeatUncons(
stream: Stream[IO, A],
interruptPrevious: IO[Unit]
): Pull[IO, Nothing, Unit] =
stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
for {
_ <- Pull.eval(interruptPrevious)
killThis <- Pull.eval(IO.deferred[Unit])
leaseResource <- resourcePull
c: Collect[IO] = PlanEval[Effect[IO, *], NodeInfo, QueryPlan](
plan = nodeInfo,
eval = qp => child.eval(hd, mkCtx(killThis.get, leaseResource).setPlan(qp))
)

_ <- Pull.eval(tryUpdate(leaseResource, c))

_ <- repeatUncons(tl, killThis.complete(()).void)
} yield ()
}

stream.pull.uncons1.flatMap {
case None => Pull.eval(IO.never)
case Some((hd, tl)) =>
for {
killThis <- Pull.eval(IO.deferred[Unit])
leaseResource <- resourcePull
(result, fibs) <- child.eval(hd, mkCtx(killThis.get, leaseResource))
cancelAll = fibs.parTraverse_(_.cancel)
bigProg = repeatUncons(tl, cancelAll).stream
.interruptWhen(ctx.interruptContext.attempt)
.compile
.drain
.start
ensureCleanup = (ctx.interruptContext *> cancelAll).start
fiber <- Pull.eval(ensureCleanup *> bigProg)
} yield result -> List(fiber)
}
}
}
Expand Down Expand Up @@ -593,5 +612,46 @@ object NewDesign {
.evalMap(x => IO.println(s"using $x"))
.compile
.drain

{
def rec(x: Stream[IO, Int]): Pull[IO, Int, Unit] =
x.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) => Pull.eval(IO.println(s"using $hd")).flatMap(_ => rec(tl))
}
rec {
Stream
.emits((0 to 10))
.covary[IO]
.flatTap(i => Stream.bracket(IO.println(s"open $i"))(_ => IO.println(s"close $i")))
}.stream
.evalMap(x => IO.println(s"using $x"))
.compile
.drain
}

Stream
.emits((0 to 10))
.covary[IO]
.flatTap(i => Stream.bracket(IO.println(s"open $i"))(_ => IO.println(s"close $i")))
.pull
.uncons1
.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
val tlProg = Pull.eval(IO.println(s"using $hd")) >>
Stream.emits((0 to 10)).covary[IO].evalMap(i => IO.println(s"inner $i").as(i)).pull.echo >>
tl.pull.echo

Stream
.bracketWeak(IO.println(s"inner bracket open"))(_ => IO.println(s"inner bracket close"))
.pull
.uncons1
.flatMap(_ => tlProg)
}
.stream
.evalMap(x => IO.println(s"using $x"))
.compile
.drain
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package gql.server.interpreter

import cats.effect._
import cats.implicits._
import cats.effect.kernel.Resource.ExitCase

object SharedResource {
sealed trait State[F[_]]
object State {
final case class Closed[F[_]]() extends State[F]
final case class Open[F[_]](leases: Int) extends State[F]
}

def make[F[_]](res: Resource[F, Unit])(implicit F: Async[F]): Resource[F, Resource[F, Option[Int]]] =
Resource.applyFull { poll =>
poll((res.allocated, F.ref[State[F]](State.Open(1))).tupled).map { case ((_, release), ref) =>
val open = ref.modify {
case State.Closed() => (State.Closed[F](), None)
case State.Open(n) =>
val n2 = n + 1
(State.Open(n2), Some(n2))
}

val close = ref.modify {
case State.Closed() => (State.Closed[F](), F.unit)
case State.Open(n) =>
val n2 = n - 1
if (n2 === 0) (State.Closed[F](), release)
else (State.Open(n2), F.unit)
}.flatten

val api = Resource.make(open) {
case None => F.unit
case Some(_) => close
}

(api, ((_: ExitCase) => close))
}
}
}

0 comments on commit 9249dbd

Please sign in to comment.