Skip to content

Commit

Permalink
Scala 3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Chapuis authored and jchapuis committed Nov 27, 2023
1 parent 256130d commit fdc6dbf
Show file tree
Hide file tree
Showing 73 changed files with 368 additions and 309 deletions.
5 changes: 5 additions & 0 deletions akka-runtime/src/main/protobuf/command.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
syntax = "proto2";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
scala3_sources: true
};

package endless.runtime.akka.serializer.proto;
option optimize_for = SPEED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext}
import cats.Applicative
import cats.effect.kernel.{Ref, Sync}
import cats.syntax.eq._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.eq.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import endless.core.entity.Effector.PassivationState

import scala.concurrent.duration.{Duration, FiniteDuration}

private[akka] class EntityPassivator[F[_]: Sync](upcomingPassivation: Ref[F, Option[Cancellable]])(
implicit
entityContext: EntityContext[_],
actorContext: ActorContext[_]
entityContext: EntityContext[?],
actorContext: ActorContext[?]
) {
private lazy val passivateMessage = ClusterSharding.Passivate(actorContext.self)
private lazy val passivate = Sync[F].delay(entityContext.shard.tell(passivateMessage))
Expand Down Expand Up @@ -44,8 +44,8 @@ private[akka] class EntityPassivator[F[_]: Sync](upcomingPassivation: Ref[F, Opt

object EntityPassivator {
def apply[F[_]: Sync](implicit
entityContext: EntityContext[_],
actorContext: ActorContext[_]
entityContext: EntityContext[?],
actorContext: ActorContext[?]
): F[EntityPassivator[F]] =
Ref.of[F, Option[Cancellable]](Option.empty[Cancellable]).map(new EntityPassivator(_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package endless.runtime.akka
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityTypeKey}
import akka.util.Timeout
import cats.effect.kernel.Async
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.show._
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.show.*
import cats.~>
import endless.core.entity.EntityNameProvider
import endless.core.protocol.{CommandSender, EntityIDEncoder, OutgoingCommand}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.cluster.{Cluster, MemberStatus}
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import cats.effect.kernel.implicits._
import cats.effect.kernel.implicits.*
import cats.effect.kernel.{Async, Deferred, Resource, Sync}
import cats.effect.std.Dispatcher
import cats.implicits.catsSyntaxApplicativeError
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import org.typelevel.log4cats.Logger

import scala.concurrent.TimeoutException
Expand All @@ -27,7 +27,7 @@ import scala.concurrent.duration.{Duration, DurationInt}
* effects dispatcher tied to the cluster resource scope
*/
final case class AkkaCluster[F[_]: Async](
system: ActorSystem[_],
system: ActorSystem[?],
dispatcher: Dispatcher[F],
cluster: Cluster,
sharding: ClusterSharding
Expand Down Expand Up @@ -65,7 +65,7 @@ object AkkaCluster {
* seconds by default).
*/
def managedResource[F[_]: Async: Logger](
createActorSystem: => ActorSystem[_],
createActorSystem: => ActorSystem[?],
catsEffectReleaseTimeout: Duration = 5.seconds,
akkaReleaseTimeout: Duration = 5.seconds
): Resource[F, AkkaCluster[F]] =
Expand Down Expand Up @@ -115,7 +115,7 @@ object AkkaCluster {
)

private def createCluster[F[_]: Async: Logger](
createActorSystem: => ActorSystem[_],
createActorSystem: => ActorSystem[?],
dispatcher: Dispatcher[F]
) = for {
system <- Sync[F].delay(createActorSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext}
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import endless.core.entity._
import endless.core.entity.*
import endless.core.event.EventApplier
import endless.core.interpret._
import endless.core.interpret.*
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.data._
import AkkaDeployer._
import endless.runtime.akka.data.*
import AkkaDeployer.*
import endless.runtime.akka.deploy.internal.EventSourcedShardedRepositoryDeployer
import org.typelevel.log4cats.Logger
import endless.core.entity.Deployer
Expand All @@ -33,7 +33,7 @@ trait AkkaDeployer extends Deployer {
eventApplier: EventApplier[S, E],
parameters: AkkaDeploymentParameters[F, S, E]
): Resource[F, DeployedAkkaRepository[F, RepositoryAlg]] = {
import parameters._
import parameters.*
implicit val sharding: ClusterSharding = akkaCluster.sharding
implicit val sender: CommandSender[F, ID] = ShardingCommandSender[F, ID]
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext}
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import endless.core.entity._
import endless.core.interpret._
import endless.core.entity.*
import endless.core.interpret.*
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.ShardingCommandSender
import endless.runtime.akka.data._
import endless.runtime.akka.data.*
import endless.runtime.akka.deploy.AkkaDurableDeployer.{
AkkaDurableDeploymentParameters,
DeployedAkkaDurableRepository
Expand All @@ -34,7 +34,7 @@ trait AkkaDurableDeployer extends DurableDeployer {
commandProtocol: CommandProtocol[ID, Alg],
parameters: AkkaDurableDeploymentParameters[F, S]
): Resource[F, DeployedAkkaDurableRepository[F, RepositoryAlg]] = {
import parameters._
import parameters.*
implicit val sharding: ClusterSharding = akkaCluster.sharding
implicit val sender: CommandSender[F, ID] = ShardingCommandSender[F, ID]
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import akka.persistence.typed.state.scaladsl.{DurableStateBehavior, Effect}
import akka.persistence.typed.state.{RecoveryCompleted, RecoveryFailed}
import cats.effect.kernel.Async
import cats.effect.std.Dispatcher
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.show._
import endless.core.entity._
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.show.*
import endless.core.entity.*
import endless.core.interpret.DurableEntityT.{DurableEntityT, State}
import endless.core.interpret._
import endless.core.interpret.*
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.EntityPassivator
import endless.runtime.akka.data._
import endless.runtime.akka.data.*
import org.typelevel.log4cats.Logger

private[deploy] class DurableShardedRepositoryDeployer[F[
Expand Down Expand Up @@ -110,7 +110,7 @@ private[deploy] class DurableShardedRepositoryDeployer[F[
// run the effector asynchronously, as it can describe long-running processes
dispatcher.unsafeRunAndForget(handleSideEffect(state))
)
.thenReply(command.replyTo) { _: Option[S] =>
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import akka.persistence.typed.{PersistenceId, RecoveryCompleted, RecoveryFailed}
import cats.effect.kernel.Async
import cats.effect.std.Dispatcher
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.show._
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.show.*
import endless.core.entity.{Effector, EntityNameProvider, Sharding, SideEffect}
import endless.core.event.EventApplier
import endless.core.interpret.{EntityT, SideEffectInterpreter}
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDCodec}
import endless.runtime.akka.EntityPassivator
import endless.runtime.akka.data.{Command, Reply}
import endless.runtime.akka.deploy.internal.EventSourcedShardedRepositoryDeployer._
import endless.runtime.akka.deploy.internal.EventSourcedShardedRepositoryDeployer.*
import org.typelevel.log4cats.Logger

private[deploy] class EventSourcedShardedRepositoryDeployer[F[
Expand Down Expand Up @@ -118,7 +118,7 @@ private[deploy] class EventSourcedShardedRepositoryDeployer[F[
]) => // run the effector asynchronously, as it can describe long-running processes
dispatcher.unsafeRunAndForget(handleSideEffect(state))
)
.thenReply(command.replyTo) { _: Option[S] =>
.thenReply(command.replyTo) { (_: Option[S]) =>
Reply(incomingCommand.replyEncoder.encode(reply))
}
.pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, Ent
import akka.util.Timeout
import cats.effect.kernel.{Async, Resource}
import cats.effect.std.Dispatcher
import endless.core.entity._
import endless.core.entity.*
import endless.core.interpret.RepositoryInterpreter
import endless.core.protocol.{CommandProtocol, CommandSender, EntityIDEncoder}
import endless.runtime.akka.ShardingCommandSender
import endless.runtime.akka.data._
import endless.runtime.akka.data.*
import endless.runtime.akka.deploy.AkkaCluster
import org.typelevel.log4cats.Logger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicReference
)
class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private val classToCompanionMapRef =
new AtomicReference[Map[Class[_], GeneratedMessageCompanion[_]]](Map.empty)
new AtomicReference[Map[Class[?], GeneratedMessageCompanion[?]]](Map.empty)

override def toBinary(o: AnyRef): Array[Byte] = o match {
case e: scalapb.GeneratedMessage => e.toByteArray
Expand All @@ -28,14 +28,14 @@ class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer

override def includeManifest: Boolean = true

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[?]]): AnyRef =
manifest match {
case Some(clazz) =>
// noinspection ScalaStyle
@scala.annotation.tailrec
def messageCompanion(
companion: GeneratedMessageCompanion[_] = null
): GeneratedMessageCompanion[_] = {
companion: GeneratedMessageCompanion[?] = null
): GeneratedMessageCompanion[?] = {
val classToCompanion = classToCompanionMapRef.get()
classToCompanion.get(clazz) match {
case Some(cachedCompanion) => cachedCompanion
Expand All @@ -46,7 +46,7 @@ class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer
.forName(clazz.getName + "$", true, clazz.getClassLoader)
.getField("MODULE$")
.get(())
.asInstanceOf[GeneratedMessageCompanion[_]]
.asInstanceOf[GeneratedMessageCompanion[?]]
else companion
if (
classToCompanionMapRef.compareAndSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package endless.runtime.akka.serializer
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.serialization.{BaseSerializer, SerializerWithStringManifest}
import cats.syntax.show._
import cats.syntax.show.*
import com.google.protobuf.ByteString
import endless.runtime.akka.data.Command
import endless.runtime.akka.serializer.CommandSerializer.ManifestKey
Expand Down
6 changes: 6 additions & 0 deletions akka-runtime/src/test/protobuf/dummy.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
syntax = "proto3";

import "scalapb/scalapb.proto";

option (scalapb.options) = {
scala3_sources: true
};

package endless.protobuf.test.proto;

message DummyCommand {
Expand Down
Loading

0 comments on commit fdc6dbf

Please sign in to comment.