Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 9, 2024
1 parent 9081d0a commit fca85fa
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,31 @@ object SharedResource {

def make[F[_]](res: Resource[F, Unit])(implicit F: Async[F]): Resource[F, Resource[F, Option[Int]]] =
Resource.applyFull { poll =>
val id = UUID.randomUUID()
F.delay(println(s"opening $id")) >>
((poll(res.allocated), F.ref[State[F]](State.Open(1))).tupled).flatMap { case ((_, release0), ref) =>
F.deferred[Unit].map { d =>
val release = release0 *> d.complete(()).void
val open = ref.modify {
case State.Closed() => (State.Closed[F](), None)
case State.Open(n) =>
println(s"api open $n $id")
val n2 = n + 1
(State.Open(n2), Some(n2))
}

val close = ref.modify {
case State.Closed() => (State.Closed[F](), F.unit)
case State.Open(n) =>
println(s"api close $n $id")
val n2 = n - 1
if (n2 === 0) (State.Closed[F](), F.delay(println(s"releasing $id")) *> release *> F.delay(println(s"released $id")))
else (State.Open(n2), F.unit)
}.flatten
((poll(res.allocated), F.ref[State[F]](State.Open(1))).tupled).flatMap { case ((_, release0), ref) =>
F.deferred[Unit].map { d =>
val release = release0 *> d.complete(()).void
val open = ref.modify {
case State.Closed() => (State.Closed[F](), None)
case State.Open(n) =>
val n2 = n + 1
(State.Open(n2), Some(n2))
}

val api = Resource.make(open) {
case None => F.unit
case Some(_) => close
}
val close = ref.modify {
case State.Closed() => (State.Closed[F](), F.unit)
case State.Open(n) =>
val n2 = n - 1
if (n2 === 0) (State.Closed[F](), release)
else (State.Open(n2), F.unit)
}.flatten

(api, ((_: ExitCase) => close >> d.tryGet.flatMap(o => ref.get.map(s => println(s"outer resource exiting, state is $s, did we release? ${o.isDefined}")))))
val api = Resource.make(open) {
case None => F.unit
case Some(_) => close
}

(api, ((_: ExitCase) => close))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,10 @@ object StreamInterpreter {
takeOne: Boolean = false,
throttle: F ~> F = FunctionK.id[F]
): Stream[F, Result] = Stream.eval(EvalState.init[F]).flatMap { state =>
val id = UUID.randomUUID()
Stream.resource(
Supervisor[F].onFinalize(F.delay(println(s"cleaned up supervisor $id"))) <* Resource.onFinalize(F.delay(println(s"cleaning up supervisor $id")))
).flatMap { sup =>
println("starting")
Stream.resource(Supervisor[F]).flatMap { sup =>
def doRound: F[(Resource[F, List[EvalState.Entry[F, ?]]], F[Unit])] = {
debug("awaiting updates") >>
state.get.flatMap(_.ps.produced) >>
state.get.flatMap(_.ps.produced) >>
debug("updates arrived") >>
accumulate.traverse_(F.sleep(_)) >>
EvalState.ProduceConsume.make[F].flatMap { ps2 =>
Expand Down Expand Up @@ -124,7 +120,6 @@ object StreamInterpreter {
}.unNone

Stream.eval(interpreter.interpretAll(NonEmptyList.one(initial))).flatMap { res =>
println(s"init gave $res")
val jo: JsonObject = res.data.map { case (_, j) => j }.reduceLeft(_ deepMerge _).asObject.get

Stream.emit(Result(res.errors, jo)) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.circe.syntax._
import org.typelevel.scalaccompat.annotation._
import cats.effect.implicits._
import java.util.UUID
import cats.effect.kernel.Resource

class SubqueryInterpreter[F[_]](
state: Ref[F, EvalState[F]],
Expand Down Expand Up @@ -178,24 +179,19 @@ class SubqueryInterpreter[F[_]](
ps: PreparedStep[F, I0, O0],
cont: Prepared[F, O0],
en: EvalNode[F, I0]
): F[Json] = {
// println(s"interpretEffect ${en.value}")
): F[Json] =
goStep[I0, O0](ps, Continuation.Done(cont), en)
}

var n = 0
def subscribeStream[A](
stream: Stream[F, A],
cont: Continuation[F, A],
en: EvalNode[F, ?]
): F[Json] = {
// println(s"subscribeStream ${en.value}")
F.unique.flatMap { tok =>
def mkEn(
isKilled: F[Unit],
leaseThis: Resource[F, Option[Int]],
a: A
): EvalNode[F, A] =
): F[Json] = for {
tok <- F.unique
d <- F.deferred[Json]

_ <- sup.supervise {
def mkEn(isKilled: F[Unit], leaseThis: Resource[F, Option[Int]], a: A): EvalNode[F, A] =
en.addParentPath(tok).setInterruptContext(isKilled).setParentLease(leaseThis).setValue(a)

def tryUpdate(poll: Poll[F], lease: Resource[F, Option[Int]], c: EvalNode[F, A]): F[Unit] =
Expand All @@ -213,29 +209,36 @@ class SubqueryInterpreter[F[_]](
}
}.flatten

def repeatUncons(
stream: Stream[F, A]
// interruptPrevious: F[Unit]
): Pull[F, Nothing, Unit] =
stream.pull.uncons1.flatMap {
def go(a: A, handle: (EvalNode[F, A], Resource[F, Option[Int]]) => F[Unit]): Stream[F, F[Unit]] =
for {
killSig <- Stream.eval(F.deferred[Unit])
kill = killSig.complete(()).void
// leaseScope <- fs2.UnsafeFs2Access.leaseScope[F].flatMap(Pull.output1(_)).streamNoScope
// safeLease <- Stream.resource(SharedResource.make[F](leaseScope))
safeLease = Resource.pure[F, Option[Int]](Some(1))
en = mkEn(kill, safeLease, a)
_ <- Stream.eval(handle(en, safeLease))
} yield kill

def walk(xs: Stream[F, A], kill: F[Unit]): Stream[F, Unit] =
xs.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
val en = mkEn(F.unit, Resource.pure(Some(1)), hd)
Pull.eval(F.uncancelable(tryUpdate(_, Resource.pure(Some(1)), en))) >> repeatUncons(tl)
}

F.deferred[Json].flatMap { d =>
val sout = stream.pull.uncons1.flatMap {
case None => Pull.eval(F.never)
case Some((hd, tl)) =>
val en = mkEn(F.unit, Resource.pure(Some(1)), hd)
Pull.eval(goCont(cont, en).flatMap(d.complete(_))) >> repeatUncons(tl)
}

sup.supervise(sout.stream.compile.drain) *> d.get
}

// stream.head.compile.lastOrError.flatMap(x => goCont(cont, en.setValue(x)))
Pull.eval(kill) >>
go(hd, (en, sl) => F.uncancelable(tryUpdate(_, sl, en)).void).flatMap(walk(tl, _)).pull.echo
}.streamNoScope

val sout = stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
go(hd, (en, _) => goCont(cont, en).flatMap(d.complete(_)).void)
.flatMap(x => walk(tl, x).interruptWhen((en.interruptContext).attempt))
.pull
.echo
}.streamNoScope

sout.compile.drain
}
}
o <- d.get
} yield o
}
33 changes: 21 additions & 12 deletions modules/server/src/test/scala/gql/StreamingTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ final case class Level2(value: Int)
class StreamingTest extends CatsEffectSuite {
val level1UsersRef = IO.ref(0).unsafeRunSync()
def level1Users = level1UsersRef.get.unsafeRunSync()
val level1Resource = Resource.make(level1UsersRef.updateAndGet(_ + 1).map(x => println(s"raw open1 $x")))(_ => level1UsersRef.updateAndGet(_ - 1).map(x => println(s"raw close1 $x")))
val level1Resource = Resource.make(level1UsersRef.update(_ + 1))(_ => level1UsersRef.update(_ - 1))

val level2UsersRef = IO.ref(0).unsafeRunSync()
def level2Users = level2UsersRef.get.unsafeRunSync()
val level2Resource = Resource.make(level2UsersRef.updateAndGet(_ + 1).map(x => println(s"raw open2 $x")))(_ => level2UsersRef.updateAndGet(_ - 1).map(x => println(s"raw close2 $x")))
val level2Resource = Resource.make(level2UsersRef.update(_ + 1))(_ => level2UsersRef.update(_ - 1))

implicit lazy val level1: Type[IO, Level1] = builder[IO, Level1] { b =>
tpe[IO, Level1](
Expand Down Expand Up @@ -86,7 +86,7 @@ class StreamingTest extends CatsEffectSuite {
lazy val schema = Schema.simple(schemaShape).unsafeRunSync()

def query(q: String, variables: Map[String, Json] = Map.empty): Stream[IO, JsonObject] =
Compiler[IO].compile(schema, q, variables = variables, debug = gql.server.interpreter.DebugPrinter[IO](IO.println)) match {
Compiler[IO].compile(schema, q, variables = variables) match {
case Left(err) => Stream(err.asJsonObject)
case Right(Application.Subscription(s)) => s.map(_.asJsonObject)
case _ => ???
Expand Down Expand Up @@ -168,7 +168,7 @@ class StreamingTest extends CatsEffectSuite {
assertEquals(clue(level1Users), 0)
assertEquals(clue(level2Users), 0)
// Run test some times
(0 to 2000).toList.parTraverse { _ =>
(0 to 2000).toList.parTraverse_ { _ =>
// if inner re-emits, outer will remain the same
// if outer re-emits, inner will restart
val q = """
Expand All @@ -192,7 +192,7 @@ class StreamingTest extends CatsEffectSuite {
}
.compile
.drain
} >> IO {
} >> IO {
assertEquals(clue(level1Users), 0)
assertEquals(clue(level2Users), 0)
}
Expand Down Expand Up @@ -267,15 +267,24 @@ class StreamingTest extends CatsEffectSuite {

query(q).pull.uncons1
.flatMap {
case None => ???
case None => ???
case Some((_, _)) =>
Pull.eval {
IO {
// There should be one lease on both resources
assert(clue(level1Users) >= 1)
assert(clue(level2Users) >= 1)
}
// There should be one lease on both resources if we await
// When a new element arrives, that is, tl.head, then a lease on level2 and level1 should be present
val check = IO {
assert(clue(level1Users) >= 1)
assert(clue(level2Users) >= 1)
}

val tryComplete =
Stream
.repeatEval(check.attempt)
.meteredStartImmediately(100.millis)
.find(_.isRight)
.compile
.drain

Pull.eval(IO.race(tryComplete, IO.sleep(5.seconds) >> check)).void
}
.stream
.compile
Expand Down

0 comments on commit fca85fa

Please sign in to comment.