remove usage of loadingCache in TopicCache in order to reduce allocation
t3hnar committed Dec 13, 2023
1 parent 1d406ce commit 9212f0a
Showing 2 changed files with 62 additions and 45 deletions.
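
A sketch of the shape of this change, not taken from the diff itself: the scache-backed Cache.loading, which created PartitionCache entries lazily on first access, is replaced by a plain immutable Map built once from the partitions the consumer reports, so lookups become ordinary Map access and an unknown partition becomes an explicit error. In the sketch below, Partition and PartitionCache are simplified stand-ins, PartitionCache.of drops its real parameters (maxSize, dropUponLimit, timeout), and a plain exception stands in for JournalError.

  import cats.effect.IO
  import cats.syntax.all._

  object EagerPartitionCachesSketch {

    // stand-ins for the real skafka Partition and kafka-journal PartitionCache
    final case class Partition(value: Int)

    trait PartitionCache {
      def get(id: String): IO[Option[String]]
    }

    object PartitionCache {
      // simplified stand-in for PartitionCache.of(maxSize, dropUponLimit, timeout)
      def of: IO[PartitionCache] =
        IO.pure(new PartitionCache {
          def get(id: String) = IO.pure(none[String])
        })
    }

    // before: Cache.loading managed entries on demand;
    // after: build one PartitionCache per known partition in parallel
    // and keep them in an ordinary Map
    def partitionCaches(partitions: List[Partition]): IO[Map[Partition, PartitionCache]] =
      partitions
        .parTraverse { partition =>
          PartitionCache.of.map { cache => (partition, cache) }
        }
        .map { _.toMap }

    // a partition the consumer never reported is now an error rather than
    // a lazily created entry (the diff raises JournalError in that case)
    def cacheOf(caches: Map[Partition, PartitionCache], partition: Partition): IO[PartitionCache] =
      caches
        .get(partition)
        .fold {
          IO.raiseError[PartitionCache](new IllegalArgumentException(s"invalid partition: $partition"))
        } { cache =>
          IO.pure(cache)
        }
  }

The trade-off is that every PartitionCache now exists from start-up, which fits the existing flow where the Resource already asked the consumer for all partitions before building the cache; the commit message attributes the win to removing the loading cache, presumably its per-entry and per-lookup allocations.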
TopicCache.scala
@@ -15,7 +15,6 @@ import com.evolutiongaming.retry.Retry.implicits._
import com.evolutiongaming.retry.{Sleep, Strategy}
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerRecords}
import com.evolutiongaming.skafka.{Offset, Partition, Topic, TopicPartition}
import com.evolution.scache.Cache

import scala.concurrent.duration._

@@ -94,20 +93,26 @@ object TopicCache {
partitions <- consumer
.use { _.partitions(topic) }
.toResource
cache <- Cache.loading[F, Partition, PartitionCache[F]](partitions = 1.some)
partitionCacheOf = (partition: Partition) => {
cache.getOrUpdateResource(partition) {
PartitionCache.of(
maxSize = config.partition.maxSize,
dropUponLimit = config.partition.dropUponLimit,
timeout = config.timeout)

caches <- partitions
.toList
.parTraverse { partition =>
PartitionCache
.of(
maxSize = config.partition.maxSize,
dropUponLimit = config.partition.dropUponLimit,
timeout = config.timeout)
.map { partitionCache =>
(partition, partitionCache)
}
}
}
remove = partitions
.foldMapM { partition =>

cachesMap = caches.toMap

remove = caches
.foldMapM { case (partition, cache) =>
for {
offset <- eventual.pointer(topic, partition)
cache <- partitionCacheOf(partition)
result <- offset.foldMapM { offset =>
cache
.remove(offset)
}
}
_ <- remove.toResource
pointers = {
cache
.values1
.flatMap { values =>
values
.toList
.traverseFilter { case (partition, cache) =>
cache
.toOption
.traverse { cache =>
cache
.offset
.flatMap {
case Some(offset) => offset.inc[F]
case None => Offset.min.pure[F]
}
.map { offset => (partition, offset) }
}
}
}
.map { _.toMap }
}
pointers = caches
.traverse { case (partition, cache) =>
cache
.offset
.flatMap {
case Some(offset) => offset.inc[F]
case None => Offset.min.pure[F]
}
.map { offset => (partition, offset) }
}
.map { _.toMap }

_ <- HeadCacheConsumption
.apply(
topic = topic,
@@ -174,9 +169,12 @@
}
}
.flatMap { records =>
partitionCacheOf
.apply(topicPartition.partition)
.flatMap { cache =>
val partition = topicPartition.partition
cachesMap
.get(partition)
.fold {
JournalError(s"invalid partition: $partition").raiseError[F, Sample]
} { cache =>
cache
.add(records)
.map {
@@ -235,7 +233,7 @@ object TopicCache {
_ <- metrics.foldMapM { metrics =>
val result = for {
_ <- Temporal[F].sleep(1.minute)
a <- cache.foldMap { case (_, value) => value.foldMapM { _.meters } }
a <- caches.foldMapM { case (_, value) => value.meters }
a <- metrics.meters(topic, entries = a.entries, listeners = a.listeners)
} yield a
result
new Main with TopicCache[F] {

def get(id: String, partition: Partition, offset: Offset) = {
partitionCacheOf(partition).flatMap { _.get(id, offset) }
cachesMap
.get(partition)
.fold {
JournalError(s"invalid partition: $partition").raiseError[F, PartitionCache.Result[F]]
} { cache =>
cache.get(id, offset)
}
}
}
}
@@ -566,7 +570,7 @@ object TopicCache {
}
}

private implicit class SetOps[A](val self: Set[A]) extends AnyVal {
private final implicit class SetOps[A](val self: Set[A]) extends AnyVal {

/** Aggregate all values in a set to something else using [[Monoid]].
*
@@ -595,4 +599,11 @@ object TopicCache {

}

private final implicit class MapOps[K, V](val self: Map[K, V]) extends AnyVal {
def foldMapM[F[_]: Monad, A: Monoid](f: (K, V) => F[A]): F[A] = {
self.foldLeft(Monoid[A].empty.pure[F]) { case (a, (k, v)) =>
a.flatMap { a => f(k, v).map { b => a.combine(b) } }
}
}
}
}
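
The MapOps.foldMapM helper added above folds an effectful, Monoid-combinable result over every entry of a Map, the same aggregate-with-a-Monoid pattern the SetOps helper documents for Set. A small self-contained usage sketch, with illustrative names and values only:

  import cats.{Monad, Monoid}
  import cats.effect.IO
  import cats.syntax.all._

  object MapFoldMapMSketch {

    // same shape as the MapOps helper introduced above
    implicit class MapOps[K, V](val self: Map[K, V]) extends AnyVal {
      def foldMapM[F[_]: Monad, A: Monoid](f: (K, V) => F[A]): F[A] =
        self.foldLeft(Monoid[A].empty.pure[F]) { case (a, (k, v)) =>
          a.flatMap { a => f(k, v).map { b => a.combine(b) } }
        }
    }

    // e.g. combine per-partition counts into a single effectful total
    val total: IO[Int] =
      Map("partition-0" -> 1, "partition-1" -> 2).foldMapM { (_, count) => IO.pure(count) }
  }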
HeadCacheSpec.scala
@@ -100,7 +100,7 @@ class HeadCacheSpec extends AsyncWordSpec with Matchers {

val eventual = HeadCache.Eventual.empty[IO]

val state = TestConsumer.State.empty
val state = TestConsumer.State(topics = Map((topic, List(partition))))

val key = Key(id = "id", topic = topic)
val result = for {
@@ -186,10 +186,9 @@
}

"timeout" in {
val consumer = TopicCache.Consumer.empty[IO]
val headCache = headCacheOf(
HeadCache.Eventual.empty,
consumer.pure[IO].toResource,
consumerEmpty.pure[IO].toResource,
config.copy(timeout = 10.millis))
val result = headCache.use { headCache =>
val key = Key(id = "id", topic = topic)
@@ -222,10 +221,9 @@
}

"not leak resources on release" in {
val consumer = TopicCache.Consumer.empty[IO]
val headCache = headCacheOf(
HeadCache.Eventual.empty,
consumer.pure[IO].toResource)
consumerEmpty.pure[IO].toResource)
val result = for {
a <- headCache.use { headCache =>
val key = Key(id = "id", topic = topic)
@@ -280,6 +278,14 @@ object HeadCacheSpec {
implicit val LogIO: Log[IO] = Log.empty[IO]


val consumerEmpty = new TopicCache.Consumer[IO] {
def assign(topic: Topic, partitions: Nes[Partition]) = IO.unit
def seek(topic: Topic, offsets: Nem[Partition, Offset]) = IO.unit
def poll = ConsumerRecords.empty[String, Unit].pure[IO]
def partitions(topic: Topic) = Set(partition).pure[IO]
}


def headCacheOf(
eventual: HeadCache.Eventual[IO],
consumer: Resource[IO, TopicCache.Consumer[IO]],
