diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7085fa16..83098287 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,7 +5,7 @@ object Dependencies { val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2" val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4" val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6" - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "2.11.0" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "2.15.3-SNAPSHOT" // local release with RandomId val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17" val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0" val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala new file mode 100644 index 00000000..a025b5e8 --- /dev/null +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/KafkaHealthCheck.scala @@ -0,0 +1,251 @@ +package com.evolutiongaming.skafka + +import cats.effect._ +import cats.effect.concurrent.Ref +import cats.data.{NonEmptySet => Nes} +import cats.effect.syntax.all._ +import cats.syntax.all._ +import cats.{Applicative, Functor, Monad} +import com.evolutiongaming.catshelper.{FromTry, Log, LogOf, RandomIdOf} +import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerOf, Consumer => SKafkaConsumer} +import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerRecord, ProducerOf, Producer => SKafkaProducer} + +import scala.concurrent.CancellationException +import scala.concurrent.duration._ + +/** + * Provides a health check mechanism that repeatedly sends and consumes messages to/from Kafka. + */ +trait KafkaHealthCheck[F[_]] { + + /** + * Returns the last error that occurred during the health check. + */ + def error: F[Option[Throwable]] + + /** + * Blocks a fiber until the health check is done. + */ + def done: F[Unit] +} + +object KafkaHealthCheck { + + def empty[F[_]: Applicative]: KafkaHealthCheck[F] = new KafkaHealthCheck[F] { + + def error = none[Throwable].pure[F] + + def done = ().pure[F] + } + + def of[F[_]: Concurrent: Timer: LogOf: ConsumerOf: ProducerOf: RandomIdOf: FromTry]( + config: Config, + consumerConfig: ConsumerConfig, + producerConfig: ProducerConfig + ): Resource[F, KafkaHealthCheck[F]] = { + + val result = for { + log <- LogOf[F].apply(KafkaHealthCheck.getClass) + randomId <- RandomIdOf[F].apply + } yield { + val key = randomId.value + + val consumer = Consumer.of[F](key, consumerConfig) + + val producer = Producer.of[F](config.topic, producerConfig) + + of(key = key, config = config, stop = false.pure[F], producer = producer, consumer = consumer, log = log) + } + + Resource + .eval(result) + .flatten + } + + def of[F[_]: Concurrent: Timer]( + key: String, + config: Config, + stop: F[Boolean], + producer: Resource[F, Producer[F]], + consumer: Resource[F, Consumer[F]], + log: Log[F] + ): Resource[F, KafkaHealthCheck[F]] = { + + val result = for { + ref <- Ref.of[F, Option[Throwable]](None) + fiber <- (producer, consumer) + .tupled + .use { case (producer, consumer) => run(key, config, stop, producer, consumer, ref.set, log) } + .start + } yield { + val result = new KafkaHealthCheck[F] { + def error = ref.get + def done = fiber.join.guaranteeCase { + case ExitCase.Completed => Concurrent[F].unit + case ExitCase.Error(e) => Concurrent[F].raiseError(e) + case ExitCase.Canceled => Concurrent[F].raiseError(new CancellationException("HealthCheck cancelled")) + } + } + (result, fiber.cancel) + } + + Resource(result) + } + + def run[F[_]: Concurrent: Timer]( + key: String, + config: Config, + stop: F[Boolean], + producer: Producer[F], + consumer: Consumer[F], + set: Option[Throwable] => F[Unit], + log: Log[F] + ): F[Unit] = { + + val sleep = Timer[F].sleep(config.interval) + + def produce(value: String) = { + val record = Record(key = key.some, value = value.some) + for { + _ <- log.debug(s"$key send $value") + _ <- producer.send(record) + } yield {} + } + + def produceConsume(n: Long) = { + val value = n.toString + + def consume(retry: Long) = { + for { + records <- consumer.poll(config.pollTimeout) + found = records.find { record => record.key.contains_(key) && record.value.contains_(value) } + result <- found.fold { + for { + _ <- sleep + _ <- produce(s"$n:$retry") + } yield { + (retry + 1).asLeft[Unit] + } + } { _ => + ().asRight[Long].pure[F] + } + } yield result + } + + val produceConsume = for { + _ <- produce(value) + _ <- 0L.tailRecM(consume) + } yield {} + + produceConsume + .timeout(config.timeout) + .redeem(_.some, _ => none[Throwable]) + } + + def check(n: Long) = { + for { + error <- produceConsume(n) + _ <- error.fold(().pure[F]) { error => log.error(s"$n failed with $error") } + _ <- set(error) + _ <- sleep + stop <- stop + } yield { + if (stop) ().asRight[Long] + else (n + 1).asLeft[Unit] + } + } + + for { + _ <- Timer[F].sleep(config.initial) + _ <- consumer.subscribe(config.topic) + _ <- consumer.poll(config.interval) + _ <- produceConsume(0L) // warmup + _ <- 1L.tailRecM(check) + } yield {} + } + + trait Producer[F[_]] { + def send(record: Record): F[Unit] + } + + object Producer { + + def apply[F[_]](implicit F: Producer[F]): Producer[F] = F + + def apply[F[_]: Monad: FromTry](topic: Topic, producer: SKafkaProducer[F]): Producer[F] = { + new Producer[F] { + def send(record: Record) = { + val record1 = ProducerRecord[String, String](topic = topic, key = record.key, value = record.value) + producer.send(record1).void + } + } + } + + def of[F[_]: Monad: ProducerOf: FromTry](topic: Topic, config: ProducerConfig): Resource[F, Producer[F]] = { + for { + producer <- implicitly[ProducerOf[F]].apply(config) + } yield { + Producer[F](topic = topic, producer = producer) + } + } + } + + trait Consumer[F[_]] { + + def subscribe(topic: Topic): F[Unit] + + def poll(timeout: FiniteDuration): F[Iterable[Record]] + } + + object Consumer { + + def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F + + def apply[F[_]: Functor](consumer: SKafkaConsumer[F, String, String]): Consumer[F] = { + + new Consumer[F] { + + def subscribe(topic: Topic) = { + consumer.subscribe(Nes.of(topic)) + } + + def poll(timeout: FiniteDuration) = { + for { + records <- consumer.poll(timeout) + } yield for { + record <- records.values.values.flatMap(_.toList) + } yield { + Record(key = record.key.map(_.value), value = record.value.map(_.value)) + } + } + } + } + + def of[F[_]: Monad: ConsumerOf: FromTry](key: String, config: ConsumerConfig): Resource[F, Consumer[F]] = { + val config1 = { + val groupId = config.common.clientId.fold(key) { clientId => s"$clientId-$key" } + config.copy(groupId = groupId.some, autoOffsetReset = AutoOffsetReset.Latest) + } + + for { + consumer <- implicitly[ConsumerOf[F]].apply[String, String](config1) + } yield { + Consumer[F](consumer) + } + } + } + + final case class Record(key: Option[String], value: Option[String]) + + final case class Config( + topic: Topic = "healthcheck", + initial: FiniteDuration = 10.seconds, + interval: FiniteDuration = 1.second, + timeout: FiniteDuration = 2.minutes, + pollTimeout: FiniteDuration = 10.millis + ) + + object Config { + val default: Config = Config() + } +} diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala new file mode 100644 index 00000000..99483734 --- /dev/null +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/KafkaHealthCheckSpec.scala @@ -0,0 +1,135 @@ +package com.evolutiongaming.skafka + +import cats.effect._ +import cats.syntax.all._ +import cats.effect.concurrent.Ref +import com.evolutiongaming.catshelper.Log +import com.evolutiongaming.skafka.IOSuite._ +import com.evolutiongaming.skafka.KafkaHealthCheck.Record +import com.evolutiongaming.skafka.Topic +import org.scalatest.funsuite.AsyncFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class KafkaHealthCheckSpec extends AsyncFunSuite with Matchers { + import KafkaHealthCheckSpec._ + + test("error") { + implicit val log: Log[IO] = Log.empty[IO] + + val producer = new KafkaHealthCheck.Producer[IO] { + def send(record: Record) = ().pure[IO] + } + + val consumer = new KafkaHealthCheck.Consumer[IO] { + + def subscribe(topic: Topic) = ().pure[IO] + + def poll(timeout: FiniteDuration) = { + if (timeout == 1.second) List.empty[Record].pure[IO] + else Error.raiseError[IO, List[Record]] + } + } + + val healthCheck = KafkaHealthCheck.of[IO]( + key = "key", + config = KafkaHealthCheck.Config(topic = "topic", initial = 0.seconds, interval = 1.second), + stop = false.pure[IO], + producer = Resource.pure[IO, KafkaHealthCheck.Producer[IO]](producer), + consumer = Resource.pure[IO, KafkaHealthCheck.Consumer[IO]](consumer), + log = log + ) + val result = for { + error <- healthCheck.use(_.error.untilDefinedM) + } yield { + error shouldEqual error + } + result.run() + } + + test("periodic healthcheck") { + final case class State( + checks: Int = 0, + subscribed: Option[Topic] = None, + logs: List[String] = List.empty, + records: List[Record] = List.empty + ) + + def logOf(ref: Ref[IO, State]): Log[IO] = { + def add(log: String): IO[Unit] = + ref.update(state => state.copy(logs = log :: state.logs)) + + new Log[IO] { + def trace(msg: => String, mdc: Log.Mdc) = add(s"trace $msg") + + def debug(msg: => String, mdc: Log.Mdc) = add(s"debug $msg") + + def info(msg: => String, mdc: Log.Mdc) = add(s"info $msg") + + def warn(msg: => String, mdc: Log.Mdc) = add(s"warn $msg") + + def warn(msg: => String, cause: Throwable, mdc: Log.Mdc) = add(s"warn $msg $cause") + + def error(msg: => String, mdc: Log.Mdc) = add(s"error $msg") + + def error(msg: => String, cause: Throwable, mdc: Log.Mdc) = add(s"error $msg $cause") + } + } + + def consumerOf(ref: Ref[IO, State]) = new KafkaHealthCheck.Consumer[IO] { + def subscribe(topic: Topic): IO[Unit] = + ref.update(_.copy(subscribed = topic.some)) + + def poll(timeout: FiniteDuration): IO[Iterable[Record]] = + ref + .modify(state => + if (state.records.size >= 2) (state.copy(records = List.empty), state.records) + else (state, List.empty) + ) + } + + def producerOf(ref: Ref[IO, State]): KafkaHealthCheck.Producer[IO] = new KafkaHealthCheck.Producer[IO] { + def send(record: Record): IO[Unit] = + ref.update(state => state.copy(records = record :: state.records)) + } + + def stopOf(ref: Ref[IO, State]): IO[Boolean] = + ref.updateAndGet(state => state.copy(checks = state.checks - 1)).map(_.checks <= 0) + + val result = for { + ref <- Ref.of[IO, State](State(checks = 2)) + healthCheck = KafkaHealthCheck.of[IO]( + key = "key", + config = + KafkaHealthCheck.Config(topic = "topic", initial = 0.millis, interval = 0.millis, timeout = 100.millis), + stop = stopOf(ref), + producer = Resource.pure[IO, KafkaHealthCheck.Producer[IO]](producerOf(ref)), + consumer = Resource.pure[IO, KafkaHealthCheck.Consumer[IO]](consumerOf(ref)), + log = logOf(ref) + ) + _ <- healthCheck.use(_.done) + state <- ref.get + + } yield state shouldEqual State( + checks = 0, + subscribed = "topic".some, + logs = List( + "debug key send 2:0", + "debug key send 2", + "debug key send 1:0", + "debug key send 1", + "debug key send 0:0", + "debug key send 0" + ), + records = List() + ) + + result.run() + } +} + +object KafkaHealthCheckSpec { + val Error: Throwable = new RuntimeException with NoStackTrace +}