Skip to content

Commit

Permalink
perf: Supprot backpressure and & auto restart transaciton listener (#42)
Browse files Browse the repository at this point in the history
* perf: supprot Backpressure and & auto restart transaciton listener

* docs: netx version 0.2.0 to 0.2.1

* perf: boundElastic 과 parallel 사용을 수정한다
  • Loading branch information
devxb authored Feb 26, 2024
1 parent e9c5fbb commit 0d39cd9
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 52 deletions.
30 changes: 16 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@

<br>

![version 0.2.0](https://img.shields.io/badge/version-0.2.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![version 0.2.1](https://img.shields.io/badge/version-0.2.1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Choreography 방식으로 구현된 분산 트랜잭션 라이브러리 입니다.
`Netx` 는 다음 기능을 제공합니다.

1. [Reactor](https://projectreactor.io/) 기반의 완전한 비동기 트랜잭션 관리
2. 처리되지 않은 트랜잭션을 찾아 자동으로 재실행
3. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
4. `At Least Once` 방식의 메시지 전달 보장
5. 비동기 API와 동기 API 지원
3. Backpressure 지원으로 노드별 처리가능한 트랜잭션 수 조절
4. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
5. `At Least Once` 방식의 메시지 전달 보장
6. 비동기 API와 동기 API 지원

## How to use

Expand All @@ -41,16 +42,17 @@ class Application {

#### Properties

| key | example | description |
|-------------------------|---------|---------------------------------------------------------------------------------------------------------------------------|
| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. |
| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) |
| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. |
| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. |
| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ |
| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. |
| **netx.recovery-milli** | 60000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. 기본값은 60000(60초) 입니다. |
| **netx.orphan-milli** | 10000 | 트랜잭션이 PENDING 상태가 되었지만 orphan-milli가 지나도 ACK 상태가 되지 않는경우 다른 노드에게 처리를 위임합니다. 기본값은 10000(10초) 입니다. |
| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
|-------------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------|----|
| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
| **netx.recovery-milli** | 60000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 60000 |
| **netx.orphan-milli** | 10000 | 트랜잭션이 PENDING 상태가 되었지만 orphan-milli가 지나도 ACK 상태가 되지 않는경우 다른 노드에게 처리를 위임합니다. | 10000 |
| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. | 40 |

### Usage example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ abstract class AbstractTransactionDispatcher {
.flatMap { dispatchToNotPublisherHandler(transaction) }
.doOnComplete {
ack(transaction, messageId)
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(Schedulers.parallel())
.subscribe()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package org.rooftop.netx.engine

import org.rooftop.netx.idl.Transaction
import reactor.core.publisher.BufferOverflowStrategy
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

abstract class AbstractTransactionListener(
private val backpressureSize: Int,
private val transactionDispatcher: AbstractTransactionDispatcher,
) {

fun subscribeStream() {
receive()
.publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer(backpressureSize, BufferOverflowStrategy.DROP_LATEST)
.flatMap { (transaction, messageId) ->
transactionDispatcher.dispatch(transaction, messageId)
.map { transaction to messageId }
.onErrorResume { Mono.empty() }
}
.subscribeOn(Schedulers.parallel())
.restartWhenTerminated()
.subscribe()
}

protected abstract fun receive(): Flux<Pair<Transaction, String>>

private fun <T> Flux<T>.restartWhenTerminated(): Flux<T> {
return this.doOnTerminate {
subscribeStream()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,26 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration

abstract class AbstractTransactionRetrySupporter(
recoveryMilli: Long,
private val backpressureSize: Int,
private val recoveryMilli: Long,
) {

init {
fun handleLostTransactions() {
Flux.interval(recoveryMilli.milliseconds.toJavaDuration())
.flatMap {
handleOrphanTransaction()
handleOrphanTransaction(backpressureSize)
.onErrorResume { Mono.empty() }
}
.subscribeOn(Schedulers.parallel())
.subscribeOn(Schedulers.boundedElastic())
.restartWhenTerminated()
.subscribe()
}

protected abstract fun handleOrphanTransaction(): Flux<Pair<Transaction, String>>
protected abstract fun handleOrphanTransaction(backpressureSize: Int): Flux<Pair<Transaction, String>>

private fun <T> Flux<T>.restartWhenTerminated(): Flux<T> {
return this.doOnTerminate {
handleLostTransactions()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.stream.StreamReceiver
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.time.Duration.Companion.hours
import kotlin.time.toJavaDuration

class RedisStreamTransactionListener(
backpressureSize: Int,
transactionDispatcher: AbstractTransactionDispatcher,
connectionFactory: ReactiveRedisConnectionFactory,
private val nodeGroup: String,
private val nodeName: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate<String, ByteArray>,
) : AbstractTransactionListener(transactionDispatcher) {
) : AbstractTransactionListener(backpressureSize, transactionDispatcher) {

private val options = StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(1.hours.toJavaDuration())
Expand All @@ -36,8 +36,7 @@ class RedisStreamTransactionListener(
receiver.receive(
Consumer.from(nodeGroup, nodeName),
StreamOffset.create(STREAM_KEY, ReadOffset.from(">"))
).publishOn(Schedulers.parallel())
.map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
).map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class RedisTransactionConfigurer(
@Value("\${netx.node-name}") private val nodeName: String,
@Value("\${netx.recovery-milli:60000}") private val recoveryMilli: Long,
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
@Value("\${netx.backpressure:40}") private val backpressureSize: Int,
private val applicationContext: ApplicationContext,
) {

Expand All @@ -41,6 +42,7 @@ class RedisTransactionConfigurer(
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun redisStreamTransactionListener(): RedisStreamTransactionListener =
RedisStreamTransactionListener(
backpressureSize = backpressureSize,
transactionDispatcher = redisStreamTransactionDispatcher(),
connectionFactory = reactiveRedisConnectionFactory(),
nodeGroup = nodeGroup,
Expand All @@ -59,7 +61,8 @@ class RedisTransactionConfigurer(
transactionDispatcher = redisStreamTransactionDispatcher(),
orphanMilli = orphanMilli,
recoveryMilli = recoveryMilli,
)
backpressureSize = backpressureSize,
).also { it.handleLostTransactions() }

@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,28 @@ import java.util.concurrent.TimeUnit

class RedisTransactionRetrySupporter(
recoveryMilli: Long,
backpressureSize: Int,
private val nodeGroup: String,
private val nodeName: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate<String, ByteArray>,
private val redissonReactiveClient: RedissonReactiveClient,
private val transactionDispatcher: AbstractTransactionDispatcher,
private val orphanMilli: Long,
private val lockKey: String = "$nodeGroup-key",
) : AbstractTransactionRetrySupporter(recoveryMilli) {
) : AbstractTransactionRetrySupporter(backpressureSize, recoveryMilli) {

override fun handleOrphanTransaction(): Flux<Pair<Transaction, String>> {
return claimTransactions()
.publishOn(Schedulers.parallel())
override fun handleOrphanTransaction(backpressureSize: Int): Flux<Pair<Transaction, String>> {
return claimTransactions(backpressureSize)
.publishOn(Schedulers.boundedElastic())
.flatMap { (transaction, messageId) ->
transactionDispatcher.dispatch(transaction, messageId)
.map { transaction to messageId }
}
}

private fun claimTransactions(): Flux<Pair<Transaction, String>> {
private fun claimTransactions(backpressureSize: Int): Flux<Pair<Transaction, String>> {
return reactiveRedisTemplate.opsForStream<String, String>()
.pending(STREAM_KEY, nodeGroup, Range.closed("-", "+"), Long.MAX_VALUE)
.pending(STREAM_KEY, nodeGroup, Range.closed("-", "+"), backpressureSize.toLong())
.filter { it.get().toList().isNotEmpty() }
.flatMap { pendingMessage ->
redissonReactiveClient.getLock(lockKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import io.kotest.data.row
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.rooftop.netx.redis.RedisContainer
import org.springframework.boot.test.context.SpringBootTest
import reactor.core.publisher.Hooks
import kotlin.time.Duration.Companion.minutes

@DisplayName("Netx 테스트의")
@DisplayName("Netx 부하테스트")
@SpringBootTest(
classes = [
RedisContainer::class,
Expand All @@ -21,7 +20,7 @@ import kotlin.time.Duration.Companion.minutes
]
)
@EnableDistributedTransaction
internal class NetxTest(
internal class NetxLoadTest(
private val netxClient: NetxClient,
private val loadRunner: LoadRunner,
private val transactionReceiveStorage: TransactionReceiveStorage,
Expand All @@ -33,6 +32,7 @@ internal class NetxTest(
row(10, 10),
row(100, 100),
row(1_000, 1_000),
row(10_000, 10_000)
) { commitLoadCount, rollbackLoadCount ->
transactionReceiveStorage.clear()

Expand All @@ -49,10 +49,10 @@ internal class NetxTest(
}

eventually(30.minutes) {
transactionReceiveStorage.startCountShouldBe(commitLoadCount + rollbackLoadCount)
transactionReceiveStorage.joinCountShouldBe(commitLoadCount + rollbackLoadCount)
transactionReceiveStorage.commitCountShouldBe(commitLoadCount)
transactionReceiveStorage.rollbackCountShouldBe(rollbackLoadCount)
transactionReceiveStorage.startCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
transactionReceiveStorage.joinCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
transactionReceiveStorage.commitCountShouldBeGreaterThanOrEqual(commitLoadCount)
transactionReceiveStorage.rollbackCountShouldBeGreaterThanOrEqual(rollbackLoadCount)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,61 @@
package org.rooftop.netx.client

import io.kotest.matchers.shouldBe
import io.kotest.matchers.ints.shouldBeGreaterThanOrEqual
import org.rooftop.netx.api.*
import org.rooftop.netx.meta.TransactionHandler
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

@TransactionHandler
class TransactionReceiveStorage(
private val storage: MutableMap<String, MutableList<TransactionEvent>>,
private val storage: ConcurrentMap<String, MutableList<TransactionEvent>> = ConcurrentHashMap(),
) {

private val logger = LoggerFactory.getLogger(this::class.simpleName)

fun clear() {
storage.clear()
}

fun joinCountShouldBe(count: Int) {
(storage["JOIN"]?.size ?: 0) shouldBe count
fun joinCountShouldBeGreaterThanOrEqual(count: Int) {
(storage["JOIN"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}

fun startCountShouldBe(count: Int) {
(storage["START"]?.size ?: 0) shouldBe count
fun startCountShouldBeGreaterThanOrEqual(count: Int) {
(storage["START"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}

fun commitCountShouldBe(count: Int) {
(storage["COMMIT"]?.size ?: 0) shouldBe count
fun commitCountShouldBeGreaterThanOrEqual(count: Int) {
(storage["COMMIT"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}

fun rollbackCountShouldBe(count: Int) {
(storage["ROLLBACK"]?.size ?: 0) shouldBe count
fun rollbackCountShouldBeGreaterThanOrEqual(count: Int) {
(storage["ROLLBACK"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}

@TransactionRollbackHandler
fun logRollback(transaction: TransactionRollbackEvent): Mono<Unit> {
logger.info("Receive transaction RollbackEvent ${storage["ROLLBACK"]?.size ?: 0} ")
return Mono.fromCallable { log("ROLLBACK", transaction) }
}

@TransactionStartHandler
fun logStart(transaction: TransactionStartEvent): Mono<Unit> {
logger.info("Receive transaction StartEvent ${storage["START"]?.size ?: 0} ")
return Mono.fromCallable { log("START", transaction) }
}

@TransactionJoinHandler
fun logJoin(transaction: TransactionJoinEvent): Mono<Unit> {
logger.info("Receive transaction JoinEvent ${storage["JOIN"]?.size ?: 0} ")
return Mono.fromCallable { log("JOIN", transaction) }
}

@TransactionCommitHandler
fun logCommit(transaction: TransactionCommitEvent): Mono<Unit> {
logger.info("Receive transaction CommitEvent ${storage["COMMIT"]?.size ?: 0} ")
return Mono.fromCallable { log("COMMIT", transaction) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class NoAckRedisTransactionConfigurer(
@Value("\${netx.node-name}") private val nodeName: String,
@Value("\${netx.recovery-milli:60000}") private val recoveryMilli: Long,
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
@Value("\${netx.backpressure:10}") private val backpressureSize: Int,
private val applicationContext: ApplicationContext,
) {

Expand All @@ -41,6 +42,7 @@ class NoAckRedisTransactionConfigurer(
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun redisStreamTransactionListener(): RedisStreamTransactionListener =
RedisStreamTransactionListener(
backpressureSize = backpressureSize,
transactionDispatcher = noAckRedisStreamTransactionDispatcher(),
connectionFactory = reactiveRedisConnectionFactory(),
nodeGroup = nodeGroup,
Expand Down Expand Up @@ -68,7 +70,8 @@ class NoAckRedisTransactionConfigurer(
transactionDispatcher = redisStreamTransactionDispatcher(),
orphanMilli = orphanMilli,
recoveryMilli = recoveryMilli,
)
backpressureSize = backpressureSize,
).also { it.handleLostTransactions() }

@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.kotest.core.spec.style.DescribeSpec
import org.rooftop.netx.api.TransactionManager
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

@ContextConfiguration(
Expand Down Expand Up @@ -36,7 +37,7 @@ internal class RedisTransactionRetrySupporterTest(
it("해당 트랜잭션을 찾아서 처리하고, ack 상태로 변경한다.") {
val transactionId = transactionManager.start("undo").block()!!

eventually(10.seconds) {
eventually(1.minutes) {
noPublisherTransactionHandlerAssertions.startCountShouldBe(1)
monoTransactionHandlerAssertions.startCountShouldBe(1)

Expand Down
Loading

0 comments on commit 0d39cd9

Please sign in to comment.