Skip to content

Commit

Permalink
fix: lazy resource leasing (Hotswap bug)
Browse files Browse the repository at this point in the history
  • Loading branch information
ValdemarGr committed Oct 12, 2023
1 parent 3b06382 commit d9e52b3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import cats.implicits._
import skunk.util.Typer
import cats.effect.std.UUIDGen
import munit.AnyFixture
import gql.relational.skunk.dsl._
import skunk.data.TransactionStatus

class SkunkRelationalSuite extends RelationalSuiteTables(SkunkIntegration) {
def intDecoder: Decoder[Int] = int4
Expand All @@ -46,25 +48,37 @@ class SkunkRelationalSuite extends RelationalSuiteTables(SkunkIntegration) {

def postgres = connect("postgres")

Resource.eval(UUIDGen.randomString[IO]).flatMap { dbid =>
val res = Resource.eval(UUIDGen.randomString[IO]).flatMap { dbid =>
Resource
.make(postgres.use(_.execute(sql"""create database "#$dbid"""".command)))(_ =>
postgres.use(_.execute(sql"""drop database "#$dbid"""".command).void)
)
.as(connect(dbid))
}

res >>= lazyPool[IO]
}
)

override def munitFixtures: Seq[AnyFixture[_]] = Seq(connF) ++ super.munitFixtures

lazy val conn = connF.apply()

lazy val transactionalConn = conn.get.flatTap(_.transaction)

test("setup") {
conn.use { ses =>
transactionalConn.use { ses =>
(ddlQueries ++ dataQueries).traverse_(x => ses.execute(sql"#$x".command))
}
}

tests(conn)
tests(transactionalConn)

test("transaction should still be going") {
transactionalConn.use { ses =>
ses.transactionStatus.get.map { status =>
assertEquals(status, TransactionStatus.Active)
}
}
}
}
80 changes: 66 additions & 14 deletions modules/relational/src/main/scala/gql/relational/LazyResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ package gql.relational

import cats._
import cats.implicits._
import cats.effect.std.Hotswap
import cats.effect.std.Mutex
import cats.effect._
import cats.effect.std._

trait LazyResource[F[_], A] { self =>
def get: Resource[F, A]
Expand All @@ -34,21 +33,74 @@ trait LazyResource[F[_], A] { self =>
}

object LazyResource {
def fromResource[F[_], A](res: Resource[F, A])(implicit F: Concurrent[F]): Resource[F, LazyResource[F, A]] =
Hotswap.create[F, A].evalMap { hs =>
Mutex[F].map { mtx =>
new LazyResource[F, A] {
override def forceClose: F[Unit] = hs.clear

override def get: Resource[F, A] =
mtx.lock >>
hs.get.evalMap {
case None => hs.swap(res)
case Some(ses) => F.pure(ses)
}
def fromResource[F[_], A](res: Resource[F, A])(implicit F: Concurrent[F]): Resource[F, LazyResource[F, A]] = {
val leases = Long.MaxValue

case class State(
value: A,
users: Semaphore[F],
clean: F[Unit]
)

Supervisor[F](await = true).flatMap { sup =>
Resource.eval(Mutex[F]).flatMap { mtx =>
def excl(s: State) = Resource.make(s.users.acquireN(leases))(_ => s.users.releaseN(leases))

def forceClose0(state: Ref[F, Option[State]]) = {
type Ret = F[Either[Throwable, Unit]]
val noop: (Option[State], Ret) = (None, F.pure(Right(())))
mtx.lock.surround {
F.uncancelable { poll =>
state
.modify[Ret] {
// Nothing in the state, cool it is a noop!
case None => noop
// We found something, let's await that there are no more users
case Some(s) =>
// Since we remove the reference immidiately we must ensure that the cleanup is done
(None, sup.supervise(excl(s).surround(s.clean)).flatMap(fib => poll(fib.joinWithNever)).attempt)
}
.flatten
.rethrow
}
}
}

val stateR =
Resource.make(F.ref[Option[State]](None))(_.get.flatMap(_.traverse_(_.clean)))

stateR.map { state =>
new LazyResource[F, A] {
override def forceClose: F[Unit] = forceClose0(state)

override def get: Resource[F, A] =
mtx.lock >>
Resource
.eval {
state
.modify[F[State]] {
case Some(s) => (Some(s), F.pure(s))
case None =>
val newStateF = F.uncancelable { poll =>
poll(res.allocated).flatMap { case (value, clean) =>
Semaphore[F](leases).flatMap { users =>
val s = State(value, users, clean)
state.set(Some(s)) as s
}
}
}
(None, newStateF)
}
.flatten
}
.flatTap(_.users.permit)
.map(_.value)

}
}
}
}
}

implicit def functorForLazyResource[F[_]]: Functor[LazyResource[F, *]] = new Functor[LazyResource[F, *]] {
override def map[A, B](fa: LazyResource[F, A])(f: A => B): LazyResource[F, B] = new LazyResource[F, B] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object BatchAccumulator {
def groupBatches(
plan: OptimizedDAG
): Chain[(Step.BatchKey[?, ?], NonEmptyChain[UniqueBatchInstance[?, ?]])] = Chain.fromSeq {
plan.plan.values.toList.map{ case (bs, _) => bs }.distinct.mapFilter { bs =>
plan.plan.values.toList.map { case (bs, _) => bs }.distinct.mapFilter { bs =>
plan.tree.lookup(bs.head).batchId.map(_.batcherId).map { bk =>
val vs = NonEmptyChain
.fromChainUnsafe(Chain.fromIterableOnce(bs))
Expand Down

0 comments on commit d9e52b3

Please sign in to comment.