Skip to content

Commit

Permalink
#118: aggregation of errors into a single one, basic three implementa…
Browse files Browse the repository at this point in the history
…tions, unit tests and small refactoring
  • Loading branch information
lsulak committed Apr 3, 2024
1 parent bdcdce5 commit 3455339
Show file tree
Hide file tree
Showing 29 changed files with 799 additions and 113 deletions.
27 changes: 8 additions & 19 deletions core/src/main/scala/za/co/absa/fadb/DBEngine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package za.co.absa.fadb

import cats.Monad
import cats.implicits.toFunctorOps
import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.fadb.status.FunctionStatusWithData
import za.co.absa.fadb.status.ExceptionOrStatusWithDataRow

import scala.language.higherKinds

Expand All @@ -39,19 +38,15 @@ abstract class DBEngine[F[_]: Monad] {

/**
* The actual query executioner of the queries of the engine
*
* Two methods provided, one for dealing with query of type with no status, and the other for status-provided queries.
*
* @param query - the query to execute
* @tparam R - return type of the query
* @return - sequence of the results of database query
*/
protected def run[R](query: QueryType[R]): F[Seq[R]]

/**
* The actual query executioner of the queries of the engine with status
* @param query - the query to execute
* @tparam R - return type of the query
* @return - result of database query with status
*/
protected def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]]
protected def runWithStatus[R](query: QueryWithStatusType[R]): F[Seq[ExceptionOrStatusWithDataRow[R]]]

/**
* Public method to execute when query is expected to return multiple results
Expand All @@ -63,7 +58,7 @@ abstract class DBEngine[F[_]: Monad] {
* @return - sequence of the results of database query
*/
def fetchAll[R](query: QueryType[R]): F[Seq[R]] = run(query)
def fetchAllWithStatus[R](query: QueryWithStatusType[R]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] =
def fetchAllWithStatus[R](query: QueryWithStatusType[R]): F[Seq[ExceptionOrStatusWithDataRow[R]]] =
runWithStatus(query)

/**
Expand All @@ -76,7 +71,7 @@ abstract class DBEngine[F[_]: Monad] {
* @return - sequence of the results of database query
*/
def fetchHead[R](query: QueryType[R]): F[R] = run(query).map(_.head)
def fetchHeadWithStatus[R](query: QueryWithStatusType[R]): F[DBEngine.ExceptionOrStatusWithData[R]] =
def fetchHeadWithStatus[R](query: QueryWithStatusType[R]): F[ExceptionOrStatusWithDataRow[R]] =
runWithStatus(query).map(_.head)

/**
Expand All @@ -89,12 +84,6 @@ abstract class DBEngine[F[_]: Monad] {
* @return - sequence of the results of database query
*/
def fetchHeadOption[R](query: QueryType[R]): F[Option[R]] = run(query).map(_.headOption)
def fetchHeadOptionWithStatus[R](query: QueryWithStatusType[R]): F[Option[DBEngine.ExceptionOrStatusWithData[R]]] =
def fetchHeadOptionWithStatus[R](query: QueryWithStatusType[R]): F[Option[ExceptionOrStatusWithDataRow[R]]] =
runWithStatus(query).map(_.headOption)
}

case object DBEngine {

type ExceptionOrStatusWithData[R] = Either[StatusException, FunctionStatusWithData[R]]

}
23 changes: 13 additions & 10 deletions core/src/main/scala/za/co/absa/fadb/DBFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package za.co.absa.fadb

import cats.MonadError
import cats.implicits.toFlatMapOps
import za.co.absa.fadb.status.FunctionStatusWithData
import za.co.absa.fadb.status.aggregation.StatusAggregation
import za.co.absa.fadb.status.handling.StatusHandling
import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow, FunctionStatusWithData}

import scala.language.higherKinds

Expand Down Expand Up @@ -109,23 +110,23 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv
* @param values - The values to pass over to the database function.
* @return - A sequence of results from the database function.
*/
protected def multipleResults(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] =
protected def multipleResults(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[ExceptionOrStatusWithDataRow[R]]] =
query(values).flatMap(q => dBEngine.fetchAllWithStatus(q))

/**
* Executes the database function and returns a single result.
* @param values - The values to pass over to the database function.
* @return - A single result from the database function.
*/
protected def singleResult(values: I)(implicit me: MonadError[F, Throwable]): F[DBEngine.ExceptionOrStatusWithData[R]] =
protected def singleResult(values: I)(implicit me: MonadError[F, Throwable]): F[ExceptionOrStatusWithDataRow[R]] =
query(values).flatMap(q => dBEngine.fetchHeadWithStatus(q))

/**
* Executes the database function and returns an optional result.
* @param values - The values to pass over to the database function.
* @return - An optional result from the database function.
*/
protected def optionalResult(values: I)(implicit me: MonadError[F, Throwable]): F[Option[DBEngine.ExceptionOrStatusWithData[R]]] =
protected def optionalResult(values: I)(implicit me: MonadError[F, Throwable]): F[Option[ExceptionOrStatusWithDataRow[R]]] =
query(values).flatMap(q => dBEngine.fetchHeadOptionWithStatus(q))

/**
Expand All @@ -147,7 +148,7 @@ abstract class DBFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](functionNameOv
protected def query(values: I)(implicit me: MonadError[F, Throwable]): F[dBEngine.QueryWithStatusType[R]]

// To be provided by an implementation of QueryStatusHandling
override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): DBEngine.ExceptionOrStatusWithData[A]
override def checkStatus[A](statusWithData: FunctionStatusWithData[A]): ExceptionOrStatusWithDataRow[A]
}

object DBFunction {
Expand Down Expand Up @@ -230,7 +231,8 @@ object DBFunction {
abstract class DBMultipleResultFunctionWithStatus[I, R, E <: DBEngine[F], F[_]](
functionNameOverride: Option[String] = None
)(implicit schema: DBSchema, dBEngine: E)
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride) {
extends DBFunctionWithStatus[I, R, E, F](functionNameOverride)
with StatusAggregation {

// A constructor that takes only the mandatory parameters and uses default values for the optional ones
def this()(implicit schema: DBSchema, dBEngine: E) = this(None)
Expand All @@ -244,8 +246,9 @@ object DBFunction {
* @return - a sequence of values, each coming from a row returned from the DB function transformed to scala
* type `R` wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Seq[DBEngine.ExceptionOrStatusWithData[R]]] =
multipleResults(values)
def apply(values: I)
(implicit me: MonadError[F, Throwable]): F[ExceptionOrStatusWithDataResultAgg[R]] =
multipleResults(values).flatMap(data => me.pure(aggregate(data)))
}

/**
Expand All @@ -270,7 +273,7 @@ object DBFunction {
* @return - the value returned from the DB function transformed to scala type `R`
* wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[DBEngine.ExceptionOrStatusWithData[R]] =
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[ExceptionOrStatusWithDataRow[R]] =
singleResult(values)
}

Expand All @@ -296,7 +299,7 @@ object DBFunction {
* @return - the value returned from the DB function transformed to scala type `R` if a row is returned,
* otherwise `None`, wrapped around with Either, providing StatusException if raised
*/
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[DBEngine.ExceptionOrStatusWithData[R]]] =
def apply(values: I)(implicit me: MonadError[F, Throwable]): F[Option[ExceptionOrStatusWithDataRow[R]]] =
optionalResult(values)
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/za/co/absa/fadb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.fadb

import za.co.absa.fadb.status.FunctionStatusWithData
import za.co.absa.fadb.status.{ExceptionOrStatusWithDataRow, FunctionStatusWithData}

/**
* The basis for all query types of [[DBEngine]] implementations
Expand Down Expand Up @@ -44,14 +44,14 @@ trait QueryWithStatus[A, B, R] {
* @param statusWithData - the status with data
* @return either a status exception or the data
*/
def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[B]): DBEngine.ExceptionOrStatusWithData[R]
def toStatusExceptionOrData(statusWithData: FunctionStatusWithData[B]): ExceptionOrStatusWithDataRow[R]

/**
* Returns the result of the query or a status exception
* @param initialResult - the initial result of the query
* @return the result of the query or a status exception
*/
def getResultOrException(initialResult: A): DBEngine.ExceptionOrStatusWithData[R] =
def getResultOrException(initialResult: A): ExceptionOrStatusWithDataRow[R] =
toStatusExceptionOrData(
processStatus(initialResult)
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.fadb.status.aggregation

import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow, FunctionStatusWithData}

/**
* `StatusAggregation` is a base trait that defines the interface for aggregating the statuses of a function invocation.
* It provides a method to aggregate the error statuses into a single status information - this is typically needed
* for database functions that retrieve multiple records.
*/
trait StatusAggregation {

private [aggregation] def gatherExceptions[R](
eithersWithException: Seq[ExceptionOrStatusWithDataRow[R]]
): Seq[StatusException] = {

eithersWithException.flatMap {
case Left(exception) => Some(exception)
case _ => None
}
}

private [aggregation] def gatherDataWithStatuses[R](
eithersWithData: Seq[ExceptionOrStatusWithDataRow[R]]
): Seq[FunctionStatusWithData[R]] = {
eithersWithData.flatMap {
case Left(_) => None
case Right(dataWithStatuses) => Some(dataWithStatuses)
}
}

/**
* Aggregates the error status information into a single error.
*
* @param statusesWithData - The status of the function invocation with data.
* @return Either a `StatusException` if the status code indicates an error, or the data (along with the status
* information so that it's retrievable) if the status being returned doesn't indicate an error.
*/
def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.fadb.status.aggregation.implementations

import za.co.absa.fadb.status.aggregation.StatusAggregation
import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow}

/**
* `AggregateByFirstError` is a trait that extends the `StatusAggregation` interface.
* It provides an implementation for aggregating error statuses of a function invocation into a single error
* by choosing the first error encountered to be the representative one (i.e. if there are multiple errors of other
* types being returned, only the first one would be chosen and the rest would be ignored).
*/
trait AggregateByFirstError extends StatusAggregation {

override def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R] = {
val firstError = gatherExceptions(statusesWithData).headOption

val dataFinal = gatherDataWithStatuses(statusesWithData)

firstError match {
case Some(statusException) => Left(statusException)
case None => Right(dataFinal)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.fadb.status.aggregation.implementations

import za.co.absa.fadb.status.aggregation.StatusAggregation
import za.co.absa.fadb.status.{ExceptionOrStatusWithDataResultAgg, ExceptionOrStatusWithDataRow}

/**
* `AggregateByFirstRow` is a trait that extends the `StatusAggregation` interface.
* It provides an implementation for aggregating error statuses of a function invocation into a single error
* by choosing the first row that was returned to be the representative one
* (i.e. if there is an error on row two or later, it would be ignored).
*/
trait AggregateByFirstRow extends StatusAggregation {

override def aggregate[R](statusesWithData: Seq[ExceptionOrStatusWithDataRow[R]]): ExceptionOrStatusWithDataResultAgg[R] = {
val firstRow = statusesWithData.headOption

val dataFinal = gatherDataWithStatuses(statusesWithData)

firstRow match {
case Some(exceptionOrDataWithStatuses) => exceptionOrDataWithStatuses match {
case Left(statusException) => Left(statusException)
case Right(_) => Right(dataFinal)
}
case None => Right(Seq.empty)
}
}

}
Loading

0 comments on commit 3455339

Please sign in to comment.