Skip to content

Commit

Permalink
Funnel back the abort reason to the branch abort call
Browse files Browse the repository at this point in the history
So that differentiated behavior can be added to the abort logic
  • Loading branch information
Jonas Chapuis authored and jchapuis committed Nov 19, 2024
1 parent 6573ec8 commit cc294af
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cats.syntax.show.*
import cats.syntax.applicative.*
import cats.conversions.all.*
import endless.transaction.Branch
import endless.transaction.Transaction.AbortReason
import endless.transaction.example.algebra.Account
import endless.transaction.example.algebra.Account.InsufficientFunds
import endless.transaction.example.algebra.Accounts.TransferFailure
Expand Down Expand Up @@ -81,7 +82,7 @@ class TransferBranch[F[_]: Logger](accountID: AccountID, account: Account[F])(im
pure
)

def abort(transferID: TransferID): F[Unit] =
def abort(transferID: TransferID, reason: AbortReason[TransferFailure]): F[Unit] =
Logger[F].debug(show"Aborting transfer $transferID for account $accountID") >>
EitherT(
account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import endless.transaction.example.data.Transfer.TransferID
import endless.transaction.example.Generators
import cats.syntax.either.*
import endless.transaction.Branch
import endless.transaction.Transaction.AbortReason
import endless.transaction.example.algebra.Accounts.TransferFailure
import endless.transaction.example.helpers.RetryHelpers.RetryParameters
import endless.transaction.helpers.LogMessageAssertions
Expand Down Expand Up @@ -243,8 +244,8 @@ class TransferBranchSuite
}
}
)
assertIO(destinationBranch.abort(transferID), ()) >> assertIO(
originBranch.abort(transferID),
assertIO(destinationBranch.abort(transferID, AbortReason.Timeout), ()) >> assertIO(
originBranch.abort(transferID, AbortReason.Timeout),
()
) >> testLogger.assertLogsDebug
}
Expand Down Expand Up @@ -348,7 +349,7 @@ class TransferBranchSuite
}
)
_ <- assertIO(
branch.abort(transferID),
branch.abort(transferID, AbortReason.Timeout),
()
)
_ <- testLogger.assertLogsWarn
Expand Down
9 changes: 6 additions & 3 deletions lib/src/main/scala/endless/transaction/Branch.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package endless.transaction

import endless.transaction.Branch.Vote
import endless.transaction.Transaction.AbortReason

/** A branch defines behavior for the various phases of the 2PC protocol for a certain transaction
* type. The branch is responsible for preparing, committing and aborting the transaction. The
Expand Down Expand Up @@ -73,20 +74,22 @@ trait Branch[F[_], TID, Q, R] {
* aborted, however.
* @param id
* transaction id
* @param reason
* the reason for the abort
*/
def abort(id: TID): F[Unit]
def abort(id: TID, reason: AbortReason[R]): F[Unit]
}

object Branch {
def apply[F[_], TID, BID, Q, R](
prepareF: (TID, Q) => F[Vote[R]],
commitF: TID => F[Unit],
abortF: TID => F[Unit]
abortF: (TID, AbortReason[R]) => F[Unit]
): Branch[F, TID, Q, R] =
new Branch[F, TID, Q, R] {
def prepare(id: TID, query: Q): F[Vote[R]] = prepareF(id, query)
def commit(id: TID): F[Unit] = commitF(id)
def abort(id: TID): F[Unit] = abortF(id)
def abort(id: TID, reason: AbortReason[R]): F[Unit] = abortF(id, reason)
}

/** The vote of a branch in the 2PC protocol.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private[transaction] final class TransactionSideEffect[
state: Aborting[TID, BID, Q, R]
)(branchID: BID, branch: Branch[F, TID, Q, R]) = {
branch
.abort(state.id)
.abort(state.id, state.reason)
.flatMap(_ =>
self
.branchAborted(branchID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import endless.core.entity.Effector.PassivationState
import endless.core.entity.SideEffect.Trigger
import endless.transaction.{Branch, Transaction}
import endless.transaction.Branch.Vote
import endless.transaction.Transaction.Status
import endless.transaction.Transaction.{AbortReason, Status}
import endless.transaction.impl.Generators
import endless.transaction.impl.algebra.{TransactionAlg, TransactionCreator}
import endless.transaction.impl.data.TransactionState
Expand Down Expand Up @@ -44,7 +44,7 @@ class TransactionSideEffectSuite
new TestBranch {
override def prepare(transactionID: TID, query: Q): IO[Vote[R]] = IO(Vote.Commit)
override def commit(transactionId: TID): IO[Unit] = IO.unit
override def abort(transactionID: TID): IO[Unit] = IO.unit
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] = IO.unit
}
)
)
Expand Down Expand Up @@ -78,7 +78,7 @@ class TransactionSideEffectSuite
new TestBranch {
override def prepare(transactionID: TID, query: Q): IO[Vote[R]] = IO(Vote.Commit)
override def commit(transactionId: TID): IO[Unit] = IO.unit
override def abort(transactionID: TID): IO[Unit] = IO.unit
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] = IO.unit
}
)
)
Expand Down Expand Up @@ -114,7 +114,7 @@ class TransactionSideEffectSuite
new TestBranch {
override def prepare(transactionID: TID, query: Q): IO[Vote[R]] = IO(Vote.Commit)
override def commit(transactionId: TID): IO[Unit] = IO.unit
override def abort(transactionID: TID): IO[Unit] = IO.unit
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] = IO.unit
}
)
)
Expand Down Expand Up @@ -425,8 +425,8 @@ class TransactionSideEffectSuite
neverTimeout,
(_: BID) =>
new TestBranch {
override def abort(transactionID: TID): IO[Unit] =
IO.unit
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] =
assertIOBoolean(IO(reason == aborting.reason))
}
)
)
Expand Down Expand Up @@ -460,7 +460,7 @@ class TransactionSideEffectSuite
neverTimeout,
(bid: BID) =>
new TestBranch {
override def abort(transactionID: TID): IO[Unit] =
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] =
if (bid == aborting.branches.head) shouldNotBeCalled else IO.unit
}
)
Expand Down Expand Up @@ -497,7 +497,8 @@ class TransactionSideEffectSuite
neverTimeout,
(_: BID) =>
new TestBranch {
override def abort(transactionID: TID): IO[Unit] = shouldNotBeCalled
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] =
shouldNotBeCalled
}
)
)
Expand Down Expand Up @@ -526,7 +527,7 @@ class TransactionSideEffectSuite
neverTimeout,
(_: BID) =>
new TestBranch {
override def abort(transactionID: TID): IO[Unit] =
override def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] =
IO.raiseError(new Exception("boom"))
}
)
Expand Down Expand Up @@ -599,7 +600,7 @@ class TransactionSideEffectSuite
trait TestBranch extends Branch[IO, TID, Q, R] {
def prepare(transactionID: TID, query: Q): IO[Vote[R]] = shouldNotBeCalled
def commit(transactionId: TID): IO[Unit] = shouldNotBeCalled
def abort(transactionID: TID): IO[Unit] = shouldNotBeCalled
def abort(transactionID: TID, reason: AbortReason[R]): IO[Unit] = shouldNotBeCalled
}

trait SelfEntity extends TransactionAlg[IO, TID, BID, Q, R] {
Expand Down

0 comments on commit cc294af

Please sign in to comment.