Skip to content

Commit

Permalink
feat: Sync API (#39)
Browse files Browse the repository at this point in the history
* feat: sync 호출을 지원한다

* docs: README에 동기 지원 방식을 추가한다

* docs: README code의 컨벤션 오류를 수정한다

* docs: 분산트랜잭션 동작과정 사진을 330 -> 365 로 증가시킨다

* docs: 분산트랜잭션 사진을 365 -> 360 으로 변경한다

* �docs: version update 0.1.9 to 0.2.0
  • Loading branch information
devxb authored Feb 25, 2024
1 parent 63b0d95 commit e9c5fbb
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 16 deletions.
40 changes: 37 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

> Distributed transaction library based on Choreography
<img src = "https://github.com/rooftop-MSA/Netx/assets/62425964/08ed9050-1923-42b5-803f-5b7ea37a263f" width="330" align="right"/>
<img src = "https://github.com/rooftop-MSA/Netx/assets/62425964/08ed9050-1923-42b5-803f-5b7ea37a263f" width="360" align="right"/>

<br>

![version 0.1.9](https://img.shields.io/badge/version-0.1.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![version 0.2.0](https://img.shields.io/badge/version-0.2.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Choreography 방식으로 구현된 분산 트랜잭션 라이브러리 입니다.
`Netx` 는 다음 기능을 제공합니다.

1. [Reactor](https://projectreactor.io/) 기반의 완전한 비동기 트랜잭션 관리
2. 처리되지 않은 트랜잭션을 찾아 자동으로 재실행
3. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
4. `At Least Once` 방식의 메시지 전달 보장
5. 처리되지 않은 메시지를 자동으로 재실행
5. 비동기 API와 동기 API 지원

## How to use

Expand Down Expand Up @@ -56,6 +57,20 @@ class Application {
#### Scenario1. Start pay transaction

```kotlin
// Sync
fun pay(param: Any): Any {
val transactionId = transactionManager.syncStart("paid=1000") // start transaction

runCatching { // This is kotlin try catch, not netx library spec
// Do your bussiness logic
}.fold(
onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction
onFailure = { transactionManager.syncRollback(transactionId, it.message) } // rollback transaction
)
}


// Async
fun pay(param: Any): Mono<Any> {
return transactionManager.start("paid=1000") // Start distributed transaction and publish transaction start event
.flatMap { transactionId ->
Expand All @@ -75,6 +90,19 @@ fun pay(param: Any): Mono<Any> {
#### Scenario2. Join order transaction

```kotlin
//Sync
fun order(param: Any): Any {
val transactionId = transactionManager.syncJoin(param.transactionId, "orderId=1:state=PENDING") // join transaction

runCatching { // This is kotlin try catch, not netx library spec
// Do your bussiness logic
}.fold(
onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction
onFailure = { transactionManager.syncRollback(transactionId, it.message) } // rollback transaction
)
}

// Async
fun order(param: Any): Mono<Any> {
return transactionManager.join(
param.transactionId,
Expand All @@ -94,6 +122,12 @@ fun order(param: Any): Mono<Any> {
#### Scenario3. Check exists transaction

```kotlin
// Sync
fun exists(param: Any): Any {
return transactionManager.syncExists(param.transactionId)
}

// Async
fun exists(param: Any): Mono<Any> {
return transactionManager.exists(param.transactionId) // Find any transaction has ever been started
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@ interface TransactionManager {

fun start(undo: String): Mono<String>

fun syncStart(undo: String): String

fun join(transactionId: String, undo: String): Mono<String>

fun syncJoin(transactionId: String, undo: String): String

fun exists(transactionId: String): Mono<String>

fun syncExists(transactionId: String): String

fun commit(transactionId: String): Mono<String>

fun syncCommit(transactionId: String): String

fun rollback(transactionId: String, cause: String): Mono<String>

fun syncRollback(transactionId: String, cause: String): String

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,30 @@ abstract class AbstractTransactionManager(
private val transactionIdGenerator: TransactionIdGenerator = TransactionIdGenerator(nodeId),
) : TransactionManager {

override fun syncStart(undo: String): String {
return start(undo).block() ?: error("Cannot start transaction")
}

override fun syncJoin(transactionId: String, undo: String): String {
return join(transactionId, undo).block()
?: error("Cannot join transaction \"$transactionId\", \"$undo\"")
}

override fun syncExists(transactionId: String): String {
return exists(transactionId).block()
?: error("Cannot exists transaction \"$transactionId\"")
}

override fun syncCommit(transactionId: String): String {
return commit(transactionId).block()
?: error("Cannot commit transaction \"$transactionId\"")
}

override fun syncRollback(transactionId: String, cause: String): String {
return rollback(transactionId, cause).block()
?: error("Cannot rollback transaction \"$transactionId\", \"$cause\"")
}

final override fun start(undo: String): Mono<String> {
return startTransaction(undo)
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.rooftop.netx.redis

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.assertions.throwables.shouldThrowMessage
import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.shouldBe
import org.rooftop.netx.api.*
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.springframework.test.context.ContextConfiguration
Expand Down Expand Up @@ -32,9 +34,9 @@ internal class RedisStreamTransactionManagerTest(
}

describe("start 메소드는") {
context("replay 를 입력받으면,") {
context("UNDO 를 입력받으면,") {
it("트랜잭션을 시작하고 transaction-id를 반환한다.") {
transactionManager.start(REPLAY).subscribe()
transactionManager.start(UNDO).subscribe()

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(1)
Expand All @@ -45,8 +47,33 @@ internal class RedisStreamTransactionManagerTest(

context("서로 다른 id의 트랜잭션이 여러번 시작되어도") {
it("모두 읽을 수 있다.") {
transactionManager.start(REPLAY).block()
transactionManager.start(REPLAY).block()
transactionManager.start(UNDO).block()
transactionManager.start(UNDO).block()

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(2)
noPublisherTransactionHandlerAssertions.startCountShouldBe(2)
}
}
}
}

describe("syncStart 메소드는") {
context("UNDO 를 입력받으면,") {
it("트랜잭션을 시작하고 transaction-id를 반환한다.") {
transactionManager.syncStart(UNDO)

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(1)
noPublisherTransactionHandlerAssertions.startCountShouldBe(1)
}
}
}

context("서로 다른 id의 트랜잭션이 여러번 시작되어도") {
it("모두 읽을 수 있다.") {
transactionManager.syncStart(UNDO)
transactionManager.syncStart(UNDO)

eventually(5.seconds) {
monoTransactionHandlerAssertions.startCountShouldBe(2)
Expand All @@ -58,10 +85,10 @@ internal class RedisStreamTransactionManagerTest(

describe("join 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("트랜잭션에 참여한다.") {
transactionManager.join(transactionId, REPLAY).subscribe()
transactionManager.join(transactionId, UNDO).subscribe()

eventually(5.seconds) {
monoTransactionHandlerAssertions.joinCountShouldBe(1)
Expand All @@ -72,17 +99,40 @@ internal class RedisStreamTransactionManagerTest(

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
val result = transactionManager.join(NOT_EXIST_TX_ID, REPLAY)
val result = transactionManager.join(NOT_EXIST_TX_ID, UNDO)

StepVerifier.create(result)
.verifyErrorMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"")
}
}
}

describe("syncJoin 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("트랜잭션에 참여한다.") {
transactionManager.syncJoin(transactionId, UNDO)

eventually(5.seconds) {
monoTransactionHandlerAssertions.joinCountShouldBe(1)
noPublisherTransactionHandlerAssertions.joinCountShouldBe(1)
}
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncJoin(NOT_EXIST_TX_ID, UNDO)
}
}
}
}

describe("exists 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("트랜잭션 id를 반환한다.") {
val result = transactionManager.exists(transactionId)
Expand All @@ -103,9 +153,29 @@ internal class RedisStreamTransactionManagerTest(
}
}

describe("syncExists 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("트랜잭션 id를 반환한다.") {
val result = transactionManager.syncExists(transactionId)

result shouldBe transactionId
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncExists(NOT_EXIST_TX_ID)
}
}
}
}

describe("commit 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("commit 메시지를 publish 한다") {
transactionManager.commit(transactionId).block()
Expand All @@ -127,12 +197,35 @@ internal class RedisStreamTransactionManagerTest(
}
}

describe("syncCommit 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("commit 메시지를 publish 한다") {
transactionManager.syncCommit(transactionId)

eventually(5.seconds) {
monoTransactionHandlerAssertions.commitCountShouldBe(1)
noPublisherTransactionHandlerAssertions.commitCountShouldBe(1)
}
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncCommit(NOT_EXIST_TX_ID)
}
}
}
}

describe("rollback 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.start(REPLAY).block()!!
val transactionId = transactionManager.start(UNDO).block()!!

it("rollback 메시지를 publish 한다") {
transactionManager.rollback(transactionId, "rollback occured for test").block()
transactionManager.rollback(transactionId, "rollback for test").block()

eventually(5.seconds) {
monoTransactionHandlerAssertions.rollbackCountShouldBe(1)
Expand All @@ -143,17 +236,40 @@ internal class RedisStreamTransactionManagerTest(

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
val result = transactionManager.commit(NOT_EXIST_TX_ID)
val result = transactionManager.rollback(NOT_EXIST_TX_ID, "rollback for test")

StepVerifier.create(result)
.verifyErrorMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"")
}
}
}

describe("syncRollback 메소드는") {
context("존재하는 transactionId를 입력받으면,") {
val transactionId = transactionManager.syncStart(UNDO)

it("rollback 메시지를 publish 한다") {
transactionManager.syncRollback(transactionId, "rollback for test")

eventually(5.seconds) {
monoTransactionHandlerAssertions.rollbackCountShouldBe(1)
noPublisherTransactionHandlerAssertions.rollbackCountShouldBe(1)
}
}
}

context("존재하지 않는 transactionId를 입력받으면,") {
it("IllegalStateException 을 던진다.") {
shouldThrowMessage("Cannot find exists transaction id \"$NOT_EXIST_TX_ID\"") {
transactionManager.syncRollback(NOT_EXIST_TX_ID, "rollback for test")
}
}
}
}
}) {

private companion object {
private const val REPLAY = "REPLAY"
private const val UNDO = "UNDO"
private const val NOT_EXIST_TX_ID = "NOT_EXISTS_TX_ID"
}
}

0 comments on commit e9c5fbb

Please sign in to comment.