From d9e52b314e1184396278f97c8466304fb615a058 Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Thu, 12 Oct 2023 21:00:19 +0200 Subject: [PATCH] fix: lazy resource leasing (Hotswap bug) --- .../gql/relational/SkunkRelationalSuite.scala | 20 ++++- .../scala/gql/relational/LazyResource.scala | 80 +++++++++++++++---- .../server/interpreter/BatchAccumulator.scala | 2 +- 3 files changed, 84 insertions(+), 18 deletions(-) diff --git a/modules/relational-skunk/src/test/scala/gql/relational/SkunkRelationalSuite.scala b/modules/relational-skunk/src/test/scala/gql/relational/SkunkRelationalSuite.scala index a35cd9fa8..9a2af3308 100644 --- a/modules/relational-skunk/src/test/scala/gql/relational/SkunkRelationalSuite.scala +++ b/modules/relational-skunk/src/test/scala/gql/relational/SkunkRelationalSuite.scala @@ -24,6 +24,8 @@ import cats.implicits._ import skunk.util.Typer import cats.effect.std.UUIDGen import munit.AnyFixture +import gql.relational.skunk.dsl._ +import skunk.data.TransactionStatus class SkunkRelationalSuite extends RelationalSuiteTables(SkunkIntegration) { def intDecoder: Decoder[Int] = int4 @@ -46,13 +48,15 @@ class SkunkRelationalSuite extends RelationalSuiteTables(SkunkIntegration) { def postgres = connect("postgres") - Resource.eval(UUIDGen.randomString[IO]).flatMap { dbid => + val res = Resource.eval(UUIDGen.randomString[IO]).flatMap { dbid => Resource .make(postgres.use(_.execute(sql"""create database "#$dbid"""".command)))(_ => postgres.use(_.execute(sql"""drop database "#$dbid"""".command).void) ) .as(connect(dbid)) } + + res >>= lazyPool[IO] } ) @@ -60,11 +64,21 @@ class SkunkRelationalSuite extends RelationalSuiteTables(SkunkIntegration) { lazy val conn = connF.apply() + lazy val transactionalConn = conn.get.flatTap(_.transaction) + test("setup") { - conn.use { ses => + transactionalConn.use { ses => (ddlQueries ++ dataQueries).traverse_(x => ses.execute(sql"#$x".command)) } } - tests(conn) + tests(transactionalConn) + + test("transaction should still be going") { + transactionalConn.use { ses => + ses.transactionStatus.get.map { status => + assertEquals(status, TransactionStatus.Active) + } + } + } } diff --git a/modules/relational/src/main/scala/gql/relational/LazyResource.scala b/modules/relational/src/main/scala/gql/relational/LazyResource.scala index a8dbc6a53..03dba0c27 100644 --- a/modules/relational/src/main/scala/gql/relational/LazyResource.scala +++ b/modules/relational/src/main/scala/gql/relational/LazyResource.scala @@ -17,9 +17,8 @@ package gql.relational import cats._ import cats.implicits._ -import cats.effect.std.Hotswap -import cats.effect.std.Mutex import cats.effect._ +import cats.effect.std._ trait LazyResource[F[_], A] { self => def get: Resource[F, A] @@ -34,21 +33,74 @@ trait LazyResource[F[_], A] { self => } object LazyResource { - def fromResource[F[_], A](res: Resource[F, A])(implicit F: Concurrent[F]): Resource[F, LazyResource[F, A]] = - Hotswap.create[F, A].evalMap { hs => - Mutex[F].map { mtx => - new LazyResource[F, A] { - override def forceClose: F[Unit] = hs.clear - - override def get: Resource[F, A] = - mtx.lock >> - hs.get.evalMap { - case None => hs.swap(res) - case Some(ses) => F.pure(ses) - } + def fromResource[F[_], A](res: Resource[F, A])(implicit F: Concurrent[F]): Resource[F, LazyResource[F, A]] = { + val leases = Long.MaxValue + + case class State( + value: A, + users: Semaphore[F], + clean: F[Unit] + ) + + Supervisor[F](await = true).flatMap { sup => + Resource.eval(Mutex[F]).flatMap { mtx => + def excl(s: State) = Resource.make(s.users.acquireN(leases))(_ => s.users.releaseN(leases)) + + def forceClose0(state: Ref[F, Option[State]]) = { + type Ret = F[Either[Throwable, Unit]] + val noop: (Option[State], Ret) = (None, F.pure(Right(()))) + mtx.lock.surround { + F.uncancelable { poll => + state + .modify[Ret] { + // Nothing in the state, cool it is a noop! + case None => noop + // We found something, let's await that there are no more users + case Some(s) => + // Since we remove the reference immidiately we must ensure that the cleanup is done + (None, sup.supervise(excl(s).surround(s.clean)).flatMap(fib => poll(fib.joinWithNever)).attempt) + } + .flatten + .rethrow + } + } + } + + val stateR = + Resource.make(F.ref[Option[State]](None))(_.get.flatMap(_.traverse_(_.clean))) + + stateR.map { state => + new LazyResource[F, A] { + override def forceClose: F[Unit] = forceClose0(state) + + override def get: Resource[F, A] = + mtx.lock >> + Resource + .eval { + state + .modify[F[State]] { + case Some(s) => (Some(s), F.pure(s)) + case None => + val newStateF = F.uncancelable { poll => + poll(res.allocated).flatMap { case (value, clean) => + Semaphore[F](leases).flatMap { users => + val s = State(value, users, clean) + state.set(Some(s)) as s + } + } + } + (None, newStateF) + } + .flatten + } + .flatTap(_.users.permit) + .map(_.value) + + } } } } + } implicit def functorForLazyResource[F[_]]: Functor[LazyResource[F, *]] = new Functor[LazyResource[F, *]] { override def map[A, B](fa: LazyResource[F, A])(f: A => B): LazyResource[F, B] = new LazyResource[F, B] { diff --git a/modules/server/src/main/scala/gql/server/interpreter/BatchAccumulator.scala b/modules/server/src/main/scala/gql/server/interpreter/BatchAccumulator.scala index d3eede6bd..620bf5546 100644 --- a/modules/server/src/main/scala/gql/server/interpreter/BatchAccumulator.scala +++ b/modules/server/src/main/scala/gql/server/interpreter/BatchAccumulator.scala @@ -43,7 +43,7 @@ object BatchAccumulator { def groupBatches( plan: OptimizedDAG ): Chain[(Step.BatchKey[?, ?], NonEmptyChain[UniqueBatchInstance[?, ?]])] = Chain.fromSeq { - plan.plan.values.toList.map{ case (bs, _) => bs }.distinct.mapFilter { bs => + plan.plan.values.toList.map { case (bs, _) => bs }.distinct.mapFilter { bs => plan.tree.lookup(bs.head).batchId.map(_.batcherId).map { bk => val vs = NonEmptyChain .fromChainUnsafe(Chain.fromIterableOnce(bs))