-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
docs: Sample of edge initiated replicated event sourcing #1083
Conversation
Hit a tricky problem: integration tests only tested when the handler is the only source, which works fine, but when there are also regular push projections, like in the sample app, this will not work. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good
samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala
Outdated
Show resolved
Hide resolved
samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala
Outdated
Show resolved
Hide resolved
samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala
Outdated
Show resolved
Hide resolved
|
||
private val FullChargeTime = 5.minutes | ||
|
||
// FIXME This is the only difference from the cloud one, maybe include in both and keep the RES identical? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, same code deployed to cloud and edge is expected with RES
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I later realised that is not quite possible, since edge needs a different init method in Replication.grpcEdgeReplication and adding consumer filters, so I now added two separate methods (but duplicated the full sources with both those in both cloud and edge services).
14f9a85
to
8c0b9be
Compare
|
||
private def handleRecoveryCompleted(state: State): Unit = { | ||
// FIXME this scheme is not quite reliable, because station is not remembered/restarted | ||
// until next new event/command if edge system is shut down/restarted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@patriknw this is the part I mentioned, one simple improvement could perhaps be to schedule/complete on both edge and cloud replicas instead of only edge/initiating node, and make completion idempotent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't looked at the details here yet, but do we need to have the timer? Could it instead be another grpc request telling that drone charging is complete? Maybe that is somewhat more realistic? The drone would report when it is fully charged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe that is even better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to be up to drone requesting completion now 👍
ae0f114
to
bf6ea7e
Compare
…ud is still not in place
bf6ea7e
to
d168ecd
Compare
Ready for final review, guide docs will be another PR (started in #1095 already) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good
.reply(startCharging.replyTo, StatusReply.success(new AllSlotsBusy(earliestFreeSlot))); | ||
} else { | ||
// charge | ||
var chargeCompletedBy = Instant.now().plus(FULL_CHARGE_TIME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the timer removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still estimate when done, needed because if all stations are busy we reply with a "try again at this point in time" calculated from first drone expected to have completed charging (to have some more logic than just is-list-full-then-deny)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll rename to expectedComplete
across events and commands/APIs to make it more more clear that it is a guess and not controlled by the entity itself
+ getReplicationContext().entityId() | ||
+ " not initialized"))) | ||
.onCommand( | ||
command -> true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onAnyCommand
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onAnyCommand
returns a built CommandHanlder
which can't be composed with orElse
for the other state below.
samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java
Outdated
Show resolved
Hide resolved
samples/grpc/local-drone-control-java/src/main/java/local/drones/ClusteredMain.java
Outdated
Show resolved
Hide resolved
samples/grpc/local-drone-control-java/src/main/resources/grpc.conf
Outdated
Show resolved
Hide resolved
@@ -16,6 +19,8 @@ import scala.concurrent.TimeoutException | |||
|
|||
class DroneServiceImpl( | |||
deliveriesQueue: ActorRef[DeliveriesQueue.Command], | |||
chargingStationEntityRefFactory: String => EntityRef[ | |||
ChargingStation.Command], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the indirection of this factory function? It could use ClusterSharding(system).entityRefFor
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's from the Replication#entityRefFactory
/EdgeReplication#entityRefFactory
since the user doesn't set up sharding for the entity themselves and so it might not be obvious that it is a side effect of starting it with Replication.grpcReplication
or Replication.grpcEdgeReplication
that there is sharding with the entity type running after that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But maybe it would be more clear/clean to pass the entire EdgeReplication
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, I see that comes from the EdgeReplication.entityRefFactory
, and we have the same in the other grpcReplication
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
No description provided.