diff --git a/modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala b/modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala index 11678913..1c1a9ce9 100644 --- a/modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala +++ b/modules/server/src/main/scala/gql/server/interpreter/NewDesign.scala @@ -85,46 +85,18 @@ object NewDesign { object Pass2 { trait QueryPlan - - trait JsonPath trait Json - trait NodeInfo def plan(nis: List[NodeInfo]): QueryPlan = ??? - final case class PlanEval[F[_], A, B]( - plan: A, - eval: B => F[PlanEval[F, A, B]] - ) - - type Effect[F[_], A] = Pull[F, Nothing, (Json, List[Fiber[F, Throwable, Unit]])] - type Collect[F[_]] = PlanEval[Effect[F, *], NodeInfo, QueryPlan] - type PlanEvalEffect[F[_]] = Effect[F, Collect[F]] - - final case class Hierarchy[A](children: List[(A, Hierarchy[A])]) - - // trait Node[F[_], A] { - // def eval(a: A, plan: QueryPlan, lease: Resource[F, Unit]): PlanEvalEffect[F] - // } + type Effect[F[_], A] = Pull[F, Nothing, A] + type CollectValue[F[_]] = (Json, List[Fiber[F, Throwable, Unit]]) - // def evalStreamNode[A]( - // stream: Stream[IO, A], - // child: Node[IO, A], - // plan: QueryPlan, - // nodeInfo: NodeInfo, - // parentLease: Resource[IO, Unit] - // ): PlanEvalEffect[IO] = - // stream.pull.uncons1.flatMap { - // case None => Pull.eval(IO.never) - // case Some((hd, updates)) => - // val tl = updates.map(a => PlanEval(nodeInfo, child.eval(a, _, parentLease))) - // UnsafeFs2Access.leaseScope[IO].flatMap { thislease => - // child.eval(hd, plan, thislease).map { case (jsons, childUpdates) => - // jsons -> ??? // Hierarchy(List(tl -> childUpdates)) - // } - // } - // } + final case class Collect[F[_]]( + nodeInfo: NodeInfo, + eval: QueryPlan => Effect[F, CollectValue[F]] + ) final case class StateEntry[F[_]]( // id of the node in the tree @@ -143,63 +115,6 @@ object NewDesign { consumed: F[State[F]] ) - // def registerHierarchy( - // path: Set[Unique.Token], - // h: Hierarchy[Stream[IO, Collect[IO]]], - // state: Ref[IO, State[IO]] - // ): IO[List[Fiber[IO, Throwable, Unit]]] = { - // h.children.map { case (col, child) => - // IO.unique.map { tok => - // val newPath = path + tok - // registerHierarchy(newPath, child, state).flatMap { children => - // val cancelChildren: IO[Unit] = children.parTraverse_(_.cancel) - - // val publishThisNode: IO[Unit] = { - // def rec(s: Stream[IO, Collect[IO]]): Pull[IO, Nothing, Unit] = - // s.pull.uncons1.flatMap { - // case None => Pull.done - // case Some((hd, tl)) => - // UnsafeFs2Access.leaseScope[IO].flatMap { leaseResource => - // // keep trying to submit hd - // // every attempt we reserve the resource - // def tryUpdate(poll: IO ~> IO): IO[Unit] = - // poll(leaseResource.allocated).flatMap { case (_, release) => - // state.modify { s => - // s.values match { - // // If we are not consuming, we await next and try again - // case None => - // (s, release >> poll((s.consumed >> tryUpdate(poll)))) - // // If we are consuming, publish the value and await next consumption - // case Some(xs) => - // ( - // s.copy(values = Some(StateEntry(tok, path, hd, release, leaseResource) :: xs)), - // poll(s.consumed.void) - // ) - // } - // }.flatten - // } - - // Pull.eval(IO.uncancelable(tryUpdate)) >> rec(tl) - // } - // } - - // rec(col).stream.compile.drain - // } - // ??? - // } - - // registerHierarchy( - // newPath, - // child, - // state - // ).map { children => - // val killAll = children.parTraverse(_.cancel) - // } - // } - // } - // ??? - // } - final case class Context[F[_]]( sup: effect.std.Supervisor[F], path: Set[Unique.Token], @@ -218,7 +133,7 @@ object NewDesign { } trait Cont[F[_], A] { - def eval(a: A, ctx: Context[F]): PlanEvalEffect[F] + def eval(a: A, ctx: Context[F]): Effect[F, CollectValue[F]] } def visit[A]( @@ -270,8 +185,8 @@ object NewDesign { _ <- Pull.eval(interruptPrevious) killThis <- Pull.eval(IO.deferred[Unit]) leaseResource <- resourcePull - c: Collect[IO] = PlanEval[Effect[IO, *], NodeInfo, QueryPlan]( - plan = nodeInfo, + c: Collect[IO] = Collect( + nodeInfo = nodeInfo, eval = qp => child.eval(hd, mkCtx(killThis.get, leaseResource).setPlan(qp)) ) @@ -302,140 +217,6 @@ object NewDesign { } } - // pulle side: - // 1. pull one - // 2. eval child - // 3. ensure that child is on this pull's lease - // 4. ensure that our tail emission stops child emission - // - // pull side: - // 1. pull one - // 2. compute roots (all nodes that don't have a parent in the emitted nodes) - // 3. plan query - // 4. eval next pull for result - // 5. do pull - // 6. parjoin the new resulting streams - // - // this strategy closes child streams on the pulle side, but could this occur on the pull side? - // - - /* - 1. - scope = root - stream = FlatMapOutput( - InScope(Bind( - Acquire(IO.println("open"), _ => IO.println("close"), false), - x => Output(x) - )), - _ => Bind(_ => IO.println("use"), Eval(Output(Chunk(())))) - ) - runner = OuterRun - - 2. go then viewL then match - scope = root - stream = InScope(Bind( - Acquire(IO.println("open"), _ => IO.println("close"), false), - x => Output(x) - )) - // FlatMapOutput implies the FlatMapR runner - runner = FlatMapR( - view=IdContP, - fun=_ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))), - runner=OuterRunner - ) - // since FlatMapOutput <: Action then getCont = IdContP - contP = IdContP - - 3. recurse then viewL then match then goInScope - scope = root -> child - // goInScope creates a Bind node over the pull of child and adds endScope as the continuation - stream = Bind(Bind( - Acquire(IO.println("open"), _ => IO.println("close"), false), - x => Output(x) - ), r => endScope(child, r)) - runner = ViewRunner( - view=IdContP, - runner=FlatMapR( - IdContP, - _ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))), - runner=OuterRunner - ) - ) - - 4. recurse then viewL - viewL = FlatMapOutput( - Acquire(IO.println("open"), _ => IO.println("close"), false), - x => Output(x) - ) - getCont = Bind(..., r => endScope(child, r)) - scope = root -> child - runner = ViewRunner( - IdContP, - runner=FlatMapR( - IdContP, - _ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))), - runner=OuterRunner - ) - ) - - 5. match - scope = root -> child - stream = Acquire(IO.println("open"), _ => IO.println("close"), false) - runner = FlatMapR( - view = Bind(..., r => endScope(child, r)), - fun=x => Output(x), - runner = ViewRunner( - IdContP, - runner=FlatMapR( - IdContP, - _ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))), - runner=OuterRunner - ) - ) - ) - - 6. recurse then viewL then match then> goAcquire (attach bracket to child scope) - scope = root -> child - stream = Succeeded(()) // since IdContP(Succeeded(())) = Succeeded(()) - runner = FlatMapR( - view = (r => endScope(child, r)), - fun = x => Output(x), - runner = ViewRunner( - IdContP, - runner=FlatMapR( - IdContP, - _ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))), - runner=OuterRunner - ) - ) - ) - - 7. recurse then viewL then match then (runner: FlatMapR).done(child) - scope = root -> child - runner.done(child) = go( - child, - None, - F ~> F, - ViewRunner( - IdContP, - runner=FlatMapR( - IdContP, - _ => Bind(_ => IO.println("use"), Eval(Output(Singleton(())))), - runner=OuterRunner - ) - ), - // FlatMapR's view(unit) where view=Bind(..., r => endScope(child, r)) - SucceedScope(child.id) - ) - - 8. recurse then viewL then goCloseScope(SucceedScope(child.id), IdContP) - we hit the `case Some(toClose) => ...` case such that child.close - and go(child.openAncestor, None, F ~> F, sameRunnerAsLast, unit) - - I suppose that child should be closed now and "close" should appear in the console - However, the resource is alive when "use" is printed? - */ - Stream .bracketWeak(IO.println("open"))(_ => IO.println("close")) .scope