From 5224d2e3a86d5de522a13ab436d89c35399d0465 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 2 Sep 2024 17:09:07 +0200 Subject: [PATCH 1/2] fix: ConsumerFilterRegistry unique actor name --- .../grpc/internal/ConsumerFilterRegistry.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala index 750d125cf..a806aae3c 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala @@ -7,11 +7,13 @@ package akka.projection.grpc.internal import scala.concurrent.duration._ import java.net.URLEncoder import java.nio.charset.StandardCharsets +import java.util.UUID import scala.collection.immutable import scala.util.Failure import scala.util.Success +import akka.actor.InvalidActorNameException import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.ActorContext @@ -61,9 +63,17 @@ import akka.util.Timeout stores.get(streamId) match { case Some(store) => store case None => - context.spawn( - ConsumerFilterStore(context.system, settings, streamId, context.self), - URLEncoder.encode(streamId, StandardCharsets.UTF_8.name)) + val encodedStreamId = URLEncoder.encode(streamId, StandardCharsets.UTF_8.name) + try { + context.spawn(ConsumerFilterStore(context.system, settings, streamId, context.self), encodedStreamId) + } catch { + case _: InvalidActorNameException => + // There could be a race condition from SubscriberTerminated where the child is stopped, + // but not removed yet. The actor name isn't important, but could be useful for debugging. + context.spawn( + ConsumerFilterStore(context.system, settings, streamId, context.self), + s"encodedStreamId-${UUID.randomUUID()}") + } } } From 4cd7e4327514368baed1fae111406fa98d70d7cd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 3 Sep 2024 07:30:31 +0200 Subject: [PATCH 2/2] Missing substitution Co-authored-by: Peter Vlugter <59895+pvlugter@users.noreply.github.com> --- .../akka/projection/grpc/internal/ConsumerFilterRegistry.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala index a806aae3c..cc804cb95 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerFilterRegistry.scala @@ -72,7 +72,7 @@ import akka.util.Timeout // but not removed yet. The actor name isn't important, but could be useful for debugging. context.spawn( ConsumerFilterStore(context.system, settings, streamId, context.self), - s"encodedStreamId-${UUID.randomUUID()}") + s"$encodedStreamId-${UUID.randomUUID()}") } } }