-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: DynamoDB - Avoid dropping sequence number progress #1260
Conversation
...ojection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
…estampOffsets" This reverts commit d8dd7bf.
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
This fix was tested against a slightly under-provisioned offset table, to have some failed writes from throttling. Looks good. |
...projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala
Outdated
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Show resolved
Hide resolved
akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
Outdated
Show resolved
Hide resolved
unprocessedSlices.size, | ||
unprocessedSlices.mkString(", ")) | ||
|
||
failed.asInstanceOf[Future[Done]] // safe, actually contains Nothing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to cast an exception to a future.
java.lang.ClassCastException: class akka.projection.dynamodb.internal.OffsetStoreDao$BatchWriteFailed cannot be cast to class scala.concurrent.Future
Applied some changes directly after review. Have tested again, and it looks better with the backoff delay on retries. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Great catch! I have a few minor code style comments, but will follow up with those in a separate PR.
The major fix here is retrying unprocessed batch-write items in the OffsetStoreDao: while the individual writes are atomic, the batch is not and reports partial success as a successful CompletionStage: it's on the caller to check that all items succeeded and retry ones that didn't.