Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DynamoDB - Avoid dropping sequence number progress #1260

Merged
merged 12 commits into from
Nov 22, 2024
7 changes: 7 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ akka.projection.dynamodb {
# reducing this may result in fewer restarts of the projection due to failure to query
# starting offsets.
offset-slice-read-parallelism = 64

# Batch writes are not automatically retried by the underlying SDK. These settings
# control the retries performed by the offset store: the maximum number of retry
# attempts and the exponential backoff between them.
retries {
max-retries = 3
min-backoff = 200ms
max-backoff = 2s
}
pvlugter marked this conversation as resolved.
Show resolved Hide resolved
}

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ object DynamoDBProjectionSettings {
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")),
batchMaxRetries = config.getInt("offset-store.retries.max-retries"),
batchMinBackoff = config.getDuration("offset-store.retries.min-backoff").toScala,
batchMaxBackoff = config.getDuration("offset-store.retries.max-backoff").toScala)
pvlugter marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -68,7 +71,10 @@ final class DynamoDBProjectionSettings private (
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int,
val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings) {
val timeToLiveSettings: TimeToLiveSettings,
val batchMaxRetries: Int,
val batchMinBackoff: FiniteDuration,
val batchMaxBackoff: FiniteDuration) {

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
copy(timestampOffsetTable = timestampOffsetTable)
Expand Down Expand Up @@ -106,6 +112,21 @@ final class DynamoDBProjectionSettings private (
def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings =
copy(timeToLiveSettings = timeToLiveSettings)

/**
 * Configure retries of DynamoDB batch writes: the maximum number of retry attempts
 * and the backoff range (minimum to maximum delay) between attempts.
 */
def withBatchRetry(
maxRetries: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration): DynamoDBProjectionSettings =
copy(batchMaxRetries = maxRetries, batchMinBackoff = minBackoff, batchMaxBackoff = maxBackoff)

/** Maximum number of retry attempts for batch writes with unprocessed items. */
def withBatchMaxRetries(batchMaxRetries: Int): DynamoDBProjectionSettings =
copy(batchMaxRetries = batchMaxRetries)

/** Initial delay before the first retry of a batch write. */
def withBatchMinBackoff(batchMinBackoff: FiniteDuration): DynamoDBProjectionSettings =
copy(batchMinBackoff = batchMinBackoff)

/** Upper bound for the exponentially increasing delay between batch write retries. */
def withBatchMaxBackoff(batchMaxBackoff: FiniteDuration): DynamoDBProjectionSettings =
copy(batchMaxBackoff = batchMaxBackoff)

@nowarn("msg=deprecated")
private def copy(
timestampOffsetTable: String = timestampOffsetTable,
Expand All @@ -114,7 +135,10 @@ final class DynamoDBProjectionSettings private (
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings,
batchMaxRetries: Int = batchMaxRetries,
batchMinBackoff: FiniteDuration = batchMinBackoff,
batchMaxBackoff: FiniteDuration = batchMaxBackoff) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
Expand All @@ -124,7 +148,10 @@ final class DynamoDBProjectionSettings private (
warnAboutFilteredEventsInFlow,
offsetBatchSize,
offsetSliceReadParallelism,
timeToLiveSettings)
timeToLiveSettings,
batchMaxRetries,
batchMinBackoff,
batchMaxBackoff)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ package akka.projection.dynamodb.internal
import java.time.Instant
import java.util.Collections
import java.util.concurrent.CompletionException
import java.util.concurrent.ThreadLocalRandom
import java.util.{ HashMap => JHashMap }

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.pattern.after
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.query.TimestampOffset
import akka.projection.ProjectionId
Expand All @@ -35,6 +38,7 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
import software.amazon.awssdk.services.dynamodb.model.WriteRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

/**
* INTERNAL API
Expand All @@ -59,6 +63,8 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
val timestampBySlicePid = AttributeValue.fromS("_")
val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
}

final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends Exception
pvlugter marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -70,6 +76,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
projectionId: ProjectionId,
client: DynamoDbAsyncClient) {
import OffsetStoreDao.log
import OffsetStoreDao.BatchWriteFailed
import OffsetStoreDao.MaxBatchSize
import OffsetStoreDao.MaxTransactItems
import system.executionContext
Expand Down Expand Up @@ -113,6 +120,48 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
}(ExecutionContext.parasitic)
}

// Implicit ActorSystem in scope for the `after` pattern used to schedule retry delays.
implicit def sys: ActorSystem[_] = system

/**
 * Write a batch request, retrying unprocessed items until the whole batch is applied.
 *
 * DynamoDB batch writes are not automatically retried by the underlying SDK: a
 * successful response may still carry unprocessed items that must be re-submitted.
 * Retries use the configured max retries and min/max backoff settings.
 *
 * Returns all responses received (one per attempt), or fails with [[BatchWriteFailed]]
 * carrying the last response when retries are exhausted.
 */
def writeWholeBatch(batchReq: BatchWriteItemRequest): Future[List[BatchWriteItemResponse]] =
  writeWholeBatch(batchReq, settings.batchMaxRetries, settings.batchMinBackoff)

def writeWholeBatch(
    batchReq: BatchWriteItemRequest,
    maxRetries: Int,
    backoff: FiniteDuration): Future[List[BatchWriteItemResponse]] = {
  client.batchWriteItem(batchReq).asScala.flatMap { response =>
    val unprocessed =
      if (response.hasUnprocessedItems && !response.unprocessedItems.isEmpty) Some(response.unprocessedItems)
      else None

    unprocessed match {
      case None =>
        Future.successful(List(response))

      case Some(items) =>
        if (log.isDebugEnabled) {
          val count = items.asScala.valuesIterator.map(_.size).sum
          log.debug(
            "Not all writes in batch were applied: [{}] unapplied writes, [{}] retries remaining",
            count,
            maxRetries)
        }

        if (maxRetries < 1) {
          // retries exhausted: surface the last response so callers can report the unprocessed items
          Future.failed(new BatchWriteFailed(response))
        } else {
          // retry only the unprocessed items
          val retryReq = batchReq.toBuilder.requestItems(items).build()
          // exponential backoff (factor ~2 with a small random jitter), capped at max-backoff
          val factor = 2.0 + ThreadLocalRandom.current().nextDouble(0.3)
          val nextDelay = {
            val clamped = (backoff * factor).min(settings.batchMaxBackoff)
            if (clamped.isFinite) clamped.toMillis.millis else settings.batchMaxBackoff
          }

          after(backoff) {
            writeWholeBatch(retryReq, maxRetries - 1, nextDelay)
          }.map(responses => response :: responses)(ExecutionContext.parasitic)
        }
    }
  }
}

def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._

Expand Down Expand Up @@ -163,22 +212,37 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeWholeBatch(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote latest timestamps for [{}] slices, consumed [{}] WCU",
offsetsBatch.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}
result
  .map(_ => Done)(ExecutionContext.parasitic)
  .recoverWith {
    case failed: BatchWriteFailed =>
      // guard against null from java.util.Map.get if the table key is absent
      val unprocessedSliceItems =
        Option(failed.lastResponse.unprocessedItems.get(settings.timestampOffsetTable))
          .map(_.asScala.toVector.map(_.putRequest.item))
          .getOrElse(Vector.empty)

      val unprocessedSlices = unprocessedSliceItems.map(_.get(NameSlice).s)
      log.warn(
        "Failed to write latest timestamps for [{}] slices: [{}]",
        unprocessedSlices.size,
        unprocessedSlices.mkString(", "))

      // propagate the failure as a failed Future (an exception cannot be cast to a Future)
      Future.failed(failed)

    case c: CompletionException =>
      Future.failed(c.getCause)
  }(ExecutionContext.parasitic)
}
}
}

if (offsetsBySlice.size <= MaxBatchSize) {
Expand Down Expand Up @@ -221,18 +285,43 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeWholeBatch(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote [{}] sequence numbers, consumed [{}] WCU",
recordsBatch.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}

result.map(_ => Done)(ExecutionContext.parasitic)
result
  .map(_ => Done)(ExecutionContext.parasitic)
  .recoverWith {
    case failed: BatchWriteFailed =>
      // guard against null from java.util.Map.get if the table key is absent
      val unprocessedSeqNrItems =
        Option(failed.lastResponse.unprocessedItems.get(settings.timestampOffsetTable))
          .map(_.asScala.toVector.map(_.putRequest.item))
          .getOrElse(Vector.empty)

      val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
        import OffsetStoreDao.OffsetStoreAttributes._
        s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
      }

      log.warn(
        "Failed to write sequence numbers for [{}] persistence IDs: [{}]",
        unprocessedSeqNrs.size,
        unprocessedSeqNrs.mkString(", "))

      // propagate the failure as a failed Future (an exception cannot be cast to a Future)
      Future.failed(failed)

    case c: CompletionException =>
      Future.failed(c.getCause)
  }(ExecutionContext.parasitic)
}

if (records.size <= MaxBatchSize) {
Expand Down Expand Up @@ -409,22 +498,41 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeWholeBatch(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote management state for [{}] slices, consumed [{}] WCU",
slices.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}
result
  .map(_ => Done)(ExecutionContext.parasitic)
  .recoverWith {
    case failed: BatchWriteFailed =>
      // guard against null from java.util.Map.get if the table key is absent
      val unprocessedStateItems =
        Option(failed.lastResponse.unprocessedItems.get(settings.timestampOffsetTable))
          .map(_.asScala.toVector.map(_.putRequest.item))
          .getOrElse(Vector.empty)

      val unprocessedStates = unprocessedStateItems.map { item =>
        s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
      }

      log.warn(
        "Failed to write management state for [{}] slices: [{}]",
        unprocessedStates.size,
        unprocessedStates.mkString(", "))

      // propagate the failure as a failed Future (an exception cannot be cast to a Future)
      Future.failed(failed)

    case c: CompletionException =>
      Future.failed(c.getCause)
  }(ExecutionContext.parasitic)
}
}

val sliceRange = (minSlice to maxSlice).toVector
Expand Down
Loading