Skip to content

Commit

Permalink
#120: fixing tests, post-refactoring, and implementing agg and non-ag…
Browse files Browse the repository at this point in the history
…g function util class separately so that users can choose - also available in Slick and Doobie, covered all by tests
  • Loading branch information
lsulak committed Jun 22, 2024
1 parent bdf72cf commit c5c26b2
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 76 deletions.
32 changes: 30 additions & 2 deletions core/src/main/scala/za/co/absa/fadb/DBFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ object DBFunction {
abstract class DBMultipleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride)
with StatusAggregator {
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)
Expand All @@ -246,6 +245,35 @@ object DBFunction {
* @return - a sequence of values, each coming from a row returned from the DB function transformed to scala
* type `R` wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[FailedOrRow[R]]] = multipleResults(values)
}

/**
* `DBMultipleResultFunctionWithAggStatus` is an abstract class that represents a database function returning
* multiple results with status information.
* It extends the [[DBFunctionWithStatus]] class and overrides the apply method to return a sequence of results
*
* It's similar to `DBMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value.
* The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`.
*/
abstract class DBMultipleResultFunctionWithAggStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride)
with StatusAggregator {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)

// A constructor that allows specifying the function name as a string, but not as an option
def this(functionName: String)(implicit schema: DBSchema, dBEngine: E) = this(Some(functionName))

/**
* For easy and convenient execution of the DB function call
* @param values - the values to pass over to the database function
* @return - a sequence of values, each coming from a row returned from the DB function transformed to scala
* type `R` wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)
(implicit me: MonadError[F, Throwable]): F[FailedOrRows[R]] =
multipleResults(values).flatMap(data => me.pure(aggregate(data)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,33 @@ import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow, Row}

/**
* `StatusAggregator` is a base trait that defines the interface for aggregating the error statuses of a function
* invocation. It provides methods to aggregate the error statuses into a single status information - this is
* typically needed for database functions that retrieve multiple records.
* `StatusAggregator` is a base trait that defines the interface for aggregating the error statuses of a function
* invocation. It provides methods to aggregate the error statuses into a single status information - this is
* typically needed for database functions that retrieve multiple records.
*/
trait StatusAggregator {


/**
* Aggregates the error status information into a single error.
* Aggregates the error status information into a single error.
*
* @param statusesWithData - The status of the function invocation with data.
* @return Either a `StatusException` if the status code indicates an error, or the data (along with the status
* @param statusesWithData - The status of the function invocation with data.
* @return Either a `StatusException` if the status code indicates an error, or the data (along with the status
* information so that it's retrievable) if the status being returned doesn't indicate an error.
*/
def aggregate[R](statusesWithData: Seq[FailedOrRow[R]]): FailedOrRows[R]
}

object StatusAggregator {
private[aggregation] def gatherExceptions[R](
eithersWithException: Seq[FailedOrRow[R]]
): Seq[StatusException] = {

private[aggregation] def gatherExceptions[R](eithersWithException: Seq[FailedOrRow[R]]): Seq[StatusException] = {
eithersWithException.flatMap {
case Left(exception) => Some(exception)
case _ => None
case _ => None
}
}

private[aggregation] def gatherDataWithStatuses[R](
eithersWithData: Seq[FailedOrRow[R]]
): Seq[Row[R]] = {
private[aggregation] def gatherDataWithStatuses[R](eithersWithData: Seq[FailedOrRow[R]]): Seq[Row[R]] = {
eithersWithData.flatMap {
case Left(_) => None
case Left(_) => None
case Right(dataWithStatuses) => Some(dataWithStatuses)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@ package za.co.absa.fadb.status.aggregation

import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.fadb.exceptions._
import za.co.absa.fadb.status.{FailedOrRows, FailedOrRow, FunctionStatus, Row}
import za.co.absa.fadb.status.{FunctionStatus, Row}

class StatusAggregatorUnitTests extends AnyFunSuite {

private val aggregateByFirstRowUnderTest: StatusAggregator = new StatusAggregator {
override def aggregate[R](statusesWithData: Seq[FailedOrRow[R]]):
FailedOrRows[R] = Right(Seq.empty)
}

test("gatherExceptions should return empty Seq on empty input") {
val testData = Seq.empty
val expectedGatheredExceptions = Seq.empty

val actualGatheredExceptions = aggregateByFirstRowUnderTest.gatherExceptions(testData)
val actualGatheredExceptions = StatusAggregator.gatherExceptions(testData)
assert(actualGatheredExceptions == expectedGatheredExceptions)
}

Expand All @@ -48,15 +43,15 @@ class StatusAggregatorUnitTests extends AnyFunSuite {
ErrorInDataException(FunctionStatus(50, "Some data error"))
)

val actualGatheredExceptions = aggregateByFirstRowUnderTest.gatherExceptions(testData)
val actualGatheredExceptions = StatusAggregator.gatherExceptions(testData)
assert(actualGatheredExceptions == expectedGatheredExceptions)
}

test("gatherDataWithStatuses should return empty Seq on empty input") {
val testData = Seq.empty
val expectedGatheredExceptions = Seq.empty

val actualGatheredExceptions = aggregateByFirstRowUnderTest.gatherDataWithStatuses(testData)
val actualGatheredExceptions = StatusAggregator.gatherDataWithStatuses(testData)
assert(actualGatheredExceptions == expectedGatheredExceptions)
}

Expand All @@ -74,7 +69,7 @@ class StatusAggregatorUnitTests extends AnyFunSuite {
Row(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")),
)

val actualGatheredData = aggregateByFirstRowUnderTest.gatherDataWithStatuses(testData)
val actualGatheredData = StatusAggregator.gatherDataWithStatuses(testData)
assert(actualGatheredData == expectedGatheredData)
}

Expand Down
33 changes: 25 additions & 8 deletions doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,34 @@ object DoobieFunction {
with DoobieFunction[I, R, F]

/**
* `DoobieMultipleResultFunctionWithStatus` represents a db function that returns multiple results with statuses.
*/
* `DoobieMultipleResultFunctionWithStatus` represents a db function that returns multiple results with statuses.
*/
abstract class DoobieMultipleResultFunctionWithStatus[I, R, F[_]](
override val toFragmentsSeq: I => Seq[Fragment],
functionNameOverride: Option[String] = None
override val toFragmentsSeq: I => Seq[Fragment],
functionNameOverride: Option[String] = None
)(implicit
override val schema: DBSchema,
val dbEngine: DoobieEngine[F],
val readR: Read[R]
override val schema: DBSchema,
val dbEngine: DoobieEngine[F],
val readR: Read[R]
) extends DBMultipleResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride)
with DoobieFunctionWithStatus[I, R, F]
with DoobieFunctionWithStatus[I, R, F]

/**
* `DoobieMultipleResultFunctionWithAggStatus` represents a db function that returns multiple results with statuses.
*
* It's similar as `DoobieMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value.
*
* The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`.
*/
abstract class DoobieMultipleResultFunctionWithAggStatus[I, R, F[_]](
override val toFragmentsSeq: I => Seq[Fragment],
functionNameOverride: Option[String] = None
)(implicit
override val schema: DBSchema,
val dbEngine: DoobieEngine[F],
val readR: Read[R]
) extends DBMultipleResultFunctionWithAggStatus[I, R, DoobieEngine[F], F](functionNameOverride)
with DoobieFunctionWithStatus[I, R, F]

/**
* `DoobieOptionalResultFunction` represents a db function that returns an optional result.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.fadb.doobie

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.fadb.status.aggregation.implementations.ByMajorityErrorsStatusAggregator
import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.fadb.status.{FunctionStatus, Row}

class DoobieMultipleResultFunctionWithAggStatusIntegrationTests extends AnyFunSuite with DoobieTest {

private val getActorsByLastnameQueryFragments: GetActorsByLastnameQueryParameters => Seq[Fragment] = {
values => Seq(fr"${values.lastName}", fr"${values.firstName}")
}

class GetActorsByLastname(implicit schema: DBSchema, dbEngine: DoobieEngine[IO])
// Option[Actor] because: Actor might not exist, and the function would return only status info without actor data
extends DoobieMultipleResultFunctionWithAggStatus[GetActorsByLastnameQueryParameters, Option[Actor], IO](getActorsByLastnameQueryFragments)
with StandardStatusHandling
with ByMajorityErrorsStatusAggregator {
override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name")
}

private val getActorsByLastname = new GetActorsByLastname()(Integration, new DoobieEngine(transactor))

test("Retrieving multiple actors from database, lastName match") {
val expectedResultElem = Set(
Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(51, "Fred", "Weasley"))),
Row(FunctionStatus(11, "OK, match on last name only"), Some(Actor(52, "George", "Weasley"))),
)

val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).unsafeRunSync()
val actualData = results match {
case Left(_) => fail("should not be left")
case Right(dataWithStatuses) => dataWithStatuses
}
assert(actualData.length == expectedResultElem.size)
assert(actualData.toSet == expectedResultElem)
}

test("Retrieving single actor from database, full match") {
val expectedResultElem = Row(
FunctionStatus(12, "OK, full match"), Some(Actor(50, "Liza", "Simpson"))
)

val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson", Some("Liza"))).unsafeRunSync()
val actualData = results match {
case Left(_) => fail("should not be left")
case Right(dataWithStatuses) => dataWithStatuses
}

assert(actualData.length == 1)
assert(actualData.head == expectedResultElem)
}

test("Retrieving single actor from database, lastname match") {
val expectedResultElem = Row(
FunctionStatus(11, "OK, match on last name only"), Some(Actor(50, "Liza", "Simpson"))
)

val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson")).unsafeRunSync()
val actualData = results match {
case Left(_) => fail("should not be left")
case Right(dataWithStatuses) => dataWithStatuses
}

assert(actualData.length == 1)
assert(actualData.head == expectedResultElem)
}

test("Retrieving non-existing actor from database, no match") {
val results = getActorsByLastname(GetActorsByLastnameQueryParameters("TotallyNonExisting!")).unsafeRunSync()
results match {
case Left(err) =>
assert(err.status.statusText == "No actor found")
assert(err.status.statusCode == 41)
case Right(_) => fail("should not be right")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import doobie.implicits.toSqlInterpolator
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithStatus
import za.co.absa.fadb.status.aggregation.implementations.ByMajorityErrorsStatusAggregator
import za.co.absa.fadb.exceptions.{DataNotFoundException, StatusException}
import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.fadb.status.{FunctionStatus, Row}

Expand All @@ -34,10 +34,9 @@ class DoobieMultipleResultFunctionWithStatusIntegrationTests extends AnyFunSuite
}

class GetActorsByLastname(implicit schema: DBSchema, dbEngine: DoobieEngine[IO])
// Option[Actor] because: Actor might not exist, and the function would return only status info without actor data
extends DoobieMultipleResultFunctionWithStatus[GetActorsByLastnameQueryParameters, Option[Actor], IO](getActorsByLastnameQueryFragments)
with StandardStatusHandling
with ByMajorityErrorsStatusAggregator {
// Option[Actor] because: Actor might not exist, and the function would return only status info without actor data
extends DoobieMultipleResultFunctionWithStatus[GetActorsByLastnameQueryParameters, Option[Actor], IO](getActorsByLastnameQueryFragments)
with StandardStatusHandling {
override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name")
}

Expand All @@ -50,12 +49,7 @@ class DoobieMultipleResultFunctionWithStatusIntegrationTests extends AnyFunSuite
)

val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).unsafeRunSync()
val actualData = results match {
case Left(_) => fail("should not be left")
case Right(dataWithStatuses) => dataWithStatuses
}
assert(actualData.length == expectedResultElem.size)
assert(actualData.toSet == expectedResultElem)
assert(results.toSet == expectedResultElem)
}

test("Retrieving single actor from database, full match") {
Expand All @@ -64,13 +58,7 @@ class DoobieMultipleResultFunctionWithStatusIntegrationTests extends AnyFunSuite
)

val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson", Some("Liza"))).unsafeRunSync()
val actualData = results match {
case Left(_) => fail("should not be left")
case Right(dataWithStatuses) => dataWithStatuses
}

assert(actualData.length == 1)
assert(actualData.head == expectedResultElem)
assert(results.toSet == expectedResultElem)
}

test("Retrieving single actor from database, lastname match") {
Expand All @@ -79,22 +67,15 @@ class DoobieMultipleResultFunctionWithStatusIntegrationTests extends AnyFunSuite
)

val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson")).unsafeRunSync()
val actualData = results match {
case Left(_) => fail("should not be left")
case Right(dataWithStatuses) => dataWithStatuses
}

assert(actualData.length == 1)
assert(actualData.head == expectedResultElem)
assert(results.toSet == expectedResultElem)
}

test("Retrieving non-existing actor from database, no match") {
val expectedErr = Left(DataNotFoundException(FunctionStatus(41, "No actor found")))
val results = getActorsByLastname(GetActorsByLastnameQueryParameters("TotallyNonExisting!")).unsafeRunSync()
results match {
case Left(err) =>
assert(err.status.statusText == "No actor found")
assert(err.status.statusCode == 41)
case Right(_) => fail("should not be right")
}

assert(results.length == 1)
assert(results.head.isLeft)
assert(results.head == expectedErr)
}
}
12 changes: 12 additions & 0 deletions slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ object SlickFunction {
) extends DBMultipleResultFunctionWithStatus[I, R, SlickPgEngine, Future](functionNameOverride)
with SlickFunctionWithStatus[I, R]

/**
* Similar as `SlickMultipleResultFunctionWithStatus` but the statuses are aggregated into a single value.
* The algorithm for performing the aggregation is based on provided implementation of `StatusAggregator.aggregate`.
*/
abstract class SlickMultipleResultFunctionWithAggStatus[I, R](
functionNameOverride: Option[String] = None
)(implicit
override val schema: DBSchema,
dBEngine: SlickPgEngine
) extends DBMultipleResultFunctionWithAggStatus[I, R, SlickPgEngine, Future](functionNameOverride)
with SlickFunctionWithStatus[I, R]

/**
* Class for Slick DB functions with optional result.
*/
Expand Down
Loading

0 comments on commit c5c26b2

Please sign in to comment.