Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 7, 2024
1 parent 931d205 commit 7ff8c29
Showing 1 changed file with 9 additions and 228 deletions.
237 changes: 9 additions & 228 deletions modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,46 +85,18 @@ object NewDesign {

object Pass2 {
trait QueryPlan

trait JsonPath
trait Json

trait NodeInfo

def plan(nis: List[NodeInfo]): QueryPlan = ???

final case class PlanEval[F[_], A, B](
plan: A,
eval: B => F[PlanEval[F, A, B]]
)

type Effect[F[_], A] = Pull[F, Nothing, (Json, List[Fiber[F, Throwable, Unit]])]
type Collect[F[_]] = PlanEval[Effect[F, *], NodeInfo, QueryPlan]
type PlanEvalEffect[F[_]] = Effect[F, Collect[F]]

final case class Hierarchy[A](children: List[(A, Hierarchy[A])])

// trait Node[F[_], A] {
// def eval(a: A, plan: QueryPlan, lease: Resource[F, Unit]): PlanEvalEffect[F]
// }
type Effect[F[_], A] = Pull[F, Nothing, A]
type CollectValue[F[_]] = (Json, List[Fiber[F, Throwable, Unit]])

// def evalStreamNode[A](
// stream: Stream[IO, A],
// child: Node[IO, A],
// plan: QueryPlan,
// nodeInfo: NodeInfo,
// parentLease: Resource[IO, Unit]
// ): PlanEvalEffect[IO] =
// stream.pull.uncons1.flatMap {
// case None => Pull.eval(IO.never)
// case Some((hd, updates)) =>
// val tl = updates.map(a => PlanEval(nodeInfo, child.eval(a, _, parentLease)))
// UnsafeFs2Access.leaseScope[IO].flatMap { thislease =>
// child.eval(hd, plan, thislease).map { case (jsons, childUpdates) =>
// jsons -> ??? // Hierarchy(List(tl -> childUpdates))
// }
// }
// }
final case class Collect[F[_]](
nodeInfo: NodeInfo,
eval: QueryPlan => Effect[F, CollectValue[F]]
)

final case class StateEntry[F[_]](
// id of the node in the tree
Expand All @@ -143,63 +115,6 @@ object NewDesign {
consumed: F[State[F]]
)

// def registerHierarchy(
// path: Set[Unique.Token],
// h: Hierarchy[Stream[IO, Collect[IO]]],
// state: Ref[IO, State[IO]]
// ): IO[List[Fiber[IO, Throwable, Unit]]] = {
// h.children.map { case (col, child) =>
// IO.unique.map { tok =>
// val newPath = path + tok
// registerHierarchy(newPath, child, state).flatMap { children =>
// val cancelChildren: IO[Unit] = children.parTraverse_(_.cancel)

// val publishThisNode: IO[Unit] = {
// def rec(s: Stream[IO, Collect[IO]]): Pull[IO, Nothing, Unit] =
// s.pull.uncons1.flatMap {
// case None => Pull.done
// case Some((hd, tl)) =>
// UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource =>
// // keep trying to submit hd
// // every attempt we reserve the resource
// def tryUpdate(poll: IO ~> IO): IO[Unit] =
// poll(leaseResource.allocated).flatMap { case (_, release) =>
// state.modify { s =>
// s.values match {
// // If we are not consuming, we await next and try again
// case None =>
// (s, release >> poll((s.consumed >> tryUpdate(poll))))
// // If we are consuming, publish the value and await next consumption
// case Some(xs) =>
// (
// s.copy(values = Some(StateEntry(tok, path, hd, release, leaseResource) :: xs)),
// poll(s.consumed.void)
// )
// }
// }.flatten
// }

// Pull.eval(IO.uncancelable(tryUpdate)) >> rec(tl)
// }
// }

// rec(col).stream.compile.drain
// }
// ???
// }

// registerHierarchy(
// newPath,
// child,
// state
// ).map { children =>
// val killAll = children.parTraverse(_.cancel)
// }
// }
// }
// ???
// }

final case class Context[F[_]](
sup: effect.std.Supervisor[F],
path: Set[Unique.Token],
Expand All @@ -218,7 +133,7 @@ object NewDesign {
}

trait Cont[F[_], A] {
def eval(a: A, ctx: Context[F]): PlanEvalEffect[F]
def eval(a: A, ctx: Context[F]): Effect[F, CollectValue[F]]
}

def visit[A](
Expand Down Expand Up @@ -270,8 +185,8 @@ object NewDesign {
_ <- Pull.eval(interruptPrevious)
killThis <- Pull.eval(IO.deferred[Unit])
leaseResource <- resourcePull
c: Collect[IO] = PlanEval[Effect[IO, *], NodeInfo, QueryPlan](
plan = nodeInfo,
c: Collect[IO] = Collect(
nodeInfo = nodeInfo,
eval = qp => child.eval(hd, mkCtx(killThis.get, leaseResource).setPlan(qp))
)

Expand Down Expand Up @@ -302,140 +217,6 @@ object NewDesign {
}
}

// pulle side:
// 1. pull one
// 2. eval child
// 3. ensure that child is on this pull's lease
// 4. ensure that our tail emission stops child emission
//
// pull side:
// 1. pull one
// 2. compute roots (all nodes that don't have a parent in the emitted nodes)
// 3. plan query
// 4. eval next pull for result
// 5. do pull
// 6. parjoin the new resulting streams
//
// this strategy closes child streams on the pulle side, but could this occur on the pull side?
//

/*
1.
scope = root
stream = FlatMapOutput(
InScope(Bind(
Acquire(IO.println("open"), _ => IO.println("close"), false),
x => Output(x)
)),
_ => Bind(_ => IO.println("use"), Eval(Output(Chunk(()))))
)
runner = OuterRun
2. go then viewL then match
scope = root
stream = InScope(Bind(
Acquire(IO.println("open"), _ => IO.println("close"), false),
x => Output(x)
))
// FlatMapOutput implies the FlatMapR runner
runner = FlatMapR(
view=IdContP,
fun=_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))),
runner=OuterRunner
)
// since FlatMapOutput <: Action then getCont = IdContP
contP = IdContP
3. recurse then viewL then match then goInScope
scope = root -> child
// goInScope creates a Bind node over the pull of child and adds endScope as the continuation
stream = Bind(Bind(
Acquire(IO.println("open"), _ => IO.println("close"), false),
x => Output(x)
), r => endScope(child, r))
runner = ViewRunner(
view=IdContP,
runner=FlatMapR(
IdContP,
_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))),
runner=OuterRunner
)
)
4. recurse then viewL
viewL = FlatMapOutput(
Acquire(IO.println("open"), _ => IO.println("close"), false),
x => Output(x)
)
getCont = Bind(..., r => endScope(child, r))
scope = root -> child
runner = ViewRunner(
IdContP,
runner=FlatMapR(
IdContP,
_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))),
runner=OuterRunner
)
)
5. match
scope = root -> child
stream = Acquire(IO.println("open"), _ => IO.println("close"), false)
runner = FlatMapR(
view = Bind(..., r => endScope(child, r)),
fun=x => Output(x),
runner = ViewRunner(
IdContP,
runner=FlatMapR(
IdContP,
_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))),
runner=OuterRunner
)
)
)
6. recurse then viewL then match then> goAcquire (attach bracket to child scope)
scope = root -> child
stream = Succeeded(()) // since IdContP(Succeeded(())) = Succeeded(())
runner = FlatMapR(
view = (r => endScope(child, r)),
fun = x => Output(x),
runner = ViewRunner(
IdContP,
runner=FlatMapR(
IdContP,
_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))),
runner=OuterRunner
)
)
)
7. recurse then viewL then match then (runner: FlatMapR).done(child)
scope = root -> child
runner.done(child) = go(
child,
None,
F ~> F,
ViewRunner(
IdContP,
runner=FlatMapR(
IdContP,
_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))),
runner=OuterRunner
)
),
// FlatMapR's view(unit) where view=Bind(..., r => endScope(child, r))
SucceedScope(child.id)
)
8. recurse then viewL then goCloseScope(SucceedScope(child.id), IdContP)
we hit the `case Some(toClose) => ...` case such that child.close
and go(child.openAncestor, None, F ~> F, sameRunnerAsLast, unit)
I suppose that child should be closed now and "close" should appear in the console
However, the resource is alive when "use" is printed?
*/

Stream
.bracketWeak(IO.println("open"))(_ => IO.println("close"))
.scope
Expand Down

0 comments on commit 7ff8c29

Please sign in to comment.