Skip to content

Commit

Permalink
Add a dedicated error for stream cancelled (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Nov 29, 2023
1 parent a36d7fc commit 879d4c6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.errors.PodUnavailable
import com.devsisters.shardcake.errors.StreamCancelled
import zio._
import zio.stream.ZStream

Expand Down Expand Up @@ -49,10 +49,10 @@ trait Messenger[-Msg] {
case (c, Left(err)) => (c, Left(c -> err))
}
.flatMap {
case Right(res) => ZStream.succeed(res)
case Left((lastSeenCursor, PodUnavailable(_))) =>
case Right(res) => ZStream.succeed(res)
case Left((lastSeenCursor, StreamCancelled)) =>
ZStream.execute(ZIO.sleep(200.millis)) ++
sendStreamAutoRestart(entityId, lastSeenCursor)(msg)(updateCursor)
case Left((_, err)) => ZStream.fail(err)
case Left((_, err)) => ZStream.fail(err)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.devsisters.shardcake.errors

/**
* Exception indicating that a stream was interrupted.
* It could be caused by a shard rebalance, by the entity inactivity or by the pod being unavailable.
*/
case object StreamCancelled extends Exception(s"Stream connection was canceled.")
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,9 @@ class GrpcPods(
if (ex.getStatus.getCode == Status.Code.RESOURCE_EXHAUSTED) {
// entity is not managed by this pod, wait and retry (assignments will be updated)
EntityNotManagedByThisPod(message.entityId)
} else if (
ex.getStatus.getCode == Status.Code.UNAVAILABLE || ex.getStatus.getCode == Status.Code.CANCELLED
) {
PodUnavailable(pod)
} else {
} else if (ex.getStatus.getCode == Status.Code.UNAVAILABLE) PodUnavailable(pod)
else if (ex.getStatus.getCode == Status.Code.CANCELLED) StreamCancelled
else {
ex
},
_.body.toByteArray
Expand Down

0 comments on commit 879d4c6

Please sign in to comment.