Skip to content

Commit

Permalink
improve code around ConsumeActionRecords
Browse files Browse the repository at this point in the history
  • Loading branch information
t3hnar committed Oct 16, 2023
1 parent 7f9e01b commit c73aab9
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.evolutiongaming.kafka.journal

import cats.data.{NonEmptyList => Nel, NonEmptySet => Nes}
import cats.data.{NonEmptySet => Nes}
import cats.effect.Resource
import cats.syntax.all._
import cats.~>
import com.evolutiongaming.catshelper.{BracketThrowable, Log}
import com.evolutiongaming.catshelper.BracketThrowable
import com.evolutiongaming.kafka.journal.conversions.ConsRecordToActionRecord
import com.evolutiongaming.kafka.journal.util.StreamHelper._
import com.evolutiongaming.skafka.{Offset, Partition, TopicPartition}
Expand All @@ -18,53 +18,54 @@ trait ConsumeActionRecords[F[_]] {

object ConsumeActionRecords {

def apply[F[_] : BracketThrowable](
consumer: Resource[F, Journals.Consumer[F]],
log: Log[F])(implicit
def apply[F[_]: BracketThrowable](
consumer: Resource[F, Journals.Consumer[F]])(implicit
consRecordToActionRecord: ConsRecordToActionRecord[F]
): ConsumeActionRecords[F] = {
(key: Key, partition: Partition, from: Offset) => {

val topicPartition = TopicPartition(topic = key.topic, partition = partition)

def seek(consumer: Journals.Consumer[F]) = {
for {
_ <- consumer.assign(Nes.of(topicPartition))
_ <- consumer.seek(topicPartition, from)
_ <- log.debug(s"$key consuming from $partition:$from")
} yield {}
}

def filter(records: List[Nel[ConsRecord]]) = {
class Main
new Main with ConsumeActionRecords[F] {
def apply(key: Key, partition: Partition, from: Offset) = {
for {
records <- records
record <- records.toList if record.key.exists { _.value === key.id }
consumer <- consumer.toStream
_ <- Stream.lift {
val topicPartition = TopicPartition(topic = key.topic, partition = partition)
for {
_ <- consumer.assign(Nes.of(topicPartition))
a <- consumer.seek(topicPartition, from)
} yield a
}
records <- Stream.repeat {
for {
records <- consumer.poll
actions <- records
.values
.values
.flatMap { records =>
records
.toList
.filter { record =>
record.key.exists { _.value === key.id }
}
}
.toList
.traverseFilter { record => consRecordToActionRecord(record) }
} yield actions
}
record <- records.toStream1[F]
} yield record
}

def poll(consumer: Journals.Consumer[F]) = {
for {
records0 <- consumer.poll
records = filter(records0.values.values.toList)
actions <- records.traverseFilter { a => consRecordToActionRecord(a).value }
} yield actions
}

for {
consumer <- consumer.toStream
_ <- seek(consumer).toStream
records <- Stream.repeat(poll(consumer))
record <- records.toStream1[F]
} yield record
}
}

private sealed abstract class MapK

implicit class ConsumeActionRecordsOps[F[_]](val self: ConsumeActionRecords[F]) extends AnyVal {

def mapK[G[_]](fg: F ~> G, gf: G ~> F): ConsumeActionRecords[G] = {
(key: Key, partition: Partition, from1: Offset) => {
self(key, partition, from1).mapK[G](fg, gf)
new MapK with ConsumeActionRecords[G] {
def apply(key: Key, partition: Partition, from: Offset) = {
self(key, partition, from).mapK[G](fg, gf)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ object Journals {

apply[F](
eventual = eventualJournal,
consumeActionRecords = ConsumeActionRecords[F](consumer, log),
consumeActionRecords = ConsumeActionRecords[F](consumer),
produce = Produce[F](producer, origin),
headCache = headCache,
log = log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,33 @@ object StreamActionRecords {
.max(offsetReplicated)
.fold(Offset.min.pure[F]) { _.inc[F] }
.toStream
result <- consumeActionRecords(key, partition, fromOffset).stateless { record =>
result <- consumeActionRecords
.apply(key, partition, fromOffset)
.stateless { record =>

def take(action: Action.User) = {
(true, Stream[F].single(record.copy(action = action)))
}
def take(action: Action.User) = {
(true, Stream[F].single(record.copy(action = action)))
}

def skip = {
(true, Stream[F].empty[ActionRecord[Action.User]])
}
def skip = {
(true, Stream[F].empty[ActionRecord[Action.User]])
}

def stop = {
(false, Stream[F].empty[ActionRecord[Action.User]])
}
def stop = {
(false, Stream[F].empty[ActionRecord[Action.User]])
}

if (record.offset > max) {
stop
} else {
record.action match {
case a: Action.Append => if (a.range.to < from) skip else take(a)
case a: Action.Mark => if (a.id === marker.id) stop else skip
case a: Action.Delete => take(a)
case a: Action.Purge => take(a)
if (record.offset > max) {
stop
} else {
record.action match {
case a: Action.Append => if (a.range.to < from) skip else take(a)
case a: Action.Mark => if (a.id === marker.id) stop else skip
case a: Action.Delete => take(a)
case a: Action.Purge => take(a)
}
}
}
}

} yield result
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.evolutiongaming.kafka.journal.util.CatsHelper._

trait ConsRecordToActionRecord[F[_]] {

def apply(consRecord: ConsRecord): OptionT[F, ActionRecord[Action]]
def apply(consRecord: ConsRecord): F[Option[ActionRecord[Action]]]
}


Expand Down Expand Up @@ -67,7 +67,6 @@ object ConsRecordToActionRecord {
.adaptError { case e =>
JournalError(s"ConsRecordToActionRecord failed for $consRecord: $e", e)
}
.toOptionT
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.evolutiongaming.kafka.journal.util

import cats.Monad
import cats.effect._
import cats.effect.{Concurrent, Fiber, Poll}
import cats.effect.kernel.{Async, Outcome, Unique}
import cats.syntax.all._

import scala.annotation.nowarn

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object ReplicateRecords {
for {
records <- records
.toList
.traverseFilter { a => consRecordToActionRecord(a).value }
.traverseFilter { record => consRecordToActionRecord(record) }
result <- records
.toNel
.foldMapM { records => apply(records) }
Expand Down

0 comments on commit c73aab9

Please sign in to comment.