Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 6, 2024
1 parent 752e817 commit 08f952a
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 68 deletions.
238 changes: 170 additions & 68 deletions modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,33 @@ object NewDesign {
eval: B => F[PlanEval[F, A, B]]
)

type Effect[F[_], A] = Pull[F, Nothing, (Chain[(JsonPath, Json)], Hierarchy[Stream[F, A]])]
type Collect[F[_]] = PlanEval[Effect[F, *], NodeInfo, QueryPlan]
type Effect[F[_], A] = Pull[F, Nothing, (Json, List[Fiber[F, Throwable, Unit]])]
type Collect[F[_]] = PlanEval[Effect[F, *], (F[Unit], NodeInfo), QueryPlan]
type PlanEvalEffect[F[_]] = Effect[F, Collect[F]]

final case class Hierarchy[A](children: List[(A, Hierarchy[A])])

trait Node[F[_], A] {
def eval(a: A, plan: QueryPlan, lease: Resource[F, Unit]): PlanEvalEffect[F]
}

def evalStreamNode[A](
stream: Stream[IO, A],
child: Node[IO, A],
plan: QueryPlan,
nodeInfo: NodeInfo,
parentLease: Resource[IO, Unit]
): PlanEvalEffect[IO] =
stream.pull.uncons1.flatMap {
case None => Pull.eval(IO.never)
case Some((hd, updates)) =>
val tl = updates.map(a => PlanEval(nodeInfo, child.eval(a, _, parentLease)))
UnsafeFs2Access.leaseScope[IO].flatMap { thislease =>
child.eval(hd, plan, thislease).map { case (jsons, childUpdates) =>
jsons -> Hierarchy(List(tl -> childUpdates))
}
}
}
// trait Node[F[_], A] {
// def eval(a: A, plan: QueryPlan, lease: Resource[F, Unit]): PlanEvalEffect[F]
// }

// def evalStreamNode[A](
// stream: Stream[IO, A],
// child: Node[IO, A],
// plan: QueryPlan,
// nodeInfo: NodeInfo,
// parentLease: Resource[IO, Unit]
// ): PlanEvalEffect[IO] =
// stream.pull.uncons1.flatMap {
// case None => Pull.eval(IO.never)
// case Some((hd, updates)) =>
// val tl = updates.map(a => PlanEval(nodeInfo, child.eval(a, _, parentLease)))
// UnsafeFs2Access.leaseScope[IO].flatMap { thislease =>
// child.eval(hd, plan, thislease).map { case (jsons, childUpdates) =>
// jsons -> ??? // Hierarchy(List(tl -> childUpdates))
// }
// }
// }

final case class StateEntry[F[_]](
// id of the node in the tree
Expand All @@ -143,57 +143,142 @@ object NewDesign {
consumed: F[State[F]]
)

def registerHierarchy(
// def registerHierarchy(
// path: Set[Unique.Token],
// h: Hierarchy[Stream[IO, Collect[IO]]],
// state: Ref[IO, State[IO]]
// ): IO[List[Fiber[IO, Throwable, Unit]]] = {
// h.children.map { case (col, child) =>
// IO.unique.map { tok =>
// val newPath = path + tok
// registerHierarchy(newPath, child, state).flatMap { children =>
// val cancelChildren: IO[Unit] = children.parTraverse_(_.cancel)

// val publishThisNode: IO[Unit] = {
// def rec(s: Stream[IO, Collect[IO]]): Pull[IO, Nothing, Unit] =
// s.pull.uncons1.flatMap {
// case None => Pull.done
// case Some((hd, tl)) =>
// UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource =>
// // keep trying to submit hd
// // every attempt we reserve the resource
// def tryUpdate(poll: IO ~> IO): IO[Unit] =
// poll(leaseResource.allocated).flatMap { case (_, release) =>
// state.modify { s =>
// s.values match {
// // If we are not consuming, we await next and try again
// case None =>
// (s, release >> poll((s.consumed >> tryUpdate(poll))))
// // If we are consuming, publish the value and await next consumption
// case Some(xs) =>
// (
// s.copy(values = Some(StateEntry(tok, path, hd, release, leaseResource) :: xs)),
// poll(s.consumed.void)
// )
// }
// }.flatten
// }

// Pull.eval(IO.uncancelable(tryUpdate)) >> rec(tl)
// }
// }

// rec(col).stream.compile.drain
// }
// ???
// }

// registerHierarchy(
// newPath,
// child,
// state
// ).map { children =>
// val killAll = children.parTraverse(_.cancel)
// }
// }
// }
// ???
// }

final case class Context[F[_]](
path: Set[Unique.Token],
h: Hierarchy[Stream[IO, Collect[IO]]],
state: Ref[IO, State[IO]]
): IO[List[Fiber[IO, Throwable, Unit]]] = {
h.children.map { case (col, child) =>
IO.unique.map { tok =>
val publishThisNode: IO[Unit] = {
def rec(s: Stream[IO, Collect[IO]]): Pull[IO, Nothing, Unit] =
s.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource =>
// keep trying to submit hd
// every attempt we reserve the resource
def tryUpdate(poll: IO ~> IO): IO[Unit] =
poll(leaseResource.allocated).flatMap { case (_, release) =>
state.modify { s =>
s.values match {
// If we are not consuming, we await next and try again
case None =>
(s, release >> poll((s.consumed >> tryUpdate(poll))))
// If we are consuming, publish the value and await next consumption
case Some(xs) =>
(
s.copy(values = Some(StateEntry(tok, path, hd, release, leaseResource) :: xs)),
poll(s.consumed.void)
)
}
}.flatten
}

Pull.eval(IO.uncancelable(tryUpdate)) >> rec(tl)
}
}
state: Ref[F, State[F]],
parentLease: Resource[F, Unit],
plan: QueryPlan
) {
def addPath(tok: Unique.Token): Context[F] = copy(path = path + tok)

def setLease(lease: Resource[F, Unit]): Context[F] = copy(parentLease = lease)

rec(col).stream.compile.drain
}
def setPlan(qp: QueryPlan): Context[F] = copy(plan = qp)
}

val newPath = path + tok
trait Cont[F[_], A] {
def eval(a: A, ctx: Context[F]): PlanEvalEffect[F]
}

registerHierarchy(
newPath,
child,
state
).map { children =>
val killAll = children.parTraverse(_.cancel)
}
def visit[A](
stream: Stream[IO, A],
child: Cont[IO, A],
nodeInfo: NodeInfo,
ctx: Context[IO]
) = {
Pull.eval(IO.unique).flatMap { tok =>
val withCtx = ctx.addPath(tok)
stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource =>
val withLease = withCtx.setLease(leaseResource)
// on head pull we evaluate
child.eval(hd, withLease).flatMap { case (result, fibs) =>
val cancelAll: IO[Unit] = fibs.parTraverse_(_.cancel)

// on subsequent pulls we submit to main queue
def repeatUncons(
stream: Stream[IO, A],
interruptPrevious: IO[Unit]
): Pull[IO, Nothing, Unit] =
stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
Pull.eval(interruptPrevious) >>
Pull.eval(IO.deferred[Unit]).flatMap { killThis =>
val c: Collect[IO] = PlanEval[Effect[IO, *], (IO[Unit], NodeInfo), QueryPlan](
plan = killThis.get -> nodeInfo,
eval = qp => child.eval(hd, withLease.setPlan(qp))
)

def tryUpdate(poll: IO ~> IO): IO[Unit] =
poll(leaseResource.allocated).flatMap { case (_, release) =>
ctx.state.modify { s =>
s.values match {
// If we are not consuming, we await next and try again
case None =>
(s, release >> poll((s.consumed >> tryUpdate(poll))))
// If we are consuming, publish the value and await next consumption
case Some(xs) =>
(
s.copy(values = Some(StateEntry(tok, ctx.path, c, release, leaseResource) :: xs)),
poll(s.consumed.void)
)
}
}.flatten
}

val upd: IO[Unit] = IO.uncancelable(tryUpdate)

Pull.eval(upd) >> repeatUncons(tl, killThis.complete(()).void)
}
}

Pull.eval(repeatUncons(tl, cancelAll).stream.compile.drain.start).map { fib =>
result -> List(fib)
}
}
}
}
}
???
}

// pulle side:
Expand Down Expand Up @@ -491,5 +576,22 @@ object NewDesign {
.interruptWhen(IO.sleep(500.millis) >> IO.println("cancelling").as(().asRight[Throwable])) ++
Stream.eval(IO.println("whee")).repeatN(2).meteredStartImmediately(1.second)
).compile.drain

Stream
.emits((0 to 10))
.covary[IO]
.flatTap(i => Stream.bracket(IO.println(s"open $i"))(_ => IO.println(s"close $i")))
.interruptWhen(IO.never[Unit].attempt)
.pull
.uncons1
.flatMap {
case None => Pull.pure(None)
case Some((hd, tl)) => Pull.pure(Some(hd -> tl))
}
.flatMap(xs => Pull.output(Chunk.fromOption(xs)))
.stream
.evalMap(x => IO.println(s"using $x"))
.compile
.drain
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ object UnsafeFs2Access {

def leaseScope[F[_]: MonadThrow]: Pull[F, Nothing, Resource[F, Unit]] =
getScope[F].map(scope => Resource.make(scope.lease)(x => x.cancel.rethrow).void)

def interruptWhen[F[_]: MonadThrow, O](fa: F[Unit])(p: Pull[F, O, Unit]): Pull[F, O, Unit] =
Pull.interruptScope(Pull.scope(Pull.interruptWhen(fa.attempt) >> p))
}

0 comments on commit 08f952a

Please sign in to comment.