From 0d39cd970d095649629b3f0322d7a77fb8c0e76c Mon Sep 17 00:00:00 2001
From: xb205 <62425964+devxb@users.noreply.github.com>
Date: Mon, 26 Feb 2024 13:39:08 +0900
Subject: [PATCH] perf: Supprot backpressure and & auto restart transaciton
listener (#42)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* perf: supprot Backpressure and & auto restart transaciton listener
* docs: netx version 0.2.0 to 0.2.1
* perf: boundElastic 과 parallel 사용을 수정한다
---
README.md | 30 ++++++++++---------
.../engine/AbstractTransactionDispatcher.kt | 2 +-
.../engine/AbstractTransactionListener.kt | 14 ++++++++-
.../AbstractTransactionRetrySupporter.kt | 18 +++++++----
.../redis/RedisStreamTransactionListener.kt | 7 ++---
.../netx/redis/RedisTransactionConfigurer.kt | 5 +++-
.../redis/RedisTransactionRetrySupporter.kt | 13 ++++----
.../client/{NetxTest.kt => NetxLoadTest.kt} | 14 ++++-----
.../netx/client/TransactionReceiveStorage.kt | 29 +++++++++++-------
.../redis/NoAckRedisTransactionConfigurer.kt | 5 +++-
.../RedisTransactionRetrySupporterTest.kt | 3 +-
src/test/resources/application.properties | 3 +-
12 files changed, 91 insertions(+), 52 deletions(-)
rename src/test/kotlin/org/rooftop/netx/client/{NetxTest.kt => NetxLoadTest.kt} (76%)
diff --git a/README.md b/README.md
index bcfbf54..24d5641 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
-![version 0.2.0](https://img.shields.io/badge/version-0.2.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
+![version 0.2.1](https://img.shields.io/badge/version-0.2.1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)
Choreography 방식으로 구현된 분산 트랜잭션 라이브러리 입니다.
@@ -14,9 +14,10 @@ Choreography 방식으로 구현된 분산 트랜잭션 라이브러리 입니
1. [Reactor](https://projectreactor.io/) 기반의 완전한 비동기 트랜잭션 관리
2. 처리되지 않은 트랜잭션을 찾아 자동으로 재실행
-3. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
-4. `At Least Once` 방식의 메시지 전달 보장
-5. 비동기 API와 동기 API 지원
+3. Backpressure 지원으로 노드별 처리가능한 트랜잭션 수 조절
+4. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
+5. `At Least Once` 방식의 메시지 전달 보장
+6. 비동기 API와 동기 API 지원
## How to use
@@ -41,16 +42,17 @@ class Application {
#### Properties
-| key | example | description |
-|-------------------------|---------|---------------------------------------------------------------------------------------------------------------------------|
-| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. |
-| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) |
-| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. |
-| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. |
-| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ |
-| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. |
-| **netx.recovery-milli** | 60000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. 기본값은 60000(60초) 입니다. |
-| **netx.orphan-milli** | 10000 | 트랜잭션이 PENDING 상태가 되었지만 orphan-milli가 지나도 ACK 상태가 되지 않는경우 다른 노드에게 처리를 위임합니다. 기본값은 10000(10초) 입니다. |
+| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
+|-------------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------|----|
+| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
+| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
+| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
+| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
+| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
+| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
+| **netx.recovery-milli** | 60000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 60000 |
+| **netx.orphan-milli** | 10000 | 트랜잭션이 PENDING 상태가 되었지만 orphan-milli가 지나도 ACK 상태가 되지 않는경우 다른 노드에게 처리를 위임합니다. | 10000 |
+| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. | 40 |
### Usage example
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
index 2b0b55a..0b3d399 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
@@ -25,7 +25,7 @@ abstract class AbstractTransactionDispatcher {
.flatMap { dispatchToNotPublisherHandler(transaction) }
.doOnComplete {
ack(transaction, messageId)
- .subscribeOn(Schedulers.boundedElastic())
+ .subscribeOn(Schedulers.parallel())
.subscribe()
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
index 262645e..3ce6fd9 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
@@ -1,22 +1,34 @@
package org.rooftop.netx.engine
import org.rooftop.netx.idl.Transaction
+import reactor.core.publisher.BufferOverflowStrategy
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
abstract class AbstractTransactionListener(
+ private val backpressureSize: Int,
private val transactionDispatcher: AbstractTransactionDispatcher,
) {
fun subscribeStream() {
receive()
+ .publishOn(Schedulers.boundedElastic())
+ .onBackpressureBuffer(backpressureSize, BufferOverflowStrategy.DROP_LATEST)
.flatMap { (transaction, messageId) ->
transactionDispatcher.dispatch(transaction, messageId)
.map { transaction to messageId }
+ .onErrorResume { Mono.empty() }
}
- .subscribeOn(Schedulers.parallel())
+ .restartWhenTerminated()
.subscribe()
}
protected abstract fun receive(): Flux>
+
+ private fun Flux.restartWhenTerminated(): Flux {
+ return this.doOnTerminate {
+ subscribeStream()
+ }
+ }
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
index 7f56262..cbdfd20 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
@@ -8,18 +8,26 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration
abstract class AbstractTransactionRetrySupporter(
- recoveryMilli: Long,
+ private val backpressureSize: Int,
+ private val recoveryMilli: Long,
) {
- init {
+ fun handleLostTransactions() {
Flux.interval(recoveryMilli.milliseconds.toJavaDuration())
.flatMap {
- handleOrphanTransaction()
+ handleOrphanTransaction(backpressureSize)
.onErrorResume { Mono.empty() }
}
- .subscribeOn(Schedulers.parallel())
+ .subscribeOn(Schedulers.boundedElastic())
+ .restartWhenTerminated()
.subscribe()
}
- protected abstract fun handleOrphanTransaction(): Flux>
+ protected abstract fun handleOrphanTransaction(backpressureSize: Int): Flux>
+
+ private fun Flux.restartWhenTerminated(): Flux {
+ return this.doOnTerminate {
+ handleLostTransactions()
+ }
+ }
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
index 5615cca..e403c1d 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
@@ -12,17 +12,17 @@ import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.stream.StreamReceiver
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.scheduler.Schedulers
import kotlin.time.Duration.Companion.hours
import kotlin.time.toJavaDuration
class RedisStreamTransactionListener(
+ backpressureSize: Int,
transactionDispatcher: AbstractTransactionDispatcher,
connectionFactory: ReactiveRedisConnectionFactory,
private val nodeGroup: String,
private val nodeName: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
-) : AbstractTransactionListener(transactionDispatcher) {
+) : AbstractTransactionListener(backpressureSize, transactionDispatcher) {
private val options = StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(1.hours.toJavaDuration())
@@ -36,8 +36,7 @@ class RedisStreamTransactionListener(
receiver.receive(
Consumer.from(nodeGroup, nodeName),
StreamOffset.create(STREAM_KEY, ReadOffset.from(">"))
- ).publishOn(Schedulers.parallel())
- .map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
+ ).map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
index 7d4fe88..aafb802 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
@@ -24,6 +24,7 @@ class RedisTransactionConfigurer(
@Value("\${netx.node-name}") private val nodeName: String,
@Value("\${netx.recovery-milli:60000}") private val recoveryMilli: Long,
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
+ @Value("\${netx.backpressure:40}") private val backpressureSize: Int,
private val applicationContext: ApplicationContext,
) {
@@ -41,6 +42,7 @@ class RedisTransactionConfigurer(
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun redisStreamTransactionListener(): RedisStreamTransactionListener =
RedisStreamTransactionListener(
+ backpressureSize = backpressureSize,
transactionDispatcher = redisStreamTransactionDispatcher(),
connectionFactory = reactiveRedisConnectionFactory(),
nodeGroup = nodeGroup,
@@ -59,7 +61,8 @@ class RedisTransactionConfigurer(
transactionDispatcher = redisStreamTransactionDispatcher(),
orphanMilli = orphanMilli,
recoveryMilli = recoveryMilli,
- )
+ backpressureSize = backpressureSize,
+ ).also { it.handleLostTransactions() }
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
index 9627cb2..5ccae8d 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
@@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit
class RedisTransactionRetrySupporter(
recoveryMilli: Long,
+ backpressureSize: Int,
private val nodeGroup: String,
private val nodeName: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
@@ -20,20 +21,20 @@ class RedisTransactionRetrySupporter(
private val transactionDispatcher: AbstractTransactionDispatcher,
private val orphanMilli: Long,
private val lockKey: String = "$nodeGroup-key",
-) : AbstractTransactionRetrySupporter(recoveryMilli) {
+) : AbstractTransactionRetrySupporter(backpressureSize, recoveryMilli) {
- override fun handleOrphanTransaction(): Flux> {
- return claimTransactions()
- .publishOn(Schedulers.parallel())
+ override fun handleOrphanTransaction(backpressureSize: Int): Flux> {
+ return claimTransactions(backpressureSize)
+ .publishOn(Schedulers.boundedElastic())
.flatMap { (transaction, messageId) ->
transactionDispatcher.dispatch(transaction, messageId)
.map { transaction to messageId }
}
}
- private fun claimTransactions(): Flux> {
+ private fun claimTransactions(backpressureSize: Int): Flux> {
return reactiveRedisTemplate.opsForStream()
- .pending(STREAM_KEY, nodeGroup, Range.closed("-", "+"), Long.MAX_VALUE)
+ .pending(STREAM_KEY, nodeGroup, Range.closed("-", "+"), backpressureSize.toLong())
.filter { it.get().toList().isNotEmpty() }
.flatMap { pendingMessage ->
redissonReactiveClient.getLock(lockKey)
diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxTest.kt b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
similarity index 76%
rename from src/test/kotlin/org/rooftop/netx/client/NetxTest.kt
rename to src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
index c517bd9..8a816df 100644
--- a/src/test/kotlin/org/rooftop/netx/client/NetxTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
@@ -8,10 +8,9 @@ import io.kotest.data.row
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.rooftop.netx.redis.RedisContainer
import org.springframework.boot.test.context.SpringBootTest
-import reactor.core.publisher.Hooks
import kotlin.time.Duration.Companion.minutes
-@DisplayName("Netx 테스트의")
+@DisplayName("Netx 부하테스트")
@SpringBootTest(
classes = [
RedisContainer::class,
@@ -21,7 +20,7 @@ import kotlin.time.Duration.Companion.minutes
]
)
@EnableDistributedTransaction
-internal class NetxTest(
+internal class NetxLoadTest(
private val netxClient: NetxClient,
private val loadRunner: LoadRunner,
private val transactionReceiveStorage: TransactionReceiveStorage,
@@ -33,6 +32,7 @@ internal class NetxTest(
row(10, 10),
row(100, 100),
row(1_000, 1_000),
+ row(10_000, 10_000)
) { commitLoadCount, rollbackLoadCount ->
transactionReceiveStorage.clear()
@@ -49,10 +49,10 @@ internal class NetxTest(
}
eventually(30.minutes) {
- transactionReceiveStorage.startCountShouldBe(commitLoadCount + rollbackLoadCount)
- transactionReceiveStorage.joinCountShouldBe(commitLoadCount + rollbackLoadCount)
- transactionReceiveStorage.commitCountShouldBe(commitLoadCount)
- transactionReceiveStorage.rollbackCountShouldBe(rollbackLoadCount)
+ transactionReceiveStorage.startCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
+ transactionReceiveStorage.joinCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
+ transactionReceiveStorage.commitCountShouldBeGreaterThanOrEqual(commitLoadCount)
+ transactionReceiveStorage.rollbackCountShouldBeGreaterThanOrEqual(rollbackLoadCount)
}
}
}
diff --git a/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt b/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt
index 94eb067..8953bfe 100644
--- a/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt
+++ b/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt
@@ -1,52 +1,61 @@
package org.rooftop.netx.client
-import io.kotest.matchers.shouldBe
+import io.kotest.matchers.ints.shouldBeGreaterThanOrEqual
import org.rooftop.netx.api.*
import org.rooftop.netx.meta.TransactionHandler
+import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentMap
@TransactionHandler
class TransactionReceiveStorage(
- private val storage: MutableMap>,
+ private val storage: ConcurrentMap> = ConcurrentHashMap(),
) {
+ private val logger = LoggerFactory.getLogger(this::class.simpleName)
+
fun clear() {
storage.clear()
}
- fun joinCountShouldBe(count: Int) {
- (storage["JOIN"]?.size ?: 0) shouldBe count
+ fun joinCountShouldBeGreaterThanOrEqual(count: Int) {
+ (storage["JOIN"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}
- fun startCountShouldBe(count: Int) {
- (storage["START"]?.size ?: 0) shouldBe count
+ fun startCountShouldBeGreaterThanOrEqual(count: Int) {
+ (storage["START"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}
- fun commitCountShouldBe(count: Int) {
- (storage["COMMIT"]?.size ?: 0) shouldBe count
+ fun commitCountShouldBeGreaterThanOrEqual(count: Int) {
+ (storage["COMMIT"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}
- fun rollbackCountShouldBe(count: Int) {
- (storage["ROLLBACK"]?.size ?: 0) shouldBe count
+ fun rollbackCountShouldBeGreaterThanOrEqual(count: Int) {
+ (storage["ROLLBACK"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}
@TransactionRollbackHandler
fun logRollback(transaction: TransactionRollbackEvent): Mono {
+ logger.info("Receive transaction RollbackEvent ${storage["ROLLBACK"]?.size ?: 0} ")
return Mono.fromCallable { log("ROLLBACK", transaction) }
}
@TransactionStartHandler
fun logStart(transaction: TransactionStartEvent): Mono {
+ logger.info("Receive transaction StartEvent ${storage["START"]?.size ?: 0} ")
return Mono.fromCallable { log("START", transaction) }
}
@TransactionJoinHandler
fun logJoin(transaction: TransactionJoinEvent): Mono {
+ logger.info("Receive transaction JoinEvent ${storage["JOIN"]?.size ?: 0} ")
return Mono.fromCallable { log("JOIN", transaction) }
}
@TransactionCommitHandler
fun logCommit(transaction: TransactionCommitEvent): Mono {
+ logger.info("Receive transaction CommitEvent ${storage["COMMIT"]?.size ?: 0} ")
return Mono.fromCallable { log("COMMIT", transaction) }
}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
index 88cd6c9..d6ea6d9 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
@@ -24,6 +24,7 @@ class NoAckRedisTransactionConfigurer(
@Value("\${netx.node-name}") private val nodeName: String,
@Value("\${netx.recovery-milli:60000}") private val recoveryMilli: Long,
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
+ @Value("\${netx.backpressure:10}") private val backpressureSize: Int,
private val applicationContext: ApplicationContext,
) {
@@ -41,6 +42,7 @@ class NoAckRedisTransactionConfigurer(
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun redisStreamTransactionListener(): RedisStreamTransactionListener =
RedisStreamTransactionListener(
+ backpressureSize = backpressureSize,
transactionDispatcher = noAckRedisStreamTransactionDispatcher(),
connectionFactory = reactiveRedisConnectionFactory(),
nodeGroup = nodeGroup,
@@ -68,7 +70,8 @@ class NoAckRedisTransactionConfigurer(
transactionDispatcher = redisStreamTransactionDispatcher(),
orphanMilli = orphanMilli,
recoveryMilli = recoveryMilli,
- )
+ backpressureSize = backpressureSize,
+ ).also { it.handleLostTransactions() }
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt
index f29e992..9571ea7 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt
@@ -6,6 +6,7 @@ import io.kotest.core.spec.style.DescribeSpec
import org.rooftop.netx.api.TransactionManager
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
+import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
@ContextConfiguration(
@@ -36,7 +37,7 @@ internal class RedisTransactionRetrySupporterTest(
it("해당 트랜잭션을 찾아서 처리하고, ack 상태로 변경한다.") {
val transactionId = transactionManager.start("undo").block()!!
- eventually(10.seconds) {
+ eventually(1.minutes) {
noPublisherTransactionHandlerAssertions.startCountShouldBe(1)
monoTransactionHandlerAssertions.startCountShouldBe(1)
diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties
index fdea166..eda6121 100644
--- a/src/test/resources/application.properties
+++ b/src/test/resources/application.properties
@@ -5,4 +5,5 @@ netx.group=netx-group
netx.node-id=1
netx.node-name=netx-node
netx.recovery-milli=1000
-netx.orphan-milli=1000
+netx.orphan-milli=10000
+netx.backpressure=40