use Kafka topic for pointers
Denys Fakhritdinov committed Nov 1, 2023
1 parent f7fd0c7 commit d444f84
Showing 7 changed files with 179 additions and 21 deletions.
@@ -2,19 +2,20 @@ package com.evolutiongaming.kafka.journal

import cats._
import cats.effect._
import cats.effect.syntax.all._
import cats.effect.kernel.Async
import cats.effect.syntax.all._
import cats.syntax.all._
import com.evolution.scache.{Cache, ExpiringCache}
import com.evolutiongaming.catshelper._
import com.evolutiongaming.kafka.journal.PartitionCache.Result
import com.evolutiongaming.kafka.journal.conversions.ConsRecordToActionHeader
import com.evolutiongaming.kafka.journal.eventual.{EventualJournal, TopicPointers}
import com.evolution.scache.{Cache, ExpiringCache}
import com.evolutiongaming.skafka.consumer.ConsumerConfig
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig}
import com.evolutiongaming.skafka.{Offset, Partition, Topic}
import com.evolutiongaming.smetrics.MetricsHelper._
import com.evolutiongaming.smetrics._

import java.nio.ByteBuffer
import scala.concurrent.duration._

/** Metainfo of events written to Kafka, but not yet replicated to Cassandra.
@@ -107,13 +108,38 @@ object HeadCache {
eventualJournal: EventualJournal[F],
metrics: Option[HeadCacheMetrics[F]]
): Resource[F, HeadCache[F]] = {

import com.evolutiongaming.skafka.FromBytes

implicit val partitionFromBytes: FromBytes[F, Partition] = (bytes, _) =>
for {
int <- Sync[F].delay { ByteBuffer.wrap(bytes).getInt() }
partition <- Partition.of(int)
} yield partition

implicit val offsetFromBytes: FromBytes[F, Offset] = (bytes, _) =>
for {
long <- Sync[F].delay { ByteBuffer.wrap(bytes).getLong() }
offset <- Offset.of(long)
} yield offset

for {
log <- LogOf[F].apply(HeadCache.getClass).toResource
result <- HeadCache.of(
Eventual(eventualJournal),
log,
TopicCache.Consumer.of[F](consumerConfig),
metrics)
metrics,
pointer = KafkaConsumerOf[F].apply[Partition, Offset](
consumerConfig.copy(
groupId = none,
autoCommit = true,
autoCommitInterval = 5.seconds.some,
autoOffsetReset = AutoOffsetReset.Latest,
fetchMinBytes = 1,
)
)
)
result <- result.withFence
} yield {
result.withLog(log)
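
Note: the `FromBytes` instances above decode a partition as a 4-byte `Int` and an offset as an 8-byte `Long` in big-endian order (the `ByteBuffer` default), mirroring the `ToBytes` instances added to `TopicCommit` later in this commit. A minimal round-trip sketch in plain Scala:

```scala
import java.nio.ByteBuffer

// Partition: 4-byte big-endian Int (ByteBuffer's default byte order).
val partitionBytes = ByteBuffer.allocate(4).putInt(42).array()
assert(ByteBuffer.wrap(partitionBytes).getInt() == 42)

// Offset: 8-byte big-endian Long.
val offsetBytes = ByteBuffer.allocate(8).putLong(123456789L).array()
assert(ByteBuffer.wrap(offsetBytes).getLong() == 123456789L)
```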
@@ -132,7 +158,7 @@ object HeadCache {
* debug logging will be affected by this. One needs to call
* [[HeadCache#withLog]] if debug logging for [[HeadCache]] is required.
* @param consumer
* Kakfa data source factory. The reason why it is factory (i.e.
* Kafka data source factory. The reason why it is factory (i.e.
* `Resource`) is that [[HeadCache]] will try to recreate consumer in case
* of the failure.
* @param metrics
@@ -153,7 +179,8 @@
log: Log[F],
consumer: Resource[F, TopicCache.Consumer[F]],
metrics: Option[HeadCacheMetrics[F]],
config: HeadCacheConfig = HeadCacheConfig.default
config: HeadCacheConfig = HeadCacheConfig.default,
pointer: Resource[F, KafkaConsumer[F, Partition, Offset]]
): Resource[F, HeadCache[F]] = {

val consRecordToActionHeader = ConsRecordToActionHeader[F]
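
Note: as the docstring above explains, `consumer` is passed as a factory (a `Resource`) so a failed consumer can be recreated. A minimal sketch of that shape, assuming cats-effect 3 (`use` stands in for the cache's consume loop; the real code also applies a retry strategy with backoff):

```scala
import cats.effect.{IO, Resource}

// Acquire a fresh consumer per attempt; on failure, release it and start over.
def consumeWithRecreate[A](consumer: Resource[IO, A])(use: A => IO[Unit]): IO[Unit] =
  consumer
    .use(use)
    .handleErrorWith { _ => consumeWithRecreate(consumer)(use) }
```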
@@ -179,7 +206,9 @@
consumer,
config,
consRecordToActionHeader,
metrics.map { _.headCache })
metrics.map { _.headCache },
pointer
)
.map { cache =>
metrics
.fold(cache) { metrics => cache.withMetrics(topic, metrics.headCache) }
@@ -1,21 +1,21 @@
package com.evolutiongaming.kafka.journal

import cats._
import cats.data.{NonEmptyMap => Nem, NonEmptySet => Nes}
import cats.data.{NonEmptyList => Nel, NonEmptyMap => Nem, NonEmptySet => Nes}
import cats.effect._
import cats.effect.syntax.all._
import cats.syntax.all._
import com.evolutiongaming.catshelper._
import com.evolution.scache.Cache
import com.evolutiongaming.catshelper.ParallelHelper._
import com.evolutiongaming.catshelper._
import com.evolutiongaming.kafka.journal.HeadCache.Eventual
import com.evolutiongaming.kafka.journal.conversions.ConsRecordToActionHeader
import com.evolutiongaming.kafka.journal.util.SkafkaHelper._
import com.evolutiongaming.kafka.journal.HeadCache.Eventual
import com.evolutiongaming.random.Random
import com.evolutiongaming.retry.Retry.implicits._
import com.evolutiongaming.retry.{Sleep, Strategy}
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerRecords}
import com.evolutiongaming.skafka.{Offset, Partition, Topic, TopicPartition}
import com.evolution.scache.Cache

import scala.concurrent.duration._

@@ -84,9 +84,9 @@ object TopicCache {
consumer: Resource[F, Consumer[F]],
config: HeadCacheConfig,
consRecordToActionHeader: ConsRecordToActionHeader[F],
metrics: Option[HeadCache.Metrics[F]]
metrics: Option[HeadCache.Metrics[F]],
pointer: Resource[F, KafkaConsumer[F, Partition, Offset]],
): Resource[F, TopicCache[F]] = {

for {
consumer <- consumer
.map { _.withLog(log) }
@@ -125,6 +125,50 @@ object TopicCache {
}
}
_ <- remove.toResource

pointer <- pointer
subscribePointer = for {
partitions <- pointer.partitions(pointerTopic)
partitions <- Nel
.fromList(partitions.toList.map { partition => TopicPartition(pointerTopic, partition) })
.map(_.toNes)
.liftTo[F](new RuntimeException(s"no partitions in topic $pointerTopic"))
_ <- pointer.assign(partitions)
} yield {}
_ <- subscribePointer.toResource.whenA(topic == rbowTopic)
pollPointers = pointer
.poll(1.second)
.flatMap { records =>
val offsets = for {
case (_, records) <- records.values.toList
record <- records.toList
partition <- record.key
offset <- record.value
} yield partition.value -> offset.value
offsets
.groupBy { case (partition, _) => partition }
.map { case (partition, offsets) => partition -> offsets.map(_._2).maxBy(_.value) }
.toList
.foldMapM { case (partition, offset) =>
for {
cache <- partitionCacheOf(partition)
result <- cache
.remove(offset)
.map { diff =>
diff.foldMap { a => Sample(a.value) }
}
} yield result
}
}
.flatMap { sample =>
sample
.avg
.foldMapM { diff =>
metrics.foldMapM { _.storage(topic, diff) }
}
.as(sample.count)
}

pointers = {
cache
.values1
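
Note: `pollPointers` above decodes `(partition, offset)` pairs from the pointer topic and keeps only the highest offset seen per partition before pruning the partition caches. The aggregation step in isolation, with `Int`/`Long` standing in for `Partition`/`Offset`:

```scala
val decoded: List[(Int, Long)] = List(0 -> 10L, 1 -> 5L, 0 -> 12L, 1 -> 4L)

// Group by partition and keep the maximum offset per partition.
val latest: Map[Int, Long] =
  decoded
    .groupBy { case (partition, _) => partition }
    .map { case (partition, offsets) => partition -> offsets.map(_._2).max }

assert(latest == Map(0 -> 12L, 1 -> 5L))
```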
@@ -225,13 +269,24 @@ object TopicCache {
.exponential(10.millis)
.cap(3.seconds)
.jitter(random)
_ <- pollPointers
.flatMap {
case 0 => Sleep[F].sleep(config.removeInterval)
case _ => Applicative[F].unit
}
.retry(strategy) // probably not needed here
.handleErrorWith { a => log.error(s"remove poll failed, error: $a", a) }
.foreverM[Unit]
.background
.whenA(topic == rbowTopic)
_ <- Sleep[F]
.sleep(config.removeInterval)
.productR { remove }
.retry(strategy)
.handleErrorWith { a => log.error(s"remove failed, error: $a", a) }
.foreverM[Unit]
.background
.whenA(topic != rbowTopic)
_ <- metrics.foldMapM { metrics =>
val result = for {
_ <- Temporal[F].sleep(1.minute)
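Note: the hunk above selects one of two background loops per topic: for `rbowTopic` it polls the pointer topic and sleeps only when a poll removed nothing, while every other topic keeps the original sleep-then-remove loop. Both share the same fiber shape, sketched here with cats-effect 3:

```scala
import cats.effect.{IO, Resource}
import scala.concurrent.duration._

val tick: IO[Unit] = IO.sleep(1.second) *> IO.println("tick")

// Loop forever on a background fiber; the fiber is cancelled when the
// enclosing Resource scope closes, as with `.foreverM[Unit].background` above.
val loop: Resource[IO, Unit] = tick.foreverM.background.void
```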
@@ -16,4 +16,7 @@ package object journal {
type ConsRecord = ConsumerRecord[String, ByteVector]

type ConsRecords = ConsumerRecords[String, ByteVector]

val rbowTopic = "rbow-journal-4.PlayerBetting"
val pointerTopic = "kafka-journal-offset-temp"
}
@@ -293,7 +293,9 @@ object HeadCacheSpec {
config = config,
eventual = eventual,
consumer = consumer,
metrics = metrics.some)
metrics = metrics.some,
pointer = ???
)
} yield headCache
}

@@ -22,6 +22,7 @@ import com.evolutiongaming.skafka.consumer.{ConsumerConfig, ConsumerMetrics}
import com.evolutiongaming.skafka.{ClientId, Topic, Bytes => _}
import com.evolutiongaming.smetrics.CollectorRegistry
import com.evolution.scache.CacheMetrics
import com.evolutiongaming.skafka.producer.{Acks, ProducerConfig}
import scodec.bits.ByteVector

import scala.concurrent.duration._
@@ -59,7 +60,7 @@ object Replicator {

def of[
F[_]
: Temporal : Parallel
: Async : Parallel
: Runtime : FromTry : ToTry : Fail : LogOf
: KafkaConsumerOf : MeasureDuration
: JsonCodec
@@ -72,11 +73,27 @@

val topicReplicator: Topic => Resource[F, F[Outcome[F, Throwable, Unit]]] =
(topic: Topic) => {
val pointerConfig = topic match {
case theTopic if theTopic == rbowTopic =>
TopicReplicator.ConsumerOf.PointerConfig(
topic = pointerTopic,
config = ProducerConfig(
common = config.kafka.consumer.common,
batchSize = 0,
acks = Acks.One,
retries = 0,
)
).some
case _ => none
}

val consumer = TopicReplicator.ConsumerOf.of[F](
topic,
config.kafka.consumer,
config.pollTimeout,
hostName)
hostName,
pointerConfig
)

val metrics1 = metrics
.flatMap { _.replicator }
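Note: the `ProducerConfig` above disables batching and retries and asks for leader-only acknowledgment, so pointer records are effectively fire-and-forget. Assuming skafka forwards these fields to the standard Kafka producer settings, the equivalent raw properties would be:

```scala
import java.util.Properties

val props = new Properties()
props.put("batch.size", "0") // no client-side batching; send each record immediately
props.put("acks", "1")       // wait for the partition leader only
props.put("retries", "0")    // no retry; a failed pointer write is dropped
```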
@@ -4,14 +4,17 @@ import java.time.Instant
import cats.Applicative
import cats.data.{NonEmptyMap => Nem}
import cats.effect.kernel.Concurrent
import cats.effect.{Clock, Ref}
import cats.effect.{Clock, Ref, Sync}
import cats.syntax.all._
import com.evolutiongaming.catshelper.ClockHelper._
import com.evolutiongaming.catshelper.DataHelper._
import com.evolutiongaming.kafka.journal.util.TemporalHelper._
import com.evolutiongaming.kafka.journal.KafkaConsumer
import com.evolutiongaming.kafka.journal.{KafkaConsumer, KafkaProducer}
import com.evolutiongaming.skafka._
import com.evolutiongaming.skafka.producer.ProducerRecord
import com.evolutiongaming.skafka.ToBytes

import java.nio.ByteBuffer
import scala.collection.immutable.SortedMap
import scala.concurrent.duration._

@@ -77,4 +80,39 @@ object TopicCommit {
}
}
}
}

def pointer[F[_]: Sync](
topic: Topic,
producer: KafkaProducer[F],
commit: TopicCommit[F]
): TopicCommit[F] =
new TopicCommit[F] {

implicit val partitionToBytes: ToBytes[F, Partition] =
(partition: Partition, _) => Sync[F].delay { ByteBuffer.allocate(4).putInt(partition.value).array() }

implicit val offsetToBytes: ToBytes[F, Offset] =
(offset: Offset, _) => Sync[F].delay { ByteBuffer.allocate(8).putLong(offset.value).array() }

override def apply(offsets: Nem[Partition, Offset]): F[Unit] = {

val commitPointers = offsets.toNel.toList.traverse {
case (partition, offset) =>
val record = new ProducerRecord[Partition, Offset](
topic = topic,
partition = partition.some, // set the partition explicitly so pointer records mirror the journal topic's partitioning
key = partition.some,
value = offset.some,
)
producer.send(record)
}

val commitOffsets = commit(offsets)

for {
_ <- commitPointers
_ <- commitOffsets
} yield {}
}
}
}
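
Note: `TopicCommit.pointer` above decorates an existing commit: it publishes each `(partition, offset)` pair to the pointer topic first, then delegates to the wrapped commit. The shape in miniature, in plain Scala with a function standing in for the producer:

```scala
trait Commit { def apply(offsets: Map[Int, Long]): Unit }

// Publish pointers first, then perform the original commit.
def withPointers(publish: (Int, Long) => Unit, underlying: Commit): Commit =
  offsets => {
    offsets.foreach { case (partition, offset) => publish(partition, offset) }
    underlying(offsets)
  }
```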
@@ -15,6 +15,7 @@ import com.evolutiongaming.kafka.journal.util.SkafkaHelper._
import com.evolutiongaming.retry.Sleep
import com.evolutiongaming.skafka.{Metadata, Offset, Partition, Topic}
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig}
import com.evolutiongaming.skafka.producer.ProducerConfig
import scodec.bits.ByteVector

import java.time.Instant
@@ -231,11 +232,17 @@ object TopicReplicator {

object ConsumerOf {

def of[F[_]: Concurrent : KafkaConsumerOf : FromTry : Clock](
final case class PointerConfig(
topic: Topic,
config: ProducerConfig
)

def of[F[_]: Async : KafkaConsumerOf : FromTry : ToTry : Clock](
topic: Topic,
config: ConsumerConfig,
pollTimeout: FiniteDuration,
hostName: Option[HostName]
hostName: Option[HostName],
pointerConfig: Option[PointerConfig] = None,
): Resource[F, TopicConsumer[F]] = {

val groupId = {
Expand All @@ -261,6 +268,13 @@ object TopicReplicator {
metadata = hostName.fold { Metadata.empty } { _.value }
commit = TopicCommit(topic, metadata, consumer)
commit <- TopicCommit.delayed(5.seconds, commit).toResource
commit <- pointerConfig match {
case None => commit.pure[Resource[F, *]]
case Some(PointerConfig(topic, config)) =>
for {
producer <- KafkaProducerOf[F](none).apply(config)
} yield TopicCommit.pointer[F](topic, producer, commit)
}
} yield {
TopicConsumer(topic, pollTimeout, commit, consumer)
}
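Note: `ConsumerOf.of` above opts into the pointer commit only when a `PointerConfig` is given, acquiring the producer in the same `Resource` scope as the consumer. The pattern in isolation, assuming cats-effect 3 (`Producer` and `decorate` are illustrative stand-ins):

```scala
import cats.effect.{IO, Resource}

trait Commit
final case class Producer(topic: String)

// Stub for the real decoration, which wraps the commit with pointer publishing.
def decorate(producer: Producer, underlying: Commit): Commit = underlying

def commitFor(base: Commit, pointer: Option[Resource[IO, Producer]]): Resource[IO, Commit] =
  pointer match {
    case None    => Resource.pure[IO, Commit](base)
    case Some(p) => p.map { producer => decorate(producer, base) }
  }
```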
