diff --git a/akka-projection-dynamodb/src/main/resources/reference.conf b/akka-projection-dynamodb/src/main/resources/reference.conf
index 3eb4f1a63..cbce78aff 100644
--- a/akka-projection-dynamodb/src/main/resources/reference.conf
+++ b/akka-projection-dynamodb/src/main/resources/reference.conf
@@ -27,6 +27,14 @@ akka.projection.dynamodb {
     # reducing this may result in fewer restarts of the projection due to failure to query
     # starting offsets.
     offset-slice-read-parallelism = 64
+
+    # Batch writes are not automatically retried by the underlying SDK, so these settings govern those retries.
+    retries {
+      max-retries = 3
+      min-backoff = 200ms
+      max-backoff = 2s
+      random-factor = 0.3
+    }
   }

   # By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala
index 82dc79268..f3f2de84b 100644
--- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala
@@ -46,7 +46,8 @@ object DynamoDBProjectionSettings {
       warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
       offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
       offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
-      timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
+      timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")),
+      retrySettings = RetrySettings(config.getConfig("offset-store.retries")))
   }

   /**
@@ -68,7 +69,8 @@ final class DynamoDBProjectionSettings private (
     val warnAboutFilteredEventsInFlow: Boolean,
     val offsetBatchSize: Int,
     val offsetSliceReadParallelism: Int,
-    val timeToLiveSettings: TimeToLiveSettings) {
+    val timeToLiveSettings: TimeToLiveSettings,
+    val retrySettings: RetrySettings) {

   def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
     copy(timestampOffsetTable = timestampOffsetTable)
@@ -106,6 +108,9 @@ final class DynamoDBProjectionSettings private (
   def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings =
     copy(timeToLiveSettings = timeToLiveSettings)

+  def withRetrySettings(retrySettings: RetrySettings): DynamoDBProjectionSettings =
+    copy(retrySettings = retrySettings)
+
   @nowarn("msg=deprecated")
   private def copy(
       timestampOffsetTable: String = timestampOffsetTable,
@@ -114,7 +119,8 @@
       warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
       offsetBatchSize: Int = offsetBatchSize,
       offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
-      timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
+      timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings,
+      retrySettings: RetrySettings = retrySettings) =
     new DynamoDBProjectionSettings(
       timestampOffsetTable,
       useClient,
@@ -124,7 +130,8 @@
       warnAboutFilteredEventsInFlow,
       offsetBatchSize,
       offsetSliceReadParallelism,
-      timeToLiveSettings)
+      timeToLiveSettings,
+      retrySettings)

   override def toString =
     s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
@@ -200,3 +207,48 @@ final class ProjectionTimeToLiveSettings private (val offsetTimeToLive: Option[F
   private def copy(offsetTimeToLive: Option[FiniteDuration]): ProjectionTimeToLiveSettings =
     new ProjectionTimeToLiveSettings(offsetTimeToLive)
 }
+
+object RetrySettings {
+  val defaults: RetrySettings =
+    new RetrySettings(maxRetries = 3, minBackoff = 200.millis, maxBackoff = 2.seconds, randomFactor = 0.3)
+
+  def apply(config: Config): RetrySettings = {
+    new RetrySettings(
+      maxRetries = config.getInt("max-retries"),
+      minBackoff = config.getDuration("min-backoff").toScala,
+      maxBackoff = config.getDuration("max-backoff").toScala,
+      randomFactor = config.getDouble("random-factor"))
+  }
+}
+
+final class RetrySettings private (
+    val maxRetries: Int,
+    val minBackoff: FiniteDuration,
+    val maxBackoff: FiniteDuration,
+    val randomFactor: Double) {
+
+  def withMaxRetries(maxRetries: Int): RetrySettings =
+    copy(maxRetries = maxRetries)
+
+  def withMinBackoff(minBackoff: FiniteDuration): RetrySettings =
+    copy(minBackoff = minBackoff)
+
+  def withMinBackoff(minBackoff: JDuration): RetrySettings =
+    copy(minBackoff = minBackoff.toScala)
+
+  def withMaxBackoff(maxBackoff: FiniteDuration): RetrySettings =
+    copy(maxBackoff = maxBackoff)
+
+  def withMaxBackoff(maxBackoff: JDuration): RetrySettings =
+    copy(maxBackoff = maxBackoff.toScala)
+
+  def withRandomFactor(randomFactor: Double): RetrySettings =
+    copy(randomFactor = randomFactor)
+
+  private def copy(
+      maxRetries: Int = maxRetries,
+      minBackoff: FiniteDuration = minBackoff,
+      maxBackoff: FiniteDuration = maxBackoff,
+      randomFactor: Double = randomFactor) =
+    new RetrySettings(maxRetries, minBackoff, maxBackoff, randomFactor)
+}
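For illustration, the retries can also be tuned programmatically through the `withRetrySettings` method added above. A minimal sketch, assuming the existing `DynamoDBProjectionSettings(system)` factory that reads the `akka.projection.dynamodb` config path:

```scala
import scala.concurrent.duration._

import akka.actor.typed.ActorSystem
import akka.projection.dynamodb.DynamoDBProjectionSettings
import akka.projection.dynamodb.RetrySettings

// Sketch: tune batch-write retries beyond the reference.conf defaults.
def customSettings(system: ActorSystem[_]): DynamoDBProjectionSettings =
  DynamoDBProjectionSettings(system).withRetrySettings(
    RetrySettings.defaults
      .withMaxRetries(5) // default 3
      .withMinBackoff(500.millis) // default 200ms
      .withMaxBackoff(5.seconds) // default 2s
      .withRandomFactor(0.5)) // default 0.3
```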
diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
index 40d221da1..c4c8b281f 100644
--- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
@@ -17,6 +17,8 @@ import scala.jdk.FutureConverters._
 import akka.Done
 import akka.actor.typed.ActorSystem
 import akka.annotation.InternalApi
+import akka.pattern.BackoffSupervisor
+import akka.pattern.after
 import akka.persistence.dynamodb.internal.InstantFactory
 import akka.persistence.query.TimestampOffset
 import akka.projection.ProjectionId
@@ -35,6 +37,7 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
 import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
 import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

 /**
  * INTERNAL API
@@ -59,6 +62,8 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
     val timestampBySlicePid = AttributeValue.fromS("_")
     val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
   }
+
+  final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends Exception
 }

 /**
@@ -70,6 +75,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
     projectionId: ProjectionId,
     client: DynamoDbAsyncClient) {
   import OffsetStoreDao.log
+  import OffsetStoreDao.BatchWriteFailed
   import OffsetStoreDao.MaxBatchSize
   import OffsetStoreDao.MaxTransactItems
   import system.executionContext
@@ -113,6 +119,47 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
       }(ExecutionContext.parasitic)
   }

+  private implicit val sys: ActorSystem[_] = system
+
+  private def writeBatchWithRetries(
+      batchReq: BatchWriteItemRequest,
+      retries: Int = 0): Future[List[BatchWriteItemResponse]] = {
+    val result = client.batchWriteItem(batchReq).asScala
+
+    result.flatMap { response =>
+      if (response.hasUnprocessedItems && !response.unprocessedItems.isEmpty) {
+        if (retries >= settings.retrySettings.maxRetries) {
+          Future.failed(new BatchWriteFailed(response))
+        } else { // retry after exponential backoff
+          val unprocessed = response.unprocessedItems
+          val newReq = batchReq.toBuilder.requestItems(unprocessed).build()
+          val nextRetry = retries + 1
+          val delay = BackoffSupervisor.calculateDelay(
+            retries,
+            settings.retrySettings.minBackoff,
+            settings.retrySettings.maxBackoff,
+            settings.retrySettings.randomFactor)
+
+          if (log.isDebugEnabled) {
+            val count = unprocessed.asScala.valuesIterator.map(_.size).sum
+            log.debug(
+              "Not all writes in batch were applied, retrying in [{}]: [{}] unapplied writes, [{}/{}] retries",
+              delay.toCoarsest,
+              count,
+              nextRetry,
+              settings.retrySettings.maxRetries)
+          }
+
+          after(delay) {
+            writeBatchWithRetries(newReq, nextRetry)
+          }.map { responses => response :: responses }(ExecutionContext.parasitic)
+        }
+      } else {
+        Future.successful(List(response))
+      }
+    }
+  }
+
   def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
     import OffsetStoreDao.OffsetStoreAttributes._

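The delay between retries grows exponentially with jitter. A sketch of the semantics assumed from `BackoffSupervisor.calculateDelay` (illustrative, not the actual Akka implementation):

```scala
import scala.concurrent.duration._
import scala.util.Random

// Assumed semantics: double the minimum backoff per retry, cap at the
// maximum backoff, then multiply by a jitter in [1.0, 1.0 + randomFactor].
def calculateDelay(
    retryCount: Int,
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double): FiniteDuration = {
  val jitter = 1.0 + Random.nextDouble() * randomFactor
  maxBackoff.min(minBackoff * math.pow(2, retryCount)) * jitter match {
    case delay: FiniteDuration => delay
    case _                     => maxBackoff
  }
}

// With the defaults (200ms min, 2s max, 0.3 random factor) the first three
// retries wait roughly 200-260ms, 400-520ms and 800ms-1.04s.
```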
@@ -163,22 +210,37 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
         .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
         .build()

-      val result = client.batchWriteItem(req).asScala
+      val result = writeBatchWithRetries(req)

       if (log.isDebugEnabled()) {
-        result.foreach { response =>
+        result.foreach { responses =>
           log.debug(
             "Wrote latest timestamps for [{}] slices, consumed [{}] WCU",
             offsetsBatch.size,
-            response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
+            responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
         }
       }

       result
         .map(_ => Done)(ExecutionContext.parasitic)
         .recoverWith {
+          case failed: BatchWriteFailed =>
+            val unprocessedSliceItems = failed.lastResponse.unprocessedItems
+              .get(settings.timestampOffsetTable)
+              .asScala
+              .toVector
+              .map(_.putRequest.item)
+
+            val unprocessedSlices = unprocessedSliceItems.map(_.get(NameSlice).s)
+            log.warn(
+              "Failed to write latest timestamps for [{}] slices: [{}]",
+              unprocessedSlices.size,
+              unprocessedSlices.mkString(", "))
+
+            Future.failed(failed)
+
           case c: CompletionException =>
             Future.failed(c.getCause)
-        }(ExecutionContext.parasitic)
+        }
     }
     if (offsetsBySlice.size <= MaxBatchSize) {
@@ -221,18 +283,43 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
         .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
         .build()

-      val result = client.batchWriteItem(req).asScala
+      val result = writeBatchWithRetries(req)

       if (log.isDebugEnabled()) {
-        result.foreach { response =>
+        result.foreach { responses =>
           log.debug(
             "Wrote [{}] sequence numbers, consumed [{}] WCU",
             recordsBatch.size,
-            response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
+            responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
         }
       }

-      result.map(_ => Done)(ExecutionContext.parasitic)
+      result
+        .map(_ => Done)(ExecutionContext.parasitic)
+        .recoverWith {
+          case failed: BatchWriteFailed =>
+            val unprocessedSeqNrItems =
+              failed.lastResponse.unprocessedItems
+                .get(settings.timestampOffsetTable)
+                .asScala
+                .toVector
+                .map(_.putRequest.item)
+
+            val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
+              import OffsetStoreDao.OffsetStoreAttributes._
+              s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
+            }
+
+            log.warn(
+              "Failed to write sequence numbers for [{}] persistence IDs: [{}]",
+              unprocessedSeqNrs.size,
+              unprocessedSeqNrs.mkString(", "))
+
+            Future.failed(failed)
+
+          case c: CompletionException =>
+            Future.failed(c.getCause)
+        }
     }

     if (records.size <= MaxBatchSize) {
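When the retries are exhausted, the returned `Future` fails with `BatchWriteFailed`, which still carries the last response, so callers and tests can inspect which writes were never applied. A hypothetical helper (illustrative only; `BatchWriteFailed` is internal API):

```scala
import scala.jdk.CollectionConverters._

import akka.projection.dynamodb.internal.OffsetStoreDao.BatchWriteFailed

// Hypothetical: count the writes still unprocessed when a batch write
// finally failed after exhausting its retries.
def unprocessedWrites(failed: BatchWriteFailed): Int =
  failed.lastResponse.unprocessedItems.asScala.valuesIterator.map(_.size).sum
```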
@@ -409,22 +496,41 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
         .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
         .build()

-      val result = client.batchWriteItem(req).asScala
+      val result = writeBatchWithRetries(req)

       if (log.isDebugEnabled()) {
-        result.foreach { response =>
+        result.foreach { responses =>
           log.debug(
             "Wrote management state for [{}] slices, consumed [{}] WCU",
             slices.size,
-            response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
+            responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
         }
       }

       result
         .map(_ => Done)(ExecutionContext.parasitic)
         .recoverWith {
+          case failed: BatchWriteFailed =>
+            val unprocessedStateItems =
+              failed.lastResponse.unprocessedItems
+                .get(settings.timestampOffsetTable)
+                .asScala
+                .toVector
+                .map(_.putRequest.item)
+
+            val unprocessedStates = unprocessedStateItems.map { item =>
+              s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
+            }
+
+            log.warn(
+              "Failed to write management state for [{}] slices: [{}]",
+              unprocessedStates.size,
+              unprocessedStates.mkString(", "))
+
+            Future.failed(failed)
+
           case c: CompletionException =>
             Future.failed(c.getCause)
-        }(ExecutionContext.parasitic)
+        }
     }
     val sliceRange = (minSlice to maxSlice).toVector
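The defaults added to reference.conf can be overridden in application.conf; a sketch with an inline config, using the `offset-store.retries` path shown above:

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

// Sketch: override the new retry defaults via configuration.
val config = ConfigFactory
  .parseString("""
    akka.projection.dynamodb.offset-store.retries {
      max-retries = 5
      min-backoff = 500ms
      max-backoff = 5s
      random-factor = 0.5
    }
    """)
  .withFallback(ConfigFactory.load())

val system = ActorSystem(Behaviors.empty, "Example", config)
```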