diff --git a/README.md b/README.md index da0e73e..bcfbf54 100644 --- a/README.md +++ b/README.md @@ -2,20 +2,21 @@ > Distributed transaction library based on Choreography - +
-![version 0.1.9](https://img.shields.io/badge/version-0.1.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) +![version 0.2.0](https://img.shields.io/badge/version-0.2.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white) Choreography 방식으로 구현된 분산 트랜잭션 라이브러리 입니다. `Netx` 는 다음 기능을 제공합니다. 1. [Reactor](https://projectreactor.io/) 기반의 완전한 비동기 트랜잭션 관리 +2. 처리되지 않은 트랜잭션을 찾아 자동으로 재실행 3. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지 4. `At Least Once` 방식의 메시지 전달 보장 -5. 처리되지 않은 메시지를 자동으로 재실행 +5. 비동기 API와 동기 API 지원 ## How to use @@ -56,6 +57,20 @@ class Application { #### Scenario1. Start pay transaction ```kotlin +// Sync +fun pay(param: Any): Any { + val transactionId = transactionManager.syncStart("paid=1000") // start transaction + + runCatching { // This is kotlin try catch, not netx library spec + // Do your bussiness logic + }.fold( + onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction + onFailure = { transactionManager.syncRollback(transactionId, it.message) } // rollback transaction + ) +} + + +// Async fun pay(param: Any): Mono { return transactionManager.start("paid=1000") // Start distributed transaction and publish transaction start event .flatMap { transactionId -> @@ -75,6 +90,19 @@ fun pay(param: Any): Mono { #### Scenario2. Join order transaction ```kotlin +//Sync +fun order(param: Any): Any { + val transactionId = transactionManager.syncJoin(param.transactionId, "orderId=1:state=PENDING") // join transaction + + runCatching { // This is kotlin try catch, not netx library spec + // Do your bussiness logic + }.fold( + onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction + onFailure = { transactionManager.syncRollback(transactionId, it.message) } // rollback transaction + ) +} + +// Async fun order(param: Any): Mono { return transactionManager.join( param.transactionId, @@ -94,6 +122,12 @@ fun order(param: Any): Mono { #### Scenario3. Check exists transaction ```kotlin +// Sync +fun exists(param: Any): Any { + return transactionManager.syncExists(param.transactionId) +} + +// Async fun exists(param: Any): Mono { return transactionManager.exists(param.transactionId) // Find any transaction has ever been started } diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt index cc0b9d2..a0a5b77 100644 --- a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt +++ b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt @@ -6,12 +6,22 @@ interface TransactionManager { fun start(undo: String): Mono + fun syncStart(undo: String): String + fun join(transactionId: String, undo: String): Mono + fun syncJoin(transactionId: String, undo: String): String + fun exists(transactionId: String): Mono + fun syncExists(transactionId: String): String + fun commit(transactionId: String): Mono + fun syncCommit(transactionId: String): String + fun rollback(transactionId: String, cause: String): Mono + fun syncRollback(transactionId: String, cause: String): String + } diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt index a8737de..1106e7c 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt @@ -13,6 +13,30 @@ abstract class AbstractTransactionManager( private val transactionIdGenerator: TransactionIdGenerator = TransactionIdGenerator(nodeId), ) : TransactionManager { + override fun syncStart(undo: String): String { + return start(undo).block() ?: error("Cannot start transaction") + } + + override fun syncJoin(transactionId: String, undo: String): String { + return join(transactionId, undo).block() + ?: error("Cannot join transaction \"$transactionId\", \"$undo\"") + } + + override fun syncExists(transactionId: String): String { + return exists(transactionId).block() + ?: error("Cannot exists transaction \"$transactionId\"") + } + + override fun syncCommit(transactionId: String): String { + return commit(transactionId).block() + ?: error("Cannot commit transaction \"$transactionId\"") + } + + override fun syncRollback(transactionId: String, cause: String): String { + return rollback(transactionId, cause).block() + ?: error("Cannot rollback transaction \"$transactionId\", \"$cause\"") + } + final override fun start(undo: String): Mono { return startTransaction(undo) .contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) } diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt index c1ed219..c38b250 100644 --- a/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt +++ b/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt @@ -1,8 +1,10 @@ package org.rooftop.netx.redis import io.kotest.assertions.nondeterministic.eventually +import io.kotest.assertions.throwables.shouldThrowMessage import io.kotest.core.annotation.DisplayName import io.kotest.core.spec.style.DescribeSpec +import io.kotest.matchers.shouldBe import org.rooftop.netx.api.* import org.rooftop.netx.meta.EnableDistributedTransaction import org.springframework.test.context.ContextConfiguration @@ -32,9 +34,9 @@ internal class RedisStreamTransactionManagerTest( } describe("start 메소드는") { - context("replay 를 입력받으면,") { + context("UNDO 를 입력받으면,") { it("트랜잭션을 시작하고 transaction-id를 반환한다.") { - transactionManager.start(REPLAY).subscribe() + transactionManager.start(UNDO).subscribe() eventually(5.seconds) { monoTransactionHandlerAssertions.startCountShouldBe(1) @@ -45,8 +47,33 @@ internal class RedisStreamTransactionManagerTest( context("서로 다른 id의 트랜잭션이 여러번 시작되어도") { it("모두 읽을 수 있다.") { - transactionManager.start(REPLAY).block() - transactionManager.start(REPLAY).block() + transactionManager.start(UNDO).block() + transactionManager.start(UNDO).block() + + eventually(5.seconds) { + monoTransactionHandlerAssertions.startCountShouldBe(2) + noPublisherTransactionHandlerAssertions.startCountShouldBe(2) + } + } + } + } + + describe("syncStart 메소드는") { + context("UNDO 를 입력받으면,") { + it("트랜잭션을 시작하고 transaction-id를 반환한다.") { + transactionManager.syncStart(UNDO) + + eventually(5.seconds) { + monoTransactionHandlerAssertions.startCountShouldBe(1) + noPublisherTransactionHandlerAssertions.startCountShouldBe(1) + } + } + } + + context("서로 다른 id의 트랜잭션이 여러번 시작되어도") { + it("모두 읽을 수 있다.") { + transactionManager.syncStart(UNDO) + transactionManager.syncStart(UNDO) eventually(5.seconds) { monoTransactionHandlerAssertions.startCountShouldBe(2) @@ -58,10 +85,10 @@ internal class RedisStreamTransactionManagerTest( describe("join 메소드는") { context("존재하는 transactionId를 입력받으면,") { - val transactionId = transactionManager.start(REPLAY).block()!! + val transactionId = transactionManager.start(UNDO).block()!! it("트랜잭션에 참여한다.") { - transactionManager.join(transactionId, REPLAY).subscribe() + transactionManager.join(transactionId, UNDO).subscribe() eventually(5.seconds) { monoTransactionHandlerAssertions.joinCountShouldBe(1) @@ -72,7 +99,7 @@ internal class RedisStreamTransactionManagerTest( context("존재하지 않는 transactionId를 입력받으면,") { it("IllegalStateException 을 던진다.") { - val result = transactionManager.join(NOT_EXIST_TX_ID, REPLAY) + val result = transactionManager.join(NOT_EXIST_TX_ID, UNDO) StepVerifier.create(result) .verifyErrorMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") @@ -80,9 +107,32 @@ internal class RedisStreamTransactionManagerTest( } } + describe("syncJoin 메소드는") { + context("존재하는 transactionId를 입력받으면,") { + val transactionId = transactionManager.syncStart(UNDO) + + it("트랜잭션에 참여한다.") { + transactionManager.syncJoin(transactionId, UNDO) + + eventually(5.seconds) { + monoTransactionHandlerAssertions.joinCountShouldBe(1) + noPublisherTransactionHandlerAssertions.joinCountShouldBe(1) + } + } + } + + context("존재하지 않는 transactionId를 입력받으면,") { + it("IllegalStateException 을 던진다.") { + shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") { + transactionManager.syncJoin(NOT_EXIST_TX_ID, UNDO) + } + } + } + } + describe("exists 메소드는") { context("존재하는 transactionId를 입력받으면,") { - val transactionId = transactionManager.start(REPLAY).block()!! + val transactionId = transactionManager.start(UNDO).block()!! it("트랜잭션 id를 반환한다.") { val result = transactionManager.exists(transactionId) @@ -103,9 +153,29 @@ internal class RedisStreamTransactionManagerTest( } } + describe("syncExists 메소드는") { + context("존재하는 transactionId를 입력받으면,") { + val transactionId = transactionManager.syncStart(UNDO) + + it("트랜잭션 id를 반환한다.") { + val result = transactionManager.syncExists(transactionId) + + result shouldBe transactionId + } + } + + context("존재하지 않는 transactionId를 입력받으면,") { + it("IllegalStateException 을 던진다.") { + shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") { + transactionManager.syncExists(NOT_EXIST_TX_ID) + } + } + } + } + describe("commit 메소드는") { context("존재하는 transactionId를 입력받으면,") { - val transactionId = transactionManager.start(REPLAY).block()!! + val transactionId = transactionManager.start(UNDO).block()!! it("commit 메시지를 publish 한다") { transactionManager.commit(transactionId).block() @@ -127,12 +197,35 @@ internal class RedisStreamTransactionManagerTest( } } + describe("syncCommit 메소드는") { + context("존재하는 transactionId를 입력받으면,") { + val transactionId = transactionManager.syncStart(UNDO) + + it("commit 메시지를 publish 한다") { + transactionManager.syncCommit(transactionId) + + eventually(5.seconds) { + monoTransactionHandlerAssertions.commitCountShouldBe(1) + noPublisherTransactionHandlerAssertions.commitCountShouldBe(1) + } + } + } + + context("존재하지 않는 transactionId를 입력받으면,") { + it("IllegalStateException 을 던진다.") { + shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") { + transactionManager.syncCommit(NOT_EXIST_TX_ID) + } + } + } + } + describe("rollback 메소드는") { context("존재하는 transactionId를 입력받으면,") { - val transactionId = transactionManager.start(REPLAY).block()!! + val transactionId = transactionManager.start(UNDO).block()!! it("rollback 메시지를 publish 한다") { - transactionManager.rollback(transactionId, "rollback occured for test").block() + transactionManager.rollback(transactionId, "rollback for test").block() eventually(5.seconds) { monoTransactionHandlerAssertions.rollbackCountShouldBe(1) @@ -143,17 +236,40 @@ internal class RedisStreamTransactionManagerTest( context("존재하지 않는 transactionId를 입력받으면,") { it("IllegalStateException 을 던진다.") { - val result = transactionManager.commit(NOT_EXIST_TX_ID) + val result = transactionManager.rollback(NOT_EXIST_TX_ID, "rollback for test") StepVerifier.create(result) .verifyErrorMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") } } } + + describe("syncRollback 메소드는") { + context("존재하는 transactionId를 입력받으면,") { + val transactionId = transactionManager.syncStart(UNDO) + + it("rollback 메시지를 publish 한다") { + transactionManager.syncRollback(transactionId, "rollback for test") + + eventually(5.seconds) { + monoTransactionHandlerAssertions.rollbackCountShouldBe(1) + noPublisherTransactionHandlerAssertions.rollbackCountShouldBe(1) + } + } + } + + context("존재하지 않는 transactionId를 입력받으면,") { + it("IllegalStateException 을 던진다.") { + shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") { + transactionManager.syncRollback(NOT_EXIST_TX_ID, "rollback for test") + } + } + } + } }) { private companion object { - private const val REPLAY = "REPLAY" + private const val UNDO = "UNDO" private const val NOT_EXIST_TX_ID = "NOT_EXISTS_TX_ID" } }