diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 4b21c489..f4ea311d 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -28,26 +28,7 @@ object ConsumerMetricsOf { registry <- KafkaMetricsRegistry.of(prometheus, prefix) } yield { (clientId: ClientId) => val source = sourceOf(clientId) - - new ConsumerMetrics[F] { - override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = - source.call(name, topic, latency, success) - - override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = - source.poll(topic, bytes, records, age) - - override def count(name: String, topic: Topic): F[Unit] = - source.count(name, topic) - - override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = - source.rebalance(name, topicPartition) - - override def topics(latency: FiniteDuration): F[Unit] = - source.topics(latency) - - override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = - registry.register(consumer.clientMetrics) - } + consumerMetricsOf(source, registry) } /** @@ -65,7 +46,13 @@ object ConsumerMetricsOf { ): Resource[F, ConsumerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) - } yield new ConsumerMetrics[F] { + } yield consumerMetricsOf(source, registry) + + private def consumerMetricsOf[F[_]]( + source: ConsumerMetrics[F], + registry: KafkaMetricsRegistry[F], + ): ConsumerMetrics[F] = + new ConsumerMetrics[F] { override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = source.call(name, topic, latency, success) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index c7870879..b45891f3 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -28,34 +28,7 @@ object ProducerMetricsOf { registry <- KafkaMetricsRegistry.of(prometheus, prefix) } yield { (clientId: ClientId) => val source = sourceOf(clientId) - - new ProducerMetrics[F] { - override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) - - override def beginTransaction: F[Unit] = source.beginTransaction - - override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = - source.sendOffsetsToTransaction(latency) - - override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency) - - override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency) - - override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = - source.send(topic, latency, bytes) - - override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency) - - override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency) - - override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency) - - override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) - - override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = - registry.register(producer.clientMetrics) - - } + producerMetricsOf(source, registry) } /** @@ -73,7 +46,13 @@ object ProducerMetricsOf { ): Resource[F, ProducerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) - } yield new ProducerMetrics[F] { + } yield producerMetricsOf(source, registry) + + private def producerMetricsOf[F[_]]( + source: ProducerMetrics[F], + registry: KafkaMetricsRegistry[F], + ): ProducerMetrics[F] = + new ProducerMetrics[F] { override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) override def beginTransaction: F[Unit] = source.beginTransaction