Skip to content

Commit

Permalink
Add storage client using Redisson (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Feb 13, 2024
1 parent ab599b1 commit 1b76d54
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 1 deletion.
14 changes: 14 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ val zioCatsInteropVersion = "23.1.0.0"
val sttpVersion = "3.9.1"
val calibanVersion = "2.4.3"
val redis4catsVersion = "1.5.2"
val redissonVersion = "3.23.0"
val scalaKryoVersion = "1.0.2"
val testContainersVersion = "0.40.9"

Expand Down Expand Up @@ -53,6 +54,7 @@ lazy val root = project
entities,
healthK8s,
storageRedis,
storageRedisson,
serializationKryo,
grpcProtocol,
examples
Expand Down Expand Up @@ -126,6 +128,18 @@ lazy val storageRedis = project
)
)

lazy val storageRedisson = project
.in(file("storage-redisson"))
.settings(name := "shardcake-storage-redisson")
.settings(commonSettings)
.dependsOn(core)
.settings(
libraryDependencies ++=
Seq(
"org.redisson" % "redisson" % redissonVersion
)
)

lazy val serializationKryo = project
.in(file("serialization-kryo"))
.settings(name := "shardcake-serialization-kryo")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.devsisters.shardcake

/**
* The configuration for the Redis storage implementation.
* @param assignmentsKey the key to store shard assignments
* @param podsKey the key to store registered pods
*/
case class RedisConfig(assignmentsKey: String, podsKey: String)

object RedisConfig {
val default: RedisConfig = RedisConfig(assignmentsKey = "shard_assignments", podsKey = "pods")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.devsisters.shardcake

import scala.jdk.CollectionConverters._

import com.devsisters.shardcake.interfaces.Storage
import org.redisson.api.RedissonClient
import org.redisson.api.listener.MessageListener
import org.redisson.client.codec.StringCodec
import zio.stream.ZStream
import zio.{ Queue, Task, Unsafe, ZIO, ZLayer }

object StorageRedis {

/**
* A layer that returns a Storage implementation using Redis
*/
val live: ZLayer[RedissonClient with RedisConfig, Nothing, Storage] =
ZLayer {
for {
config <- ZIO.service[RedisConfig]
redisClient <- ZIO.service[RedissonClient]
assignmentsMap = redisClient.getMap[String, String](config.assignmentsKey)
podsMap = redisClient.getMap[String, String](config.podsKey)
assignmentsTopic = redisClient.getTopic(config.assignmentsKey, StringCodec.INSTANCE)
} yield new Storage {
def getAssignments: Task[Map[ShardId, Option[PodAddress]]] =
ZIO
.fromCompletionStage(assignmentsMap.readAllEntrySetAsync())
.map(
_.asScala
.flatMap(entry =>
entry.getKey.toIntOption.map(
_ -> (if (entry.getValue.isEmpty) None
else PodAddress(entry.getValue))
)
)
.toMap
)
def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] =
ZIO.fromCompletionStage(assignmentsMap.putAllAsync(assignments.map { case (k, v) =>
k.toString -> v.fold("")(_.toString)
}.asJava)) *>
ZIO.fromCompletionStage(assignmentsTopic.publishAsync("ping")).unit
def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] =
ZStream.unwrap {
for {
queue <- Queue.unbounded[String]
runtime <- ZIO.runtime[Any]
_ <- ZIO.fromCompletionStage(
assignmentsTopic.addListenerAsync(
classOf[String],
new MessageListener[String] {
def onMessage(channel: CharSequence, msg: String): Unit =
Unsafe.unsafe(implicit unsafe => runtime.unsafe.run(queue.offer(msg)))
}
)
)
} yield ZStream.fromQueueWithShutdown(queue).mapZIO(_ => getAssignments)
}
def getPods: Task[Map[PodAddress, Pod]] =
ZIO
.fromCompletionStage(podsMap.readAllEntrySetAsync())
.map(
_.asScala
.flatMap(entry => PodAddress(entry.getKey).map(address => address -> Pod(address, entry.getValue)))
.toMap
)
def savePods(pods: Map[PodAddress, Pod]): Task[Unit] =
ZIO.fromCompletionStage(podsMap.putAllAsync(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Storage
import com.dimafeng.testcontainers.GenericContainer
import org.redisson.Redisson
import org.redisson.config.{ Config => RedissonConfig }
import org.redisson.api.RedissonClient
import zio.Clock.ClockLive
import zio._
import zio.stream.ZStream
import zio.test.TestAspect.sequential
import zio.test._

object StorageRedisSpec extends ZIOSpecDefault {
val container: ZLayer[Any, Nothing, GenericContainer] =
ZLayer.scoped {
ZIO.acquireRelease {
ZIO.attemptBlocking {
val container = new GenericContainer(dockerImage = "redis:6.2.5", exposedPorts = Seq(6379))
container.start()
container
}.orDie
}(container => ZIO.attemptBlocking(container.stop()).orDie)
}

val redis: ZLayer[GenericContainer, Throwable, RedissonClient] =
ZLayer {
for {
container <- ZIO.service[GenericContainer]
uri = s"redis://foobared@${container.host}:${container.mappedPort(container.exposedPorts.head)}"
redissonConfig = new RedissonConfig()
_ = redissonConfig.useSingleServer().setAddress(uri)
client = Redisson.create(redissonConfig)
} yield client
}

def spec: Spec[TestEnvironment with Scope, Any] =
suite("StorageRedisSpec")(
test("save and get pods") {
val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0"))
.map(p => p.address -> p)
.toMap
for {
_ <- ZIO.serviceWithZIO[Storage](_.savePods(expected))
actual <- ZIO.serviceWithZIO[Storage](_.getPods)
} yield assertTrue(expected == actual)
},
test("save and get assignments") {
val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
for {
_ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
actual <- ZIO.serviceWithZIO[Storage](_.getAssignments)
} yield assertTrue(expected == actual)
},
test("assignments stream") {
val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
for {
p <- Promise.make[Nothing, Map[Int, Option[PodAddress]]]
_ <- ZStream.serviceWithStream[Storage](_.assignmentsStream).runForeach(p.succeed(_)).fork
_ <- ClockLive.sleep(1 second)
_ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
actual <- p.await
} yield assertTrue(expected == actual)
}
).provideLayerShared(
container >>> redis ++ ZLayer.succeed(RedisConfig.default) >>> StorageRedis.live
) @@ sequential
}
2 changes: 1 addition & 1 deletion vuepress/docs/docs/customization.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait Storage {

For testing, you can use the `Storage.memory` layer that keeps data in memory.

Shardcake provides an implementation of `Storage` using Redis. To use it, add the following dependency:
Shardcake provides an implementation of `Storage` using Redis with the Redis4cats library (there's also an alternative using Redisson). To use it, add the following dependency:
```scala
libraryDependencies += "com.devsisters" %% "shardcake-storage-redis" % "2.1.0"
```
Expand Down

0 comments on commit 1b76d54

Please sign in to comment.