From c73aab9f5bd9a4c31b90ecb1974818079d7a6e61 Mon Sep 17 00:00:00 2001 From: Yaroslav Klymko Date: Sat, 14 Oct 2023 23:17:37 +0200 Subject: [PATCH] improve code around ConsumeActionRecords --- .../kafka/journal/ConsumeActionRecords.scala | 75 ++++++++++--------- .../kafka/journal/Journals.scala | 2 +- .../kafka/journal/StreamActionRecords.scala | 41 +++++----- .../ConsRecordToActionRecord.scala | 3 +- .../kafka/journal/util/ConcurrentOf.scala | 4 +- .../journal/replicator/ReplicateRecords.scala | 2 +- 6 files changed, 65 insertions(+), 62 deletions(-) diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumeActionRecords.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumeActionRecords.scala index 985568190..ba4ef30a6 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumeActionRecords.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumeActionRecords.scala @@ -1,10 +1,10 @@ package com.evolutiongaming.kafka.journal -import cats.data.{NonEmptyList => Nel, NonEmptySet => Nes} +import cats.data.{NonEmptySet => Nes} import cats.effect.Resource import cats.syntax.all._ import cats.~> -import com.evolutiongaming.catshelper.{BracketThrowable, Log} +import com.evolutiongaming.catshelper.BracketThrowable import com.evolutiongaming.kafka.journal.conversions.ConsRecordToActionRecord import com.evolutiongaming.kafka.journal.util.StreamHelper._ import com.evolutiongaming.skafka.{Offset, Partition, TopicPartition} @@ -18,53 +18,54 @@ trait ConsumeActionRecords[F[_]] { object ConsumeActionRecords { - def apply[F[_] : BracketThrowable]( - consumer: Resource[F, Journals.Consumer[F]], - log: Log[F])(implicit + def apply[F[_]: BracketThrowable]( + consumer: Resource[F, Journals.Consumer[F]])(implicit consRecordToActionRecord: ConsRecordToActionRecord[F] ): ConsumeActionRecords[F] = { - (key: Key, partition: Partition, from: Offset) => { - - val topicPartition = TopicPartition(topic = key.topic, partition = partition) - - def seek(consumer: Journals.Consumer[F]) = { - for { - _ <- consumer.assign(Nes.of(topicPartition)) - _ <- consumer.seek(topicPartition, from) - _ <- log.debug(s"$key consuming from $partition:$from") - } yield {} - } - - def filter(records: List[Nel[ConsRecord]]) = { + class Main + new Main with ConsumeActionRecords[F] { + def apply(key: Key, partition: Partition, from: Offset) = { for { - records <- records - record <- records.toList if record.key.exists { _.value === key.id } + consumer <- consumer.toStream + _ <- Stream.lift { + val topicPartition = TopicPartition(topic = key.topic, partition = partition) + for { + _ <- consumer.assign(Nes.of(topicPartition)) + a <- consumer.seek(topicPartition, from) + } yield a + } + records <- Stream.repeat { + for { + records <- consumer.poll + actions <- records + .values + .values + .flatMap { records => + records + .toList + .filter { record => + record.key.exists { _.value === key.id } + } + } + .toList + .traverseFilter { record => consRecordToActionRecord(record) } + } yield actions + } + record <- records.toStream1[F] } yield record } - - def poll(consumer: Journals.Consumer[F]) = { - for { - records0 <- consumer.poll - records = filter(records0.values.values.toList) - actions <- records.traverseFilter { a => consRecordToActionRecord(a).value } - } yield actions - } - - for { - consumer <- consumer.toStream - _ <- seek(consumer).toStream - records <- Stream.repeat(poll(consumer)) - record <- records.toStream1[F] - } yield record } } + private sealed abstract class MapK implicit class ConsumeActionRecordsOps[F[_]](val self: ConsumeActionRecords[F]) extends AnyVal { def mapK[G[_]](fg: F ~> G, gf: G ~> F): ConsumeActionRecords[G] = { - (key: Key, partition: Partition, from1: Offset) => { - self(key, partition, from1).mapK[G](fg, gf) + new MapK with ConsumeActionRecords[G] { + def apply(key: Key, partition: Partition, from: Offset) = { + self(key, partition, from).mapK[G](fg, gf) + } } } } diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journals.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journals.scala index 46ef7eba9..4bf97c149 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journals.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journals.scala @@ -148,7 +148,7 @@ object Journals { apply[F]( eventual = eventualJournal, - consumeActionRecords = ConsumeActionRecords[F](consumer, log), + consumeActionRecords = ConsumeActionRecords[F](consumer), produce = Produce[F](producer, origin), headCache = headCache, log = log, diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/StreamActionRecords.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/StreamActionRecords.scala index d6553f0c5..65fb471e0 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/StreamActionRecords.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/StreamActionRecords.scala @@ -46,32 +46,33 @@ object StreamActionRecords { .max(offsetReplicated) .fold(Offset.min.pure[F]) { _.inc[F] } .toStream - result <- consumeActionRecords(key, partition, fromOffset).stateless { record => + result <- consumeActionRecords + .apply(key, partition, fromOffset) + .stateless { record => - def take(action: Action.User) = { - (true, Stream[F].single(record.copy(action = action))) - } + def take(action: Action.User) = { + (true, Stream[F].single(record.copy(action = action))) + } - def skip = { - (true, Stream[F].empty[ActionRecord[Action.User]]) - } + def skip = { + (true, Stream[F].empty[ActionRecord[Action.User]]) + } - def stop = { - (false, Stream[F].empty[ActionRecord[Action.User]]) - } + def stop = { + (false, Stream[F].empty[ActionRecord[Action.User]]) + } - if (record.offset > max) { - stop - } else { - record.action match { - case a: Action.Append => if (a.range.to < from) skip else take(a) - case a: Action.Mark => if (a.id === marker.id) stop else skip - case a: Action.Delete => take(a) - case a: Action.Purge => take(a) + if (record.offset > max) { + stop + } else { + record.action match { + case a: Action.Append => if (a.range.to < from) skip else take(a) + case a: Action.Mark => if (a.id === marker.id) stop else skip + case a: Action.Delete => take(a) + case a: Action.Purge => take(a) + } } } - } - } yield result } } diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/conversions/ConsRecordToActionRecord.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/conversions/ConsRecordToActionRecord.scala index ef0d4dd15..4fe288fac 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/conversions/ConsRecordToActionRecord.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/conversions/ConsRecordToActionRecord.scala @@ -10,7 +10,7 @@ import com.evolutiongaming.kafka.journal.util.CatsHelper._ trait ConsRecordToActionRecord[F[_]] { - def apply(consRecord: ConsRecord): OptionT[F, ActionRecord[Action]] + def apply(consRecord: ConsRecord): F[Option[ActionRecord[Action]]] } @@ -67,7 +67,6 @@ object ConsRecordToActionRecord { .adaptError { case e => JournalError(s"ConsRecordToActionRecord failed for $consRecord: $e", e) } - .toOptionT } } } diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/util/ConcurrentOf.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/util/ConcurrentOf.scala index fb787c1f3..498eb4b85 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/util/ConcurrentOf.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/util/ConcurrentOf.scala @@ -1,7 +1,9 @@ package com.evolutiongaming.kafka.journal.util import cats.Monad -import cats.effect._ +import cats.effect.{Concurrent, Fiber, Poll} +import cats.effect.kernel.{Async, Outcome, Unique} +import cats.syntax.all._ import scala.annotation.nowarn diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala index d0083fb15..823b6d142 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala @@ -157,7 +157,7 @@ object ReplicateRecords { for { records <- records .toList - .traverseFilter { a => consRecordToActionRecord(a).value } + .traverseFilter { record => consRecordToActionRecord(record) } result <- records .toNel .foldMapM { records => apply(records) }