Skip to content

Commit

Permalink
fix resource leak
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Sep 9, 2024
1 parent cc7fd3c commit 9f23ef2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import gql.resolver._
import io.circe.syntax._
import org.typelevel.scalaccompat.annotation._
import cats.effect.implicits._
import java.util.UUID
import cats.effect.kernel.Resource

class SubqueryInterpreter[F[_]](
Expand Down Expand Up @@ -189,7 +190,7 @@ class SubqueryInterpreter[F[_]](
tok <- F.unique
d <- F.deferred[Json]

_ <- sup.supervise {
background = {
def mkEn(isKilled: F[Unit], leaseThis: Resource[F, Option[Int]], a: A): EvalNode[F, A] =
en.addParentPath(tok).setInterruptContext(isKilled).setParentLease(leaseThis).setValue(a)

Expand All @@ -208,32 +209,48 @@ class SubqueryInterpreter[F[_]](
}
}.flatten

def go(kill: F[Unit], a: A, handle: (EvalNode[F, A], Resource[F, Option[Int]]) => F[Unit]): Stream[F, F[Unit]] =
def go(a: A, handle: (EvalNode[F, A], Resource[F, Option[Int]]) => F[Unit]): Stream[F, F[Unit]] =
for {
killSig <- Stream.eval(F.deferred[Unit])
kill = killSig.complete(()).void
leaseScope <- fs2.UnsafeFs2Access.leaseScope[F].flatMap(Pull.output1(_)).streamNoScope
safeLease <- Stream.resource(SharedResource.make[F](leaseScope))
_ <- Stream.eval(handle(mkEn(kill, safeLease, a), safeLease))
} yield kill

val sout = stream.zipWithIndex
.evalMap[F, (Option[Deferred[F, Unit]], A)] {
case (a, 0) => (none[Deferred[F, Unit]], a).pure[F]
case (a, _) => F.deferred[Unit].map(d => (Some(d), a))
def walk(xs: Stream[F, A], kill: F[Unit]): Stream[F, Unit] =
xs.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
Pull.eval(kill) >>
go(hd, (en, sl) => F.uncancelable(tryUpdate(_, sl, en)).void).flatMap(walk(tl, _)).pull.echo
}.streamNoScope

}
.zipWithPrevious
.flatMap {
case (None, (_, a)) => go(F.unit, a, (en, _) => goCont(cont, en).flatMap(d.complete(_)).void)
case (Some((killPrev, _)), (killMe, a)) =>
val kp: F[Unit] = killPrev.traverse_(_.complete(()))
val km = killMe.traverse_(_.get)
Stream.eval(kp) >> go(km, a, (en, sl) => F.uncancelable(tryUpdate(_, sl, en)))
}
// we may only interrupt after first element
.interruptWhen(d.get >> en.interruptContext.attempt)
val sout = stream.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) =>
go(hd, (en, _) => goCont(cont, en).flatMap(d.complete(_)).void)
.flatMap(x => walk(tl, x).interruptWhen((en.interruptContext).attempt))
.pull
.echo
}.streamNoScope

sout.compile.drain
sout
}

// _ <- sup.supervise(background.compile.drain)
// to fix stream resource safety
interruptStream <- F.deferred[Unit]
streamDone <- F.deferred[Unit]
_ <- F.uncancelable { _ =>
val killStream = interruptStream.complete(()).void
val markDone = streamDone.complete(()).void
val s = background.interruptWhen(interruptStream.get.attempt).compile.drain.guarantee(markDone)
sup
.supervise(F.never[Unit].guarantee(killStream *> streamDone.get))
.onError(_ => killStream.void) *> s.start.void
}

o <- d.get
} yield o
}
25 changes: 10 additions & 15 deletions modules/server/src/test/scala/gql/StreamingTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,16 @@ class StreamingTest extends CatsEffectSuite {
}
}

test("example".only) {
IO.uncancelable{ poll =>
IO.println("hey") *> poll(IO.canceled) *> IO.println("world") *> IO(assert(false))
}
}

test("resource leak") {
IO.ref(0).flatMap { ref =>
val res = Stream.resource(Resource.make(ref.update(_ + 1))(_ => ref.update(_ - 1)))
// test("resource leak") {
// IO.ref(0).flatMap { ref =>
// val res = Stream.resource(Resource.make(ref.update(_ + 1))(_ => ref.update(_ - 1)))

(0 to 100000).toList.parTraverse_ { _ =>
val fa = res.evalMap(_ => IO.sleep(10.millis)).take(10).compile.drain
IO.race(fa, IO.sleep(90.millis))
} >> ref.get.map(assertEquals(_, 0))
}
}
// (0 to 100000).toList.parTraverse_ { _ =>
// val fa = res.evalMap(_ => IO.sleep(10.millis)).take(10).compile.drain
// IO.race(fa, IO.sleep(90.millis))
// } >> ref.get.map(assertEquals(_, 0))
// }
// }

test("should stream out some nested elements") {
assertEquals(clue(level1Users), 0)
Expand Down Expand Up @@ -291,6 +285,7 @@ class StreamingTest extends CatsEffectSuite {
// There should be one lease on both resources if we await
// When a new element arrives, that is, tl.head, then a lease on level2 and level1 should be present
val check = IO {
println((level1Users, level2Users))
assert(clue(level1Users) >= 1)
assert(clue(level2Users) >= 1)
}
Expand Down

0 comments on commit 9f23ef2

Please sign in to comment.