Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 8, 2024
1 parent 07404a3 commit 9081d0a
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 175 deletions.
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ lazy val sharedSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "3.5.4",
"org.typelevel" %% "cats-mtl" % "1.3.1",
"org.typelevel" %% "cats-core" % "2.9.0",
"org.typelevel" %% "cats-free" % "2.9.0",
"co.fs2" %% "fs2-core" % "3.7.0",
"co.fs2" %% "fs2-io" % "3.7.0",
"org.typelevel" %% "cats-core" % "2.12.0",
"org.typelevel" %% "cats-free" % "2.12.0",
"co.fs2" %% "fs2-core" % "3.11.0",
"co.fs2" %% "fs2-io" % "3.11.0",
"org.typelevel" %% "cats-parse" % "0.3.8",
"io.circe" %% "circe-core" % "0.14.6",
"io.circe" %% "circe-parser" % "0.14.6",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package gql.server.interpreter
import cats._
import cats.implicits._
import org.typelevel.paiges._
import cats.effect._
import gql.preparation._

trait DebugPrinter[F[_]] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,9 @@ object EvalState {
} yield ProduceConsume(consumedD.get, consumedD.complete(()).void, producedD.get, producedD.complete(()).void)
}

final case class EvalStateApi[F[_]](
ref: Ref[F, EvalState[F]],
ps: ProduceConsume[F]
)
def init[F[_]](implicit F: Async[F]): F[EvalStateApi[F]] =
def init[F[_]](implicit F: Async[F]): F[Ref[F, EvalState[F]]] =
for {
ps <- ProduceConsume.make[F]
ref <- F.ref(EvalState[F](Some(Nil), ps))
} yield EvalStateApi(ref, ps)
} yield ref
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package gql.server.interpreter

import cats.effect.implicits._
import cats.effect._
import cats.implicits._
import cats.effect.std._

object NaiveSupervisor {
  /** Internal lifecycle of the supervisor.
    *
    * [[State.Open]] tracks every supervised fiber that is still registered so the
    * resource finalizer can cancel them; [[State.Closed]] marks that the finalizer
    * has run (or is running) and no new fibers may be supervised.
    */
  sealed trait State[F[_]]
  object State {
    final case class Closed[F[_]]() extends State[F]
    final case class Open[F[_]](runningFibers: Map[Unique.Token, Fiber[F, Throwable, ?]]) extends State[F]
  }

  /** A minimal [[cats.effect.std.Supervisor]]: every supervised fiber is tracked in a
    * `Ref` and all still-registered fibers are cancelled in parallel on finalization.
    */
  def make[F[_]](implicit F: Async[F]) = {
    Resource
      .make {
        F.ref[State[F]](State.Open[F](Map.empty))
      } { state =>
        state.get.flatMap {
          // The finalizer is the only writer of Closed, and it runs at most once,
          // so observing Closed here indicates a logic error.
          case State.Closed() => F.raiseError(new RuntimeException("impossible"))
          case State.Open(runningFibers) =>
            runningFibers.values.toList.parTraverse_(_.cancel)
        }
      }
      .map { state =>
        new Supervisor[F] {
          override def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] =
            F.uncancelable { _ =>
              def closed[B] = F.raiseError[B](new RuntimeException("supervisor closed"))
              state.get.flatMap {
                case State.Closed() => closed
                case State.Open(_) =>
                  F.unique.flatMap { tok =>
                    // Removing the token is idempotent, so it is safe to run it both from
                    // the fiber's own termination and from an explicit cancel.
                    val unregister = state.update {
                      case State.Closed()            => State.Closed()
                      case State.Open(runningFibers) => State.Open(runningFibers - tok)
                    }
                    // FIX: unregister when the fiber terminates on its own; previously a
                    // normally-completed fiber stayed in the map forever (unbounded growth
                    // for a long-lived supervisor).
                    fa.guarantee(unregister).start.flatMap { fib =>
                      val synthetic = new Fiber[F, Throwable, A] {
                        override def cancel: F[Unit] =
                          // Unregister even if cancellation itself fails.
                          fib.cancel.guarantee(unregister)

                        override def join: F[Outcome[F, Throwable, A]] = fib.join
                      }

                      state
                        .modify {
                          // Lost the race against the finalizer: cancel the freshly started
                          // fiber and report closure to the caller.
                          case State.Closed() => (State.Closed[F](), fib.cancel *> closed[Unit])
                          case State.Open(runningFibers) =>
                            (State.Open(runningFibers + (tok -> synthetic)), F.unit)
                        }
                        // FIX: the effect produced by `modify` was previously discarded by
                        // `.as(fib)`, so the Closed-race branch never cancelled nor raised.
                        .flatten
                        // FIX: hand back the tracked `synthetic` fiber (which unregisters on
                        // cancel), not the raw `fib`.
                        .as(synthetic)
                    }
                  }
              }
            }
        }
      }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ object QueryInterpreter {
def apply[F[_]](
schemaState: SchemaState[F],
ss: Ref[F, EvalState[F]],
throttle: F ~> F
throttle: F ~> F,
sup: Supervisor[F]
)(implicit stats: Statistics[F], planner: Planner[F], F: Async[F]) =
new QueryInterpreter[F] {
def interpretOne[A](input: Input[F, A], sgb: SubgraphBatches[F], errors: Ref[F, Chain[EvalFailure]]): F[Json] =
Supervisor[F].use { sup =>
val go = new SubqueryInterpreter(ss, sup, stats, throttle, errors, sgb)
go.goCont(input.continuation, input.data)
}
def interpretOne[A](input: Input[F, A], sgb: SubgraphBatches[F], errors: Ref[F, Chain[EvalFailure]]): F[Json] = {
val go = new SubqueryInterpreter(ss, sup, stats, throttle, errors, sgb)
go.goCont(input.continuation, input.data)
}

def interpretAll(inputs: NonEmptyList[Input[F, ?]]): F[Results] = {
/* We perform an alpha renaming for every input to ensure that every node is distinct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gql.server.interpreter
import cats.effect._
import cats.implicits._
import cats.effect.kernel.Resource.ExitCase
import java.util.UUID

object SharedResource {
sealed trait State[F[_]]
Expand All @@ -13,28 +14,35 @@ object SharedResource {

def make[F[_]](res: Resource[F, Unit])(implicit F: Async[F]): Resource[F, Resource[F, Option[Int]]] =
Resource.applyFull { poll =>
poll((res.allocated, F.ref[State[F]](State.Open(1))).tupled).map { case ((_, release), ref) =>
val open = ref.modify {
case State.Closed() => (State.Closed[F](), None)
case State.Open(n) =>
val n2 = n + 1
(State.Open(n2), Some(n2))
}
val id = UUID.randomUUID()
F.delay(println(s"opening $id")) >>
((poll(res.allocated), F.ref[State[F]](State.Open(1))).tupled).flatMap { case ((_, release0), ref) =>
F.deferred[Unit].map { d =>
val release = release0 *> d.complete(()).void
val open = ref.modify {
case State.Closed() => (State.Closed[F](), None)
case State.Open(n) =>
println(s"api open $n $id")
val n2 = n + 1
(State.Open(n2), Some(n2))
}

val close = ref.modify {
case State.Closed() => (State.Closed[F](), F.unit)
case State.Open(n) =>
val n2 = n - 1
if (n2 === 0) (State.Closed[F](), release)
else (State.Open(n2), F.unit)
}.flatten
val close = ref.modify {
case State.Closed() => (State.Closed[F](), F.unit)
case State.Open(n) =>
println(s"api close $n $id")
val n2 = n - 1
if (n2 === 0) (State.Closed[F](), F.delay(println(s"releasing $id")) *> release *> F.delay(println(s"released $id")))
else (State.Open(n2), F.unit)
}.flatten

val api = Resource.make(open) {
case None => F.unit
case Some(_) => close
}
val api = Resource.make(open) {
case None => F.unit
case Some(_) => close
}

(api, ((_: ExitCase) => close))
}
(api, ((_: ExitCase) => close >> d.tryGet.flatMap(o => ref.get.map(s => println(s"outer resource exiting, state is $s, did we release? ${o.isDefined}")))))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import gql.preparation._
import fs2.Stream
import scala.concurrent.duration.FiniteDuration
import cats.arrow.FunctionK
import cats.effect.std.Supervisor
import java.util.UUID

/** The [[StreamInterpreter]] is responsible for:
  *   - Wiring together results for a query.
Expand Down Expand Up @@ -74,66 +76,73 @@ object StreamInterpreter {
takeOne: Boolean = false,
throttle: F ~> F = FunctionK.id[F]
): Stream[F, Result] = Stream.eval(EvalState.init[F]).flatMap { state =>
def doRound: F[(Resource[F, List[EvalState.Entry[F, ?]]], F[Unit])] = {
state.ps.produced >>
accumulate.traverse_(F.sleep(_)) >>
EvalState.ProduceConsume.make[F].flatMap { ps2 =>
state.ref.modify { s =>
val allValues = s.values.getOrElse(Nil)
val updatedKeys = allValues.map(_.token).toSet
val relevant = allValues.filter(_.a.parentLeases.intersect(updatedKeys).isEmpty)
val ts: Resource[F, List[EvalState.Entry[F, _]]] =
relevant.traverse(e => e.a.parentLease.map(_.as(e))).map(_.flatten)
(EvalState[F](None, ps2), (ts, s.ps.notifyConsumed))
val id = UUID.randomUUID()
Stream.resource(
Supervisor[F].onFinalize(F.delay(println(s"cleaned up supervisor $id"))) <* Resource.onFinalize(F.delay(println(s"cleaning up supervisor $id")))
).flatMap { sup =>
println("starting")
def doRound: F[(Resource[F, List[EvalState.Entry[F, ?]]], F[Unit])] = {
debug("awaiting updates") >>
state.get.flatMap(_.ps.produced) >>
debug("updates arrived") >>
accumulate.traverse_(F.sleep(_)) >>
EvalState.ProduceConsume.make[F].flatMap { ps2 =>
state.modify { s =>
val allValues = s.values.getOrElse(Nil)
val updatedKeys = allValues.map(_.token).toSet
val relevant = allValues.filter(x => (x.a.parentLeases - x.token).intersect(updatedKeys).isEmpty)
val ts: Resource[F, List[EvalState.Entry[F, _]]] =
relevant.traverse(e => e.a.parentLease.map(_.as(e))).map(_.flatten)

val reopen = state.update(_.copy(values = Some(Nil))) >> s.ps.notifyConsumed

(EvalState[F](None, ps2), (ts, reopen))
}
}
}
}
}

val interpreter = QueryInterpreter[F](schemaState, state.ref, throttle)
val interpreter = QueryInterpreter[F](schemaState, state, throttle, sup)

val initial = QueryInterpreter.Input.root(root, selection)
val initial = QueryInterpreter.Input.root(root, selection)

val changelog: Stream[F, (List[(Cursor, Json)], Chain[EvalFailure])] = Stream.repeatEval {
doRound.flatMap { case (values, notifyConsumed) =>
values.use { xs =>
val inputs = xs.map { case (entr: EvalState.Entry[F, a]) =>
QueryInterpreter.Input[F, a](entr.cont, entr.a)
}
val changelog: Stream[F, (List[(Cursor, Json)], Chain[EvalFailure])] = Stream.repeatEval {
doRound.flatMap { case (values, reopen) =>
values.use { xs =>
xs.toNel.traverse { nel =>
val inputs = nel.map { case (entr: EvalState.Entry[F, a]) =>
QueryInterpreter.Input[F, a](entr.cont, entr.a)
}

val evalled: F[(List[(Cursor, Json)], Chain[EvalFailure])] =
debug(s"interpreting for ${inputs.size} inputs") >>
inputs.toNel
.traverse(interpreter.interpretAll)
.map {
// Okay there were no inputs (toNel), just emit what we have
case None => (Nil, Chain.empty)
// Okay so an evaluation happened
case Some(res) => (res.data.toList, res.errors)
} <* debug("done interpreting")

evalled <* notifyConsumed
debug(s"interpreting for ${inputs.size} inputs") >>
interpreter
.interpretAll(inputs)
.map { res => (res.data.toList, res.errors) } <*
debug("done interpreting")
}
} <* reopen
}
}
}

Stream.eval(interpreter.interpretAll(NonEmptyList.one(initial))).flatMap { res =>
val jo: JsonObject = res.data.map { case (_, j) => j }.reduceLeft(_ deepMerge _).asObject.get

Stream.emit(Result(res.errors, jo)) ++
changelog
.evalMapAccumulate(jo) { case (prevJo, (jsons, errs)) =>
// Patch the previously emitted json data
val stitched = jsons.foldLeftM(prevJo) { case (accum, (pos, patch)) =>
Temporal[F]
.fromEither {
stitchInto(accum.asJson, patch, pos, Cursor.empty).value.value
}
.map(_.asObject.get)
}.unNone

Stream.eval(interpreter.interpretAll(NonEmptyList.one(initial))).flatMap { res =>
println(s"init gave $res")
val jo: JsonObject = res.data.map { case (_, j) => j }.reduceLeft(_ deepMerge _).asObject.get

Stream.emit(Result(res.errors, jo)) ++
changelog
.evalMapAccumulate(jo) { case (prevJo, (jsons, errs)) =>
// Patch the previously emitted json data
val stitched = jsons.foldLeftM(prevJo) { case (accum, (pos, patch)) =>
Temporal[F]
.fromEither {
stitchInto(accum.asJson, patch, pos, Cursor.empty).value.value
}
.map(_.asObject.get)
}

stitched.map(o => (o, Result(errs, o)))
}

stitched.map(o => (o, Result(errs, o)))
}
.map { case (_, x) => x }
.map { case (_, x) => x }
}
}
}
}
Expand Down
Loading

0 comments on commit 9081d0a

Please sign in to comment.