Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ConsumerFilterRegistry unique actor name #1192

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package akka.projection.grpc.internal
import scala.concurrent.duration._
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.util.UUID

import scala.collection.immutable
import scala.util.Failure
import scala.util.Success

import akka.actor.InvalidActorNameException
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
Expand Down Expand Up @@ -61,9 +63,17 @@ import akka.util.Timeout
stores.get(streamId) match {
case Some(store) => store
case None =>
context.spawn(
ConsumerFilterStore(context.system, settings, streamId, context.self),
URLEncoder.encode(streamId, StandardCharsets.UTF_8.name))
val encodedStreamId = URLEncoder.encode(streamId, StandardCharsets.UTF_8.name)
try {
context.spawn(ConsumerFilterStore(context.system, settings, streamId, context.self), encodedStreamId)
} catch {
case _: InvalidActorNameException =>
// There could be a race condition from SubscriberTerminated where the child is stopped,
// but not removed yet. The actor name isn't important, but could be useful for debugging.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative could be to watch the store remove it from the stores Map when Terminated is received. That would probably be more clean, but maybe increases the risk of sending a UpdateFilter message to a store that is about to terminate.

context.spawn(
ConsumerFilterStore(context.system, settings, streamId, context.self),
s"$encodedStreamId-${UUID.randomUUID()}")
}
}
}

Expand Down
Loading