Skip to content

Commit

Permalink
Fix #457 - Handle SQL initialization error in Akka (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
cchantep authored Jul 9, 2022
1 parent 702d86d commit 637743a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 11 deletions.
25 changes: 18 additions & 7 deletions akka/src/main/scala/anorm/AkkaStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package anorm

import java.sql.Connection

import scala.util.control.NonFatal

import scala.concurrent.{ Future, Promise }

import akka.stream.Materializer
Expand Down Expand Up @@ -114,13 +116,24 @@ object AkkaStream {

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
val result = Promise[Int]()

val logic = new GraphStageLogic(shape) with OutHandler {
private var cursor: Option[Cursor] = None
private var counter: Int = 0

private def failWith(cause: Throwable): Unit = {
result.failure(cause)
fail(out, cause)
()
}

override def preStart(): Unit = {
resultSet = sql.unsafeResultSet(connection)
nextCursor()
try {
resultSet = sql.unsafeResultSet(connection)
nextCursor()
} catch {
case NonFatal(cause) => failWith(cause)
}
}

override def postStop() = release()
Expand Down Expand Up @@ -152,10 +165,8 @@ object AkkaStream {
nextCursor()
}

case Failure(cause) => {
result.failure(cause)
fail(out, cause)
}
case Failure(cause) =>
failWith(cause)
}

case _ => {
Expand All @@ -172,7 +183,7 @@ object AkkaStream {
setHandler(out, this)
}

(logic, result.future)
logic -> result.future
}
}

Expand Down
5 changes: 5 additions & 0 deletions akka/src/test/scala-2.13+/AkkaCompat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package anorm

private[anorm] object AkkaCompat {
type Seq[T] = _root_.scala.collection.immutable.Seq[T]
}
5 changes: 5 additions & 0 deletions akka/src/test/scala-2.13-/AkkaCompat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package anorm

private[anorm] object AkkaCompat {
type Seq[T] = _root_.scala.collection.Seq[T]
}
35 changes: 32 additions & 3 deletions akka/src/test/scala/anorm/AkkaStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import scala.concurrent.duration._

import akka.stream.scaladsl.{ Keep, Sink, Source }

import acolyte.jdbc.QueryResult
import acolyte.jdbc.AcolyteDSL.withQueryResult
import acolyte.jdbc.Implicits._
import acolyte.jdbc.RowLists.stringList
Expand All @@ -29,7 +30,9 @@ final class AkkaStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable
"Akka Stream" should {
"expose the query result as source" in assertAllStagesStopped {
withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con =>
AkkaStream.source(SQL"SELECT * FROM Test", SqlParser.scalar[String]).runWith(Sink.seq[String]) must beEqualTo(
AkkaStream
.source(SQL"SELECT * FROM Test", SqlParser.scalar[String])
.runWith(Sink.seq[String]) must beTypedEqualTo(
Seq("A", "B", "C")
).await(0, 5.seconds)
}
Expand All @@ -40,7 +43,7 @@ final class AkkaStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable
AkkaStream
.source(SQL"SELECT * FROM Test", SqlParser.scalar[String])
.toMat(Sink.ignore)(Keep.left)
.run() must beEqualTo(3).await(0, 3.seconds)
.run() must beTypedEqualTo(3).await(0, 3.seconds)
}
}

Expand Down Expand Up @@ -79,7 +82,33 @@ final class AkkaStreamSpec(implicit ee: ExecutionEnv) extends org.specs2.mutable
}
}

"on failure" in (withQueryResult(stringList :+ "A" :+ "B" :+ "C")) { implicit con =>
"on failed initialization" in {
import java.sql.SQLException

withQueryResult(QueryResult.Nil) { implicit con =>
val failingSql = new Sql {
import java.sql.PreparedStatement

def unsafeStatement(
connection: Connection,
generatedColumn: String,
generatedColumns: AkkaCompat.Seq[String]
): PreparedStatement = ???

def unsafeStatement(connection: Connection, getGeneratedKeys: Boolean): PreparedStatement =
throw new SQLException("Init failure")

def resultSetOnFirstRow: Boolean = ???
}

val graph = source(failingSql, SqlParser.scalar[String])
val mat = Source.fromGraph(graph).toMat(Sink.ignore)(Keep.left).run()

mat must throwA[SQLException]("Init failure").awaitFor(3.seconds)
}
}

"on failure" in withQueryResult(stringList :+ "A" :+ "B" :+ "C") { implicit con =>
assertAllStagesStopped {
val rSet = run(Sink.reduce[String] { (_, _) => sys.error("Foo") })

Expand Down
12 changes: 11 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,17 @@ lazy val `anorm-akka` = (project in file("akka"))
libraryDependencies ++= (acolyte +: specs2Test) ++ Seq(
"com.typesafe.akka" %% "akka-stream-contrib" % akkaContribVer.value % Test
),
scalacOptions += "-P:silencer:globalFilters=deprecated"
scalacOptions += "-P:silencer:globalFilters=deprecated",
Test / unmanagedSourceDirectories ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n < 13 =>
Seq((Test / sourceDirectory).value / "scala-2.13-")

case _ =>
Seq((Test / sourceDirectory).value / "scala-2.13+")

}
}
)
.dependsOn(`anorm-core`)

Expand Down

0 comments on commit 637743a

Please sign in to comment.