diff --git a/core/src/main/scala/za/co/absa/fadb/DBEngine.scala b/core/src/main/scala/za/co/absa/fadb/DBEngine.scala index 792106c5..f5702974 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBEngine.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBEngine.scala @@ -18,8 +18,7 @@ package za.co.absa.fadb import cats.Monad import cats.implicits.toFunctorOps -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.status.ExceptionOrStatusWithDataRow import scala.language.higherKinds @@ -39,19 +38,15 @@ abstract class DBEngine[F[_]: Monad] { /** * The actual query executioner of the queries of the engine + * + * Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries. + * * @param query - the query to execute * @tparam R - return type of the query * @return - sequence of the results of database query */ protected def run[R](query: QueryType[R]): F[Seq[R]] - - /** - * The actual query executioner of the queries of the engine with status - * @param query - the query to execute - * @tparam R - return type of the query - * @return - result of database query with status - */ - protected def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] + protected def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[ExceptionOrStatusWithDataRow[R]]] /** * Public method to execute when query is expected to return multiple results @@ -63,7 +58,7 @@ abstract class DBEngine[F[_]: Monad] { * @return - sequence of the results of database query */ def fetchAll[R](query: QueryType[R]): F[Seq[R]] = run(query) - def fetchAllWithStatus[R](query: QueryWithStatusType[R]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = + def fetchAllWithStatus[R](query: QueryWithStatusType[R]): F[Seq[ExceptionOrStatusWithDataRow[R]]] = runWithStatus(query) /** @@ -76,7 +71,7 @@ abstract class DBEngine[F[_]: Monad] { * @return - sequence of the results of database query */ def fetchHead[R](query: QueryType[R]): F[R] = run(query).map(_.head) - def fetchHeadWithStatus[R](query: QueryWithStatusType[R]): F[DBEngine.ExceptionOrStatusWithData[R]] = + def fetchHeadWithStatus[R](query: QueryWithStatusType[R]): F[ExceptionOrStatusWithDataRow[R]] = runWithStatus(query).map(_.head) /** @@ -89,12 +84,6 @@ abstract class DBEngine[F[_]: Monad] { * @return - sequence of the results of database query */ def fetchHeadOption[R](query: QueryType[R]): F[Option[R]] = run(query).map(_.headOption) - def fetchHeadOptionWithStatus[R](query: QueryWithStatusType[R]): F[Option[DBEngine.ExceptionOrStatusWithData[R]]] = + def fetchHeadOptionWithStatus[R](query: QueryWithStatusType[R]): F[Option[ExceptionOrStatusWithDataRow[R]]] = runWithStatus(query).map(_.headOption) } - -case object DBEngine { - - type ExceptionOrStatusWithData[R] = Either[StatusException, FunctionStatusWithData[R]] - -} \ No newline at end of file diff --git a/core/src/main/scala/za/co/absa/fadb/DBFunction.scala b/core/src/main/scala/za/co/absa/fadb/DBFunction.scala index 7e140be6..3a5f17bd 100644 --- a/core/src/main/scala/za/co/absa/fadb/DBFunction.scala +++ b/core/src/main/scala/za/co/absa/fadb/DBFunction.scala @@ -18,8 +18,9 @@ package za.co.absa.fadb import cats.MonadError import cats.implicits.toFlatMapOps -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.status.aggregation.StatusAggregation import za.co.absa.fadb.status.handling.StatusHandling +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow, FunctionStatusWithData} import scala.language.higherKinds @@ -109,7 +110,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv * @param values - The values to pass over to the database function. * @return - A sequence of results from the database function. */ - protected def multipleResults(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = + protected def multipleResults(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[ExceptionOrStatusWithDataRow[R]]] = query(values).flatMap(q => dBEngine.fetchAllWithStatus(q)) /** @@ -117,7 +118,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv * @param values - The values to pass over to the database function. * @return - A single result from the database function. */ - protected def singleResult(values: I)(implicit me: MonadError[F, Throwable]): F[DBEngine.ExceptionOrStatusWithData[R]] = + protected def singleResult(values: I)(implicit me: MonadError[F, Throwable]): F[ExceptionOrStatusWithDataRow[R]] = query(values).flatMap(q => dBEngine.fetchHeadWithStatus(q)) /** @@ -125,7 +126,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv * @param values - The values to pass over to the database function. * @return - An optional result from the database function. */ - protected def optionalResult(values: I)(implicit me: MonadError[F, Throwable]): F[Option[DBEngine.ExceptionOrStatusWithData[R]]] = + protected def optionalResult(values: I)(implicit me: MonadError[F, Throwable]): F[Option[ExceptionOrStatusWithDataRow[R]]] = query(values).flatMap(q => dBEngine.fetchHeadOptionWithStatus(q)) /** @@ -147,7 +148,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv protected def query(values: I)(implicit me: MonadError[F, Throwable]): F[dBEngine.QueryWithStatusType[R]] // To be provided by an implementation of QueryStatusHandling - override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A] + override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A] } object DBFunction { @@ -230,7 +231,8 @@ object DBFunction { abstract class DBMultipleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]]( functionNameOverride: Option[String] = None )(implicit schema: DBSchema, dBEngine: E) - extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) { + extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) + with StatusAggregation { // A constructor that takes only the mandatory parameters and uses default values for the optional ones def this()(implicit schema: DBSchema, dBEngine: E) = this(None) @@ -244,8 +246,9 @@ object DBFunction { * @return - a sequence of values, each coming from a row returned from the DB function transformed to scala * type `R` wrapped around with Either, providing StatusException if raised */ - def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = - multipleResults(values) + def apply(values: I) + (implicit me: MonadError[F, Throwable]): F[ExceptionOrStatusWithDataResultAgg[R]] = + multipleResults(values).flatMap(data => me.pure(aggregate(data))) } /** @@ -270,7 +273,7 @@ object DBFunction { * @return - the value returned from the DB function transformed to scala type `R` * wrapped around with Either, providing StatusException if raised */ - def apply(values: I)(implicit me: MonadError[F, Throwable]): F[DBEngine.ExceptionOrStatusWithData[R]] = + def apply(values: I)(implicit me: MonadError[F, Throwable]): F[ExceptionOrStatusWithDataRow[R]] = singleResult(values) } @@ -296,7 +299,7 @@ object DBFunction { * @return - the value returned from the DB function transformed to scala type `R` if a row is returned, * otherwise `None`, wrapped around with Either, providing StatusException if raised */ - def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[DBEngine.ExceptionOrStatusWithData[R]]] = + def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[ExceptionOrStatusWithDataRow[R]]] = optionalResult(values) } diff --git a/core/src/main/scala/za/co/absa/fadb/Query.scala b/core/src/main/scala/za/co/absa/fadb/Query.scala index 43520ada..a2638eca 100644 --- a/core/src/main/scala/za/co/absa/fadb/Query.scala +++ b/core/src/main/scala/za/co/absa/fadb/Query.scala @@ -16,7 +16,7 @@ package za.co.absa.fadb -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData} /** * The basis for all query types of [[DBEngine]] implementations @@ -44,14 +44,14 @@ trait QueryWithStatus[A, B, R] { * @param statusWithData - the status with data * @return either a status exception or the data */ - def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[B]): DBEngine.ExceptionOrStatusWithData[R] + def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[B]): ExceptionOrStatusWithDataRow[R] /** * Returns the result of the query or a status exception * @param initialResult - the initial result of the query * @return the result of the query or a status exception */ - def getResultOrException(initialResult: A): DBEngine.ExceptionOrStatusWithData[R] = + def getResultOrException(initialResult: A): ExceptionOrStatusWithDataRow[R] = toStatusExceptionOrData( processStatus(initialResult) ) diff --git a/core/src/main/scala/za/co/absa/fadb/status/FunctionStatusWithData.scala b/core/src/main/scala/za/co/absa/fadb/status/FunctionStatusWithData.scala deleted file mode 100644 index 966372e5..00000000 --- a/core/src/main/scala/za/co/absa/fadb/status/FunctionStatusWithData.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.fadb.status - -/** - * Represents a function status with data. - * @param functionStatus the function status - * @param data the data of one row (barring the status fields) - * @tparam A the type of the data - */ -case class FunctionStatusWithData[A](functionStatus: FunctionStatus, data: A) diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregation.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregation.scala new file mode 100644 index 00000000..596db872 --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/StatusAggregation.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation + +import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow, FunctionStatusWithData} + +/** + * `StatusAggregation` is a base trait that defines the interface for aggregating the statuses of a function invocation. + * It provides a method to aggregate the error statuses into a single status information - this is typically needed + * for database functions that retrieve multiple records. + */ +trait StatusAggregation { + + private [aggregation] def gatherExceptions[R]( + eithersWithException: Seq[ExceptionOrStatusWithDataRow[R]] + ): Seq[StatusException] = { + + eithersWithException.flatMap { + case Left(exception) => Some(exception) + case _ => None + } + } + + private [aggregation] def gatherDataWithStatuses[R]( + eithersWithData: Seq[ExceptionOrStatusWithDataRow[R]] + ): Seq[FunctionStatusWithData[R]] = { + eithersWithData.flatMap { + case Left(_) => None + case Right(dataWithStatuses) => Some(dataWithStatuses) + } + } + + /** + * Aggregates the error status information into a single error. + * + * @param statusesWithData - The status of the function invocation with data. + * @return Either a `StatusException` if the status code indicates an error, or the data (along with the status + * information so that it's retrievable) if the status being returned doesn't indicate an error. + */ + def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R] +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstError.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstError.scala new file mode 100644 index 00000000..1de202b4 --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstError.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import za.co.absa.fadb.status.aggregation.StatusAggregation +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow} + +/** + * `AggregateByFirstError` is a trait that extends the `StatusAggregation` interface. + * It provides an implementation for aggregating error statuses of a function invocation into a single error + * by choosing the first error encountered to be the representative one (i.e. if there are multiple errors of other + * types being returned, only the first one would be chosen and the rest would be ignored). + */ +trait AggregateByFirstError extends StatusAggregation { + + override def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R] = { + val firstError = gatherExceptions(statusesWithData).headOption + + val dataFinal = gatherDataWithStatuses(statusesWithData) + + firstError match { + case Some(statusException) => Left(statusException) + case None => Right(dataFinal) + } + } + +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstRow.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstRow.scala new file mode 100644 index 00000000..aef5350a --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstRow.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import za.co.absa.fadb.status.aggregation.StatusAggregation +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow} + +/** + * `AggregateByFirstRow` is a trait that extends the `StatusAggregation` interface. + * It provides an implementation for aggregating error statuses of a function invocation into a single error + * by choosing the first row that was returned to be the representative one + * (i.e. if there is an error on row two or later, it would be ignored). + */ +trait AggregateByFirstRow extends StatusAggregation { + + override def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R] = { + val firstRow = statusesWithData.headOption + + val dataFinal = gatherDataWithStatuses(statusesWithData) + + firstRow match { + case Some(exceptionOrDataWithStatuses) => exceptionOrDataWithStatuses match { + case Left(statusException) => Left(statusException) + case Right(_) => Right(dataFinal) + } + case None => Right(Seq.empty) + } + } + +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByMajorityErrors.scala b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByMajorityErrors.scala new file mode 100644 index 00000000..b46553c2 --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByMajorityErrors.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import za.co.absa.fadb.status.aggregation.StatusAggregation +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow} + +/** + * `AggregateByMajorityErrors` is a trait that extends the `StatusAggregation` interface. + * It provides an implementation for aggregating error statuses of a function invocation into a single error + * by choosing the error that occurred the most. + */ +trait AggregateByMajorityErrors extends StatusAggregation { + + private[aggregation] def gimmeMajorityWinner[T](inputData: Seq[T]): Option[T] = { + if (inputData.isEmpty) { + None + } else { + val grouped = inputData.groupBy(identity) + val maxCount = grouped.values.map(_.size).max + val mostOccurred = grouped.filter(_._2.size == maxCount).keys.toList + Some(mostOccurred.head) + } + } + + override def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R] = { + val allErrors = gatherExceptions(statusesWithData) + val majorityError = gimmeMajorityWinner(allErrors) + val dataFinal = gatherDataWithStatuses(statusesWithData) + + majorityError match { + case Some(statusException) => Left(statusException) + case None => Right(dataFinal) + } + } + +} diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala index 3cc5fa21..632c86c7 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/StatusHandling.scala @@ -16,8 +16,7 @@ package za.co.absa.fadb.status.handling -import za.co.absa.fadb.DBEngine -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData} /** * `StatusHandling` is a base trait that defines the interface for handling the status of a function invocation. @@ -31,5 +30,5 @@ trait StatusHandling { * @return Either a `StatusException` if the status code indicates an error, or the data (along with the status * information so that it's retrievable) if the status code is successful. */ - def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A] + def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A] } diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala index 36dfefcd..bebf8ac6 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/StandardStatusHandling.scala @@ -16,9 +16,8 @@ package za.co.absa.fadb.status.handling.implementations -import za.co.absa.fadb.DBEngine import za.co.absa.fadb.exceptions._ -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData} import za.co.absa.fadb.status.handling.StatusHandling /** @@ -30,7 +29,7 @@ trait StandardStatusHandling extends StatusHandling { /** * Checks the status of a function invocation. */ - override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A] = { + override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A] = { val functionStatus = statusWithData.functionStatus functionStatus.statusCode / 10 match { case 1 => Right(statusWithData) diff --git a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala index 97cfe11e..840dc6b0 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala +++ b/core/src/main/scala/za/co/absa/fadb/status/handling/implementations/UserDefinedStatusHandling.scala @@ -16,10 +16,9 @@ package za.co.absa.fadb.status.handling.implementations -import za.co.absa.fadb.DBEngine import za.co.absa.fadb.exceptions.OtherStatusException -import za.co.absa.fadb.status.FunctionStatusWithData import za.co.absa.fadb.status.handling.StatusHandling +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData} /** * Trait represents user defined status handling @@ -27,7 +26,7 @@ import za.co.absa.fadb.status.handling.StatusHandling trait UserDefinedStatusHandling extends StatusHandling { def OKStatuses: Set[Integer] - override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A] = + override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A] = if (OKStatuses.contains(statusWithData.functionStatus.statusCode)) { Right(statusWithData) } else { diff --git a/core/src/main/scala/za/co/absa/fadb/status/package.scala b/core/src/main/scala/za/co/absa/fadb/status/package.scala new file mode 100644 index 00000000..06ccb4ec --- /dev/null +++ b/core/src/main/scala/za/co/absa/fadb/status/package.scala @@ -0,0 +1,34 @@ +package za.co.absa.fadb + +import za.co.absa.fadb.exceptions.StatusException + +package object status { + + /** + * Class represents the status of calling a fa-db function (if it supports status that is) + */ + case class FunctionStatus(statusCode: Int, statusText: String) + + /** + * Represents a function status with data. + * @param functionStatus the function status + * @param data the data of one row (barring the status fields) + * @tparam R the type of the data + */ + case class FunctionStatusWithData[R](functionStatus: FunctionStatus, data: R) + + /** + * This is a representation of a single row returned from a DB function with processed status information. + * + * Note: R here represents a single row reduced by status-related columns, i.e. a type of data. + */ + type ExceptionOrStatusWithDataRow[R] = Either[StatusException, FunctionStatusWithData[R]] + + /** + * This is a representation of multiple rows returned from a DB function with processed status information, + * with error statuses aggregated to a single one. + * + * Note: R here represents a single row reduced by status-related columns, i.e. a type of data. + */ + type ExceptionOrStatusWithDataResultAgg[R] = Either[StatusException, Seq[FunctionStatusWithData[R]]] +} diff --git a/core/src/test/scala/za/co/absa/fadb/DBFunctionSuite.scala b/core/src/test/scala/za/co/absa/fadb/DBFunctionSuite.scala index e4242db0..88cc0be7 100644 --- a/core/src/test/scala/za/co/absa/fadb/DBFunctionSuite.scala +++ b/core/src/test/scala/za/co/absa/fadb/DBFunctionSuite.scala @@ -21,6 +21,7 @@ import cats.implicits._ import org.scalatest.funsuite.AnyFunSuite import za.co.absa.fadb.DBFunction.DBSingleResultFunction import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits.namingConvention +import za.co.absa.fadb.status.ExceptionOrStatusWithDataRow import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -33,7 +34,7 @@ class DBFunctionSuite extends AnyFunSuite { class EngineThrow extends DBEngine[Future] { override def run[R](query: QueryType[R]): Future[Seq[R]] = neverHappens - override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = neverHappens + override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Seq[ExceptionOrStatusWithDataRow[R]]] = neverHappens } private object FooNamed extends DBSchema diff --git a/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionSuite.scala b/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionTest.scala similarity index 96% rename from core/src/test/scala/za/co/absa/fadb/status/StatusExceptionSuite.scala rename to core/src/test/scala/za/co/absa/fadb/status/StatusExceptionTest.scala index 73ec77d7..8e7578d1 100644 --- a/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionSuite.scala +++ b/core/src/test/scala/za/co/absa/fadb/status/StatusExceptionTest.scala @@ -19,7 +19,7 @@ package za.co.absa.fadb.status import org.scalatest.funsuite.AnyFunSuite import za.co.absa.fadb.exceptions._ -class StatusExceptionSuite extends AnyFunSuite { +class StatusExceptionTest extends AnyFunSuite { test("Test equals - when they are the same") { val statusException = DataConflictException(FunctionStatus(10, "OK")) val otherStatusException = DataConflictException(FunctionStatus(10, "OK")) diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregationTest.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregationTest.scala new file mode 100644 index 00000000..cff87f32 --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/StatusAggregationTest.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2022ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation + +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow, FunctionStatus, FunctionStatusWithData} + +class StatusAggregationTest extends AnyFunSuite { + + private val aggregateByFirstRowUnderTest: StatusAggregation = new StatusAggregation { + override def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): + ExceptionOrStatusWithDataResultAgg[R] = Right(Seq.empty) + } + + test("gatherExceptions should return empty Seq on empty input") { + val testData = Seq.empty + val expectedGatheredExceptions = Seq.empty + + val actualGatheredExceptions = aggregateByFirstRowUnderTest.gatherExceptions(testData) + assert(actualGatheredExceptions == expectedGatheredExceptions) + } + + test("gatherExceptions should gather exceptions") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(ErrorInDataException(FunctionStatus(50, "Some data error"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedGatheredExceptions = Seq( + DataNotFoundException(FunctionStatus(42, "Data not found")), + ErrorInDataException(FunctionStatus(50, "Some data error")) + ) + + val actualGatheredExceptions = aggregateByFirstRowUnderTest.gatherExceptions(testData) + assert(actualGatheredExceptions == expectedGatheredExceptions) + } + + test("gatherDataWithStatuses should return empty Seq on empty input") { + val testData = Seq.empty + val expectedGatheredExceptions = Seq.empty + + val actualGatheredExceptions = aggregateByFirstRowUnderTest.gatherDataWithStatuses(testData) + assert(actualGatheredExceptions == expectedGatheredExceptions) + } + + test("gatherDataWithStatuses should gather exceptions") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(ErrorInDataException(FunctionStatus(50, "Some data error"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedGatheredData = Seq( + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + + val actualGatheredData = aggregateByFirstRowUnderTest.gatherDataWithStatuses(testData) + assert(actualGatheredData == expectedGatheredData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstErrorTest.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstErrorTest.scala new file mode 100644 index 00000000..9742e39b --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstErrorTest.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} + +class AggregateByFirstErrorTest extends AnyFunSuiteLike { + + private val aggregateByFirstErrorUnderTest = new AggregateByFirstError {} + + test("aggregate should return Empty Seq in Right for an empty Sequence") { + val testData = Seq.empty + val expectedAggData = Right(Seq.empty) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return Seq with data in Right for a Sequence with data (no errors") { + val rawTestData = Seq( + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + val testData = rawTestData.map(Right(_)) // wrap so that it's Seq of Eithers with data + val expectedAggData = Right(rawTestData) // wrap so that it's Either of Seq with data + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is single error status code, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is a single error status code along with data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes along with data") { + val testData = Seq( + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstErrorUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstRowTest.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstRowTest.scala new file mode 100644 index 00000000..8b1d5da3 --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByFirstRowTest.scala @@ -0,0 +1,102 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} + +class AggregateByFirstRowTest extends AnyFunSuiteLike { + + private val aggregateByFirstRowUnderTest = new AggregateByFirstRow {} + + test("aggregate should return Empty Seq in Right for an empty Sequence") { + val testData = Seq.empty + val expectedAggData = Right(Seq.empty) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return Seq with data in Right for a Sequence with data (no errors") { + val rawTestData = Seq( + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + val testData = rawTestData.map(Right(_)) // wrap so that it's Seq of Eithers with data + val expectedAggData = Right(rawTestData) // wrap so that it's Either of Seq with data + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is single error status code, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is an error status as the first row, along with data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return data only, when there are multiple error status codes on non-first row, along with data") { + val testData = Seq( + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Right( + Seq( + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + ) + + val actualAggData = aggregateByFirstRowUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + +} diff --git a/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByMajorityErrorsTest.scala b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByMajorityErrorsTest.scala new file mode 100644 index 00000000..f2f9aeb4 --- /dev/null +++ b/core/src/test/scala/za/co/absa/fadb/status/aggregation/implementations/AggregateByMajorityErrorsTest.scala @@ -0,0 +1,133 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.status.aggregation.implementations + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.fadb.exceptions._ +import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} + +class AggregateByMajorityErrorsTest extends AnyFunSuiteLike { + + private val aggregateByMajorityErrorsUnderTest = new AggregateByMajorityErrors {} + + test("gimmeMajorityWinner should return None for an empty Sequence") { + val testData = Seq.empty + val expectedError = None + + val actualError = aggregateByMajorityErrorsUnderTest.gimmeMajorityWinner(testData) + assert(actualError == expectedError) + } + + test("gimmeMajorityWinner should return error that occurred the most (one error has biggest distribution)") { + val testData: Seq[StatusException] = Seq( + DataNotFoundException(FunctionStatus(42, "Data not found")), + OtherStatusException(FunctionStatus(91, "Non classified error")), + ErrorInDataException(FunctionStatus(51, "Data not found another")), + DataNotFoundException(FunctionStatus(42, "Data not found")), + ) + val expectedError = DataNotFoundException(FunctionStatus(42, "Data not found")) + + val actualError = aggregateByMajorityErrorsUnderTest.gimmeMajorityWinner(testData) + assert(actualError.get == expectedError) + } + + test("gimmeMajorityWinner should return error that occurred the most (uniform distribution of errors, first wins)") { + val testData: Seq[StatusException] = Seq( + DataNotFoundException(FunctionStatus(42, "Data not found")), + OtherStatusException(FunctionStatus(91, "Non classified error")), + OtherStatusException(FunctionStatus(91, "Non classified error")), + DataNotFoundException(FunctionStatus(42, "Data not found")), + ) + val expectedError = DataNotFoundException(FunctionStatus(42, "Data not found")) + + val actualError = aggregateByMajorityErrorsUnderTest.gimmeMajorityWinner(testData) + assert(actualError.get == expectedError) + } + + test("aggregate should return Empty Seq in Right for an empty Sequence") { + val testData = Seq.empty + val expectedAggData = Right(Seq.empty) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return Seq with data in Right for a Sequence with data (no errors") { + val rawTestData = Seq( + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2")), + FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3")), + ) + val testData = rawTestData.map(Right(_)) // wrap so that it's Seq of Eithers with data + val expectedAggData = Right(rawTestData) // wrap so that it's Either of Seq with data + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is single error status code, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes, uniform distribution, no data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there is an error status as the first row, along with data") { + val testData = Seq( + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + + test("aggregate should return a single Left only, when there are multiple error status codes with majority on err 42, along with data") { + val testData = Seq( + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName1", "SecondName1"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName2", "SecondName2"))), + Left(DataNotFoundException(FunctionStatus(43, "Data not found another"))), + Right(FunctionStatusWithData(FunctionStatus(10, "Ok"), ("FirstName3", "SecondName3"))), + Left(DataNotFoundException(FunctionStatus(42, "Data not found"))), + ) + val expectedAggData = Left(DataNotFoundException(FunctionStatus(42, "Data not found"))) + + val actualAggData = aggregateByMajorityErrorsUnderTest.aggregate(testData) + assert(actualAggData == expectedAggData) + } + +} diff --git a/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusTest.scala b/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusTest.scala index 809b2a8d..787c0c85 100644 --- a/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusTest.scala +++ b/doobie/src/it/scala/za/co/absa/fadb/doobie/DoobieMultipleResultFunctionWithStatusTest.scala @@ -23,8 +23,9 @@ import doobie.implicits.toSqlInterpolator import org.scalatest.funsuite.AnyFunSuite import za.co.absa.fadb.DBSchema import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithStatus -import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} +import za.co.absa.fadb.status.aggregation.implementations.AggregateByMajorityErrors import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} class DoobieMultipleResultFunctionWithStatusTest extends AnyFunSuite with DoobieTest { @@ -35,7 +36,8 @@ class DoobieMultipleResultFunctionWithStatusTest extends AnyFunSuite with Doobie class GetActorsByLastname(implicit schema: DBSchema, dbEngine: DoobieEngine[IO]) // Option[Actor] because: Actor might not exist, and the function would return only status info without actor data extends DoobieMultipleResultFunctionWithStatus[GetActorsByLastnameQueryParameters, Option[Actor], IO](getActorsByLastnameQueryFragments) - with StandardStatusHandling { + with StandardStatusHandling + with AggregateByMajorityErrors { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") } @@ -48,43 +50,47 @@ class DoobieMultipleResultFunctionWithStatusTest extends AnyFunSuite with Doobie ) val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).unsafeRunSync() - val actualData = results.map { + val actualData = results match { case Left(_) => fail("should not be left") - case Right(value) => value + case Right(dataWithStatuses) => dataWithStatuses } assert(actualData.length == expectedResultElem.size) assert(actualData.toSet == expectedResultElem) } test("Retrieving single actor from database, full match") { - val expectedResultElem = Actor(50, "Liza", "Simpson") + val expectedResultElem = FunctionStatusWithData( + FunctionStatus(12, "OK, full match"), Some(Actor(50, "Liza", "Simpson")) + ) val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson", Some("Liza"))).unsafeRunSync() - val actualData = results.map { + val actualData = results match { case Left(_) => fail("should not be left") - case Right(value) => value.data + case Right(dataWithStatuses) => dataWithStatuses } assert(actualData.length == 1) - assert(actualData.head.get == expectedResultElem) + assert(actualData.head == expectedResultElem) } test("Retrieving single actor from database, lastname match") { - val expectedResultElem = Actor(50, "Liza", "Simpson") + val expectedResultElem = FunctionStatusWithData( + FunctionStatus(11, "OK, match on last name only"), Some(Actor(50, "Liza", "Simpson")) + ) val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Simpson")).unsafeRunSync() - val actualData = results.map { + val actualData = results match { case Left(_) => fail("should not be left") - case Right(value) => value.data + case Right(dataWithStatuses) => dataWithStatuses } assert(actualData.length == 1) - assert(actualData.head.get == expectedResultElem) + assert(actualData.head == expectedResultElem) } test("Retrieving non-existing actor from database, no match") { val results = getActorsByLastname(GetActorsByLastnameQueryParameters("TotallyNonExisting!")).unsafeRunSync() - results.map { + results match { case Left(err) => assert(err.status.statusText == "No actor found") assert(err.status.statusCode == 41) diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala index 1d44eaec..6495afe3 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieEngine.scala @@ -22,8 +22,7 @@ import doobie._ import doobie.implicits._ import doobie.util.Read import za.co.absa.fadb.DBEngine -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.status.ExceptionOrStatusWithDataRow import scala.language.higherKinds @@ -65,7 +64,7 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ */ private def executeQueryWithStatus[R]( query: QueryWithStatusType[R] - )(implicit readStatusWithDataR: Read[StatusWithData[R]]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = { + )(implicit readStatusWithDataR: Read[StatusWithData[R]]): F[Seq[ExceptionOrStatusWithDataRow[R]]] = { query.fragment.query[StatusWithData[R]].to[Seq].transact(transactor).map(_.map(query.getResultOrException)) } @@ -84,7 +83,7 @@ class DoobieEngine[F[_]: Async](val transactor: Transactor[F]) extends DBEngine[ * @param query the Doobie query to run * @return the query result */ - override def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = { + override def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[ExceptionOrStatusWithDataRow[R]]] = { executeQueryWithStatus(query)(query.readStatusWithDataR) } } diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala index 991fe817..30c18c42 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieFunction.scala @@ -21,8 +21,8 @@ import doobie.implicits.toSqlInterpolator import doobie.util.Read import doobie.util.fragment.Fragment import za.co.absa.fadb.DBFunction._ -import za.co.absa.fadb.{DBEngine, DBSchema} -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData} import scala.language.higherKinds @@ -196,7 +196,7 @@ trait DoobieFunctionWithStatus[I, R, F[_]] extends DoobieFunctionBase[R] { } // This is to be mixed in by an implementation of StatusHandling - def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A] + def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A] } /** @@ -271,7 +271,7 @@ object DoobieFunction { val dbEngine: DoobieEngine[F], val readR: Read[R] ) extends DBMultipleResultFunctionWithStatus[I, R, DoobieEngine[F], F](functionNameOverride) - with DoobieFunctionWithStatus[I, R, F] + with DoobieFunctionWithStatus[I, R, F] /** * `DoobieOptionalResultFunction` represents a db function that returns an optional result. diff --git a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala index 9bed157a..b4fd7b64 100644 --- a/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala +++ b/doobie/src/main/scala/za/co/absa/fadb/doobie/DoobieQuery.scala @@ -18,8 +18,8 @@ package za.co.absa.fadb.doobie import doobie.util.Read import doobie.util.fragment.Fragment -import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} -import za.co.absa.fadb.{DBEngine, Query, QueryWithStatus} +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatus, FunctionStatusWithData} +import za.co.absa.fadb.{Query, QueryWithStatus} /** * `DoobieQuery` is a class that extends `Query` with `R` as the result type. @@ -40,7 +40,7 @@ class DoobieQuery[R](val fragment: Fragment)(implicit val readR: Read[R]) extend */ class DoobieQueryWithStatus[R]( val fragment: Fragment, - checkStatus: FunctionStatusWithData[R] => DBEngine.ExceptionOrStatusWithData[R] + checkStatus: FunctionStatusWithData[R] => ExceptionOrStatusWithDataRow[R] )(implicit val readStatusWithDataR: Read[StatusWithData[R]]) extends QueryWithStatus[StatusWithData[R], R, R] { @@ -57,6 +57,6 @@ class DoobieQueryWithStatus[R]( * @param statusWithData - the status with data * @return either a status exception or the data */ - override def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[R]): DBEngine.ExceptionOrStatusWithData[R] = + override def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[R]): ExceptionOrStatusWithDataRow[R] = checkStatus(statusWithData) } diff --git a/core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala b/slick/src/it/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala similarity index 57% rename from core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala rename to slick/src/it/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala index daa9de8c..e5904d61 100644 --- a/core/src/main/scala/za/co/absa/fadb/status/FunctionStatus.scala +++ b/slick/src/it/scala/za/co/absa/fadb/slick/OptionalActorSlickConverter.scala @@ -14,9 +14,20 @@ * limitations under the License. */ -package za.co.absa.fadb.status +package za.co.absa.fadb.slick + +import slick.jdbc.{GetResult, PositionedResult} /** - * Class represents the status of calling a fa-db function (if it supports status that is) + * A trait representing a converter from a Slick PositionedResult to an Actor. + * The trait is to be mixed into a SlickFunction returning an Actor. */ -case class FunctionStatus(statusCode: Int, statusText: String) +trait OptionalActorSlickConverter { + + protected def slickConverter: GetResult[Option[Actor]] = { + def converter(r: PositionedResult): Option[Actor] = { + Some(Actor(r.<<, r.<<, r.<<)) + } + GetResult(converter) + } +} diff --git a/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionTest.scala b/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionTest.scala index b86d1fb1..6a77d218 100644 --- a/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionTest.scala +++ b/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionTest.scala @@ -42,8 +42,11 @@ class SlickMultipleResultFunctionTest extends AnyFunSuite with SlickTest with Sc private val getActors = new GetActors()(Integration, new SlickPgEngine(db)) test("Retrieving actors from database") { - val expectedResultElem = Actor(49, "Pavel", "Marek") - val results = getActors(GetActorsQueryParameters(Some("Pavel"), Some("Marek"))) - assert(results.futureValue.contains(expectedResultElem)) + val expectedResultElem = Set( + Actor(51, "Fred", "Weasley"), + Actor(52, "George", "Weasley"), + ) + val results = getActors(GetActorsQueryParameters(lastName=Some("Weasley"), firstName=None)).futureValue + assert(results.toSet == expectedResultElem) } } diff --git a/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusTest.scala b/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusTest.scala new file mode 100644 index 00000000..fea51225 --- /dev/null +++ b/slick/src/it/scala/za/co/absa/fadb/slick/SlickMultipleResultFunctionWithStatusTest.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.fadb.slick + +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import slick.jdbc.SQLActionBuilder +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.slick.FaDbPostgresProfile.api._ +import za.co.absa.fadb.slick.SlickFunction.SlickMultipleResultFunctionWithStatus +import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} +import za.co.absa.fadb.status.aggregation.implementations.AggregateByFirstError +import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling + +import scala.concurrent.ExecutionContext.Implicits.global + + +class SlickMultipleResultFunctionWithStatusTest extends AnyFunSuite with SlickTest with ScalaFutures { + + class GetActorsByLastname(implicit override val schema: DBSchema, val dbEngine: SlickPgEngine) + extends SlickMultipleResultFunctionWithStatus[GetActorsByLastnameQueryParameters, Option[Actor]] + with StandardStatusHandling + with AggregateByFirstError + with OptionalActorSlickConverter { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("actor_id", "first_name", "last_name") + + override def sql(values: GetActorsByLastnameQueryParameters): SQLActionBuilder = { + sql"""SELECT #$selectEntry FROM #$functionName(${values.lastName},${values.firstName}) #$alias;""" + } + } + + private val getActorsByLastname = new GetActorsByLastname()(Integration, new SlickPgEngine(db)) + + test("Retrieving actors from database") { + val expectedResultElem = Set( + FunctionStatusWithData(FunctionStatus(11, "OK, match on last name only"), Some(Actor(51, "Fred", "Weasley"))), + FunctionStatusWithData(FunctionStatus(11, "OK, match on last name only"), Some(Actor(52, "George", "Weasley"))) + ) + + val results = getActorsByLastname(GetActorsByLastnameQueryParameters("Weasley")).futureValue + val actualData = results match { + case Left(_) => fail("should not be left") + case Right(dataWithStatuses) => dataWithStatuses + } + assert(actualData.length == 2) + assert(actualData.toSet == expectedResultElem) + + } +} diff --git a/slick/src/it/scala/za/co/absa/fadb/slick/SlickTest.scala b/slick/src/it/scala/za/co/absa/fadb/slick/SlickTest.scala index 669e97ed..ce7fa2ac 100644 --- a/slick/src/it/scala/za/co/absa/fadb/slick/SlickTest.scala +++ b/slick/src/it/scala/za/co/absa/fadb/slick/SlickTest.scala @@ -20,8 +20,10 @@ import slick.jdbc.JdbcBackend.Database import za.co.absa.fadb.DBSchema trait SlickTest { - case class CreateActorRequestBody(firstName: String, lastName: String) case class GetActorsQueryParameters(firstName: Option[String], lastName: Option[String]) + case class GetActorsByLastnameQueryParameters(lastName: String, firstName: Option[String] = None) + + case class CreateActorRequestBody(firstName: String, lastName: String) import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._ object Integration extends DBSchema diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala index 67a9c076..39dd2b52 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickFunction.scala @@ -18,10 +18,9 @@ package za.co.absa.fadb.slick import cats.MonadError import slick.jdbc.{GetResult, SQLActionBuilder} -import za.co.absa.fadb.exceptions.StatusException import za.co.absa.fadb.DBFunction._ -import za.co.absa.fadb.{DBEngine, DBSchema} -import za.co.absa.fadb.status.FunctionStatusWithData +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData} import scala.concurrent.Future import scala.language.higherKinds @@ -90,7 +89,7 @@ private[slick] trait SlickFunctionWithStatus[I, R] extends SlickFunctionBase[I, } // Expected to be mixed in by an implementation of StatusHandling - def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A] + def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A] } object SlickFunction { diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala index a49d4668..7402b9ad 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickPgEngine.scala @@ -19,7 +19,7 @@ package za.co.absa.fadb.slick import cats.implicits._ import slick.jdbc.PostgresProfile.api._ import za.co.absa.fadb.DBEngine -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.fadb.status.ExceptionOrStatusWithDataRow import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds @@ -55,8 +55,8 @@ class SlickPgEngine(val db: Database)(implicit val executor: ExecutionContext) e * @tparam R - return the of the query * @return - either status exception or result of database query */ - override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Seq[DBEngine.ExceptionOrStatusWithData[R]]] = { - val slickAction = query.sql.as[DBEngine.ExceptionOrStatusWithData[R]](query.getStatusExceptionOrData) + override def runWithStatus[R](query: QueryWithStatusType[R]): Future[Seq[ExceptionOrStatusWithDataRow[R]]] = { + val slickAction = query.sql.as[ExceptionOrStatusWithDataRow[R]](query.getStatusExceptionOrData) db.run(slickAction) } } diff --git a/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala b/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala index 245ab7d0..44ff4d02 100644 --- a/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala +++ b/slick/src/main/scala/za/co/absa/fadb/slick/SlickQuery.scala @@ -17,9 +17,8 @@ package za.co.absa.fadb.slick import slick.jdbc.{GetResult, PositionedResult, SQLActionBuilder} -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.fadb.status.{FunctionStatus, FunctionStatusWithData} -import za.co.absa.fadb.{DBEngine, Query, QueryWithStatus} +import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatus, FunctionStatusWithData} +import za.co.absa.fadb.{Query, QueryWithStatus} /** * SQL query representation for Slick @@ -40,7 +39,7 @@ class SlickQuery[R](val sql: SQLActionBuilder, val getResult: GetResult[R]) exte class SlickQueryWithStatus[R]( val sql: SQLActionBuilder, val getResult: GetResult[R], - checkStatus: FunctionStatusWithData[PositionedResult] => DBEngine.ExceptionOrStatusWithData[PositionedResult] + checkStatus: FunctionStatusWithData[PositionedResult] => ExceptionOrStatusWithDataRow[PositionedResult] ) extends QueryWithStatus[PositionedResult, PositionedResult, R] { /** @@ -61,7 +60,7 @@ class SlickQueryWithStatus[R]( */ override def toStatusExceptionOrData( statusWithData: FunctionStatusWithData[PositionedResult] - ): DBEngine.ExceptionOrStatusWithData[R] = { + ): ExceptionOrStatusWithDataRow[R] = { val o = checkStatus(statusWithData) o match { case Left(statusException) => Left(statusException) @@ -80,7 +79,7 @@ class SlickQueryWithStatus[R]( * @return the GetResult, that combines the processing of the status and the conversion of the status with data * to either a status exception or the data */ - def getStatusExceptionOrData: GetResult[DBEngine.ExceptionOrStatusWithData[R]] = { + def getStatusExceptionOrData: GetResult[ExceptionOrStatusWithDataRow[R]] = { GetResult(pr => processStatus(pr)).andThen(fs => toStatusExceptionOrData(fs)) } }