Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DynamoDB - Avoid dropping sequence number progress #1260

Merged
merged 12 commits into from
Nov 22, 2024
8 changes: 8 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ akka.projection.dynamodb {
# reducing this may result in fewer restarts of the projection due to failure to query
# starting offsets.
offset-slice-read-parallelism = 64

# Unprocessed items returned by a batch write are not automatically retried by the underlying SDK,
# so these settings govern those retries (performed with exponential backoff).
retries {
max-retries = 3
min-backoff = 200ms
max-backoff = 2s
random-factor = 0.3
}
}

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ object DynamoDBProjectionSettings {
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")),
retrySettings = RetrySettings(config.getConfig("offset-store.retries")))
}

/**
Expand All @@ -68,7 +69,8 @@ final class DynamoDBProjectionSettings private (
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int,
val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings) {
val timeToLiveSettings: TimeToLiveSettings,
val retrySettings: RetrySettings) {

/** Returns a copy of these settings with `timestampOffsetTable` replaced by the given value. */
def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings = {
  copy(timestampOffsetTable = timestampOffsetTable)
}
Expand Down Expand Up @@ -106,6 +108,9 @@ final class DynamoDBProjectionSettings private (
/** Returns a copy of these settings with `timeToLiveSettings` replaced by the given value. */
def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings = {
  copy(timeToLiveSettings = timeToLiveSettings)
}

/** Returns a copy of these settings with `retrySettings` replaced by the given value. */
def withRetrySettings(retrySettings: RetrySettings): DynamoDBProjectionSettings = {
  copy(retrySettings = retrySettings)
}

@nowarn("msg=deprecated")
private def copy(
timestampOffsetTable: String = timestampOffsetTable,
Expand All @@ -114,7 +119,8 @@ final class DynamoDBProjectionSettings private (
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings,
retrySettings: RetrySettings = retrySettings) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
Expand All @@ -124,7 +130,8 @@ final class DynamoDBProjectionSettings private (
warnAboutFilteredEventsInFlow,
offsetBatchSize,
offsetSliceReadParallelism,
timeToLiveSettings)
timeToLiveSettings,
retrySettings)

/**
 * Short textual form listing the offset table, client, time window, filtered-events warning flag
 * and offset batch size. The remaining settings are not part of this string.
 */
override def toString: String = {
  s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
}
Expand Down Expand Up @@ -200,3 +207,48 @@ final class ProjectionTimeToLiveSettings private (val offsetTimeToLive: Option[F
private def copy(offsetTimeToLive: Option[FiniteDuration]): ProjectionTimeToLiveSettings =
new ProjectionTimeToLiveSettings(offsetTimeToLive)
}

object RetrySettings {

  /** Programmatic defaults: 3 retries, backoff from 200 ms up to 2 s, random factor 0.3. */
  val defaults: RetrySettings =
    new RetrySettings(
      maxRetries = 3,
      minBackoff = 200.millis,
      maxBackoff = 2.seconds,
      randomFactor = 0.3)

  /**
   * Reads retry settings from a config section.
   *
   * @param config section containing the `max-retries`, `min-backoff`, `max-backoff` and
   *               `random-factor` keys
   */
  def apply(config: Config): RetrySettings = {
    // Values are read in the same order as the constructor parameters.
    val maxRetries = config.getInt("max-retries")
    val minBackoff = config.getDuration("min-backoff").toScala
    val maxBackoff = config.getDuration("max-backoff").toScala
    val randomFactor = config.getDouble("random-factor")

    new RetrySettings(maxRetries, minBackoff, maxBackoff, randomFactor)
  }
}

/**
 * Immutable retry settings: a maximum number of retries plus the minimum backoff, maximum backoff
 * and random factor used when computing the delay between retries.
 *
 * Instances are created through the companion object and modified with the `with...` methods,
 * each of which returns a new instance.
 */
final class RetrySettings private (
    val maxRetries: Int,
    val minBackoff: FiniteDuration,
    val maxBackoff: FiniteDuration,
    val randomFactor: Double) {

  /** Returns a copy with the given maximum number of retries. */
  def withMaxRetries(maxRetries: Int): RetrySettings =
    copy(maxRetries = maxRetries)

  /** Scala API: returns a copy with the given minimum backoff. */
  def withMinBackoff(minBackoff: FiniteDuration): RetrySettings =
    copy(minBackoff = minBackoff)

  /** Java API: returns a copy with the given minimum backoff. */
  def withMinBackoff(minBackoff: JDuration): RetrySettings =
    copy(minBackoff = minBackoff.toScala)

  /** Scala API: returns a copy with the given maximum backoff. */
  def withMaxBackoff(maxBackoff: FiniteDuration): RetrySettings =
    copy(maxBackoff = maxBackoff)

  /** Java API: returns a copy with the given maximum backoff. */
  def withMaxBackoff(maxBackoff: JDuration): RetrySettings =
    copy(maxBackoff = maxBackoff.toScala)

  /** Returns a copy with the given random factor. */
  def withRandomFactor(randomFactor: Double): RetrySettings =
    copy(randomFactor = randomFactor)

  private def copy(
      maxRetries: Int = maxRetries,
      minBackoff: FiniteDuration = minBackoff,
      maxBackoff: FiniteDuration = maxBackoff,
      randomFactor: Double = randomFactor) =
    new RetrySettings(maxRetries, minBackoff, maxBackoff, randomFactor)

  // Readable form for logs and debugging; without it the default Object#toString identity hash is printed.
  override def toString: String =
    s"RetrySettings($maxRetries, ${minBackoff.toCoarsest}, ${maxBackoff.toCoarsest}, $randomFactor)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import scala.jdk.FutureConverters._
import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.pattern.BackoffSupervisor
import akka.pattern.after
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.query.TimestampOffset
import akka.projection.ProjectionId
Expand All @@ -35,6 +37,7 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
import software.amazon.awssdk.services.dynamodb.model.WriteRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

/**
* INTERNAL API
Expand All @@ -59,6 +62,8 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
val timestampBySlicePid = AttributeValue.fromS("_")
val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
}

/**
 * Failure carrying the last response of a batch write that could not be fully applied.
 *
 * @param lastResponse the response supplied by the code that raises this failure; callers read
 *                     its `unprocessedItems` to report what was not written
 */
final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse)
    extends Exception("Batch write still had unprocessed items after the configured number of retries")
pvlugter marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -70,6 +75,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
projectionId: ProjectionId,
client: DynamoDbAsyncClient) {
import OffsetStoreDao.log
import OffsetStoreDao.BatchWriteFailed
import OffsetStoreDao.MaxBatchSize
import OffsetStoreDao.MaxTransactItems
import system.executionContext
Expand Down Expand Up @@ -113,6 +119,47 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
}(ExecutionContext.parasitic)
}

// Makes the actor system available implicitly within this class.
// NOTE(review): presumably this is what `akka.pattern.after` (used for the delayed batch-write
// retry) resolves for scheduling — confirm.
implicit def sys: ActorSystem[_] = system

/**
 * Sends a batch write and re-sends whatever the response reports as unprocessed, waiting for a
 * backoff delay between attempts.
 *
 * @param batchReq the batch write request to send
 * @param retries  number of retries already performed (0 for the initial attempt)
 * @return the responses of all attempts, earliest first; the future fails with
 *         [[BatchWriteFailed]] (holding the last response) when items are still unprocessed
 *         and `retries` has reached `settings.retrySettings.maxRetries`
 */
private def writeBatchWithRetries(
    batchReq: BatchWriteItemRequest,
    retries: Int = 0): Future[List[BatchWriteItemResponse]] = {
  val retrySettings = settings.retrySettings

  client.batchWriteItem(batchReq).asScala.flatMap { response =>
    val fullyApplied = !response.hasUnprocessedItems || response.unprocessedItems.isEmpty

    if (fullyApplied) {
      Future.successful(response :: Nil)
    } else if (retries >= retrySettings.maxRetries) {
      // out of retries: surface the last response so callers can report what was not written
      Future.failed(new BatchWriteFailed(response))
    } else {
      val remaining = response.unprocessedItems
      // only the unprocessed items are sent again, other request options are kept
      val retryReq = batchReq.toBuilder.requestItems(remaining).build()
      val attempt = retries + 1
      val backoff = BackoffSupervisor.calculateDelay(
        retries,
        retrySettings.minBackoff,
        retrySettings.maxBackoff,
        retrySettings.randomFactor)

      if (log.isDebugEnabled) {
        val unappliedCount = remaining.asScala.valuesIterator.map(_.size).sum
        log.debug(
          "Not all writes in batch were applied, retrying in [{}]: [{}] unapplied writes, [{}/{}] retries",
          backoff.toCoarsest,
          unappliedCount,
          attempt,
          retrySettings.maxRetries)
      }

      val laterResponses = after(backoff) {
        writeBatchWithRetries(retryReq, attempt)
      }

      // keep this attempt's response in front of the responses of the following attempts
      laterResponses.map(response :: _)(ExecutionContext.parasitic)
    }
  }
}

def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._

Expand Down Expand Up @@ -163,22 +210,37 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeBatchWithRetries(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote latest timestamps for [{}] slices, consumed [{}] WCU",
offsetsBatch.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
val unprocessedSliceItems = failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.asScala
.toVector
.map(_.putRequest.item)

val unprocessedSlices = unprocessedSliceItems.map(_.get(NameSlice).s)
log.warn(
"Failed to write latest timestamps for [{}] slices: [{}]",
unprocessedSlices.size,
unprocessedSlices.mkString(", "))

Future.failed(failed)

case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
}
}

if (offsetsBySlice.size <= MaxBatchSize) {
Expand Down Expand Up @@ -221,18 +283,43 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeBatchWithRetries(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote [{}] sequence numbers, consumed [{}] WCU",
recordsBatch.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}

result.map(_ => Done)(ExecutionContext.parasitic)
result
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
val unprocessedSeqNrItems =
failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.asScala
.toVector
.map(_.putRequest.item)

val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
import OffsetStoreDao.OffsetStoreAttributes._
s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
}

log.warn(
"Failed to write sequence numbers for [{}] persistence IDs: [{}]",
unprocessedSeqNrs.size,
unprocessedSeqNrs.mkString(", "))

Future.failed(failed)

case c: CompletionException =>
Future.failed(c.getCause)
}
}

if (records.size <= MaxBatchSize) {
Expand Down Expand Up @@ -409,22 +496,41 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeBatchWithRetries(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote management state for [{}] slices, consumed [{}] WCU",
slices.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
val unprocessedStateItems =
failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.asScala
.toVector
.map(_.putRequest.item)

val unprocessedStates = unprocessedStateItems.map { item =>
s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
}

log.warn(
"Failed to write management state for [{}] slices: [{}]",
unprocessedStates.size,
unprocessedStates.mkString(", "))

Future.failed(failed)

case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
}
}

val sliceRange = (minSlice to maxSlice).toVector
Expand Down
Loading