From e49c46122c5dff6bf2cefff4bccc6762259cdf53 Mon Sep 17 00:00:00 2001
From: patelh
Date: Mon, 2 Mar 2020 22:40:15 -0800
Subject: [PATCH] Use noop reporter

---
 .../manager/actor/cluster/KafkaStateActor.scala | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
index ba6dcab2e..19333f104 100644
--- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala
+++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
@@ -9,6 +9,7 @@ import java.io.Closeable
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.time.Duration
+import java.util
 import java.util.Properties
 import java.util.concurrent.{ConcurrentLinkedDeque, TimeUnit}
 
@@ -48,6 +49,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
 import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupDescription, DescribeConsumerGroupsOptions}
 import org.apache.kafka.common.KafkaFuture.BiConsumer
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
 import org.apache.kafka.common.utils.Time
 
 /**
@@ -57,6 +59,17 @@ import kafka.manager.utils._
 
 import scala.collection.JavaConverters._
 
+class NoopJMXReporter extends MetricsReporter {
+  override def init(metrics: util.List[KafkaMetric]): Unit = {}
+
+  override def metricChange(metric: KafkaMetric): Unit = {}
+
+  override def metricRemoval(metric: KafkaMetric): Unit = {}
+
+  override def close(): Unit = {}
+
+  override def configure(configs: util.Map[String, _]): Unit = {}
+}
 case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 case class KafkaAdminClientActorConfig(clusterContext: ClusterContext, longRunningPoolConfig: LongRunningPoolConfig, kafkaStateActorPath: ActorPath, consumerProperties: Option[Properties])
 case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends BaseClusterQueryActor with LongRunningPoolActor {
@@ -257,7 +270,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(AUTO_OFFSET_RESET_CONFIG, "latest")
-    props.put(METRIC_REPORTER_CLASSES_CONFIG, "")
+    props.put(METRIC_REPORTER_CLASSES_CONFIG, classOf[NoopJMXReporter].getCanonicalName)
     consumerProperties.foreach {
       cp => props.putAll(cp.asMap)
     }
@@ -1479,6 +1492,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       val port: Int = broker.endpoints(securityProtocol)
       consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
       consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId)
+      consumerProperties.put(METRIC_REPORTER_CLASSES_CONFIG, classOf[NoopJMXReporter].getCanonicalName)
       // Use secure endpoint if available
       if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){
         consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId)
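
Note (reviewer sketch, not part of the patch): the standalone snippet below shows the
same change the patch makes, pointing metric.reporters at an explicit no-op reporter
class by name instead of an empty string, against a plain KafkaConsumer. The object
name, bootstrap address, and the local copy of the reporter are placeholders for
illustration; in CMAK the reporter lives in KafkaStateActor.scala.

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

// Local copy of the reporter introduced by the patch, kept here only so the
// sketch compiles on its own.
class NoopJMXReporter extends MetricsReporter {
  override def init(metrics: util.List[KafkaMetric]): Unit = {}
  override def metricChange(metric: KafkaMetric): Unit = {}
  override def metricRemoval(metric: KafkaMetric): Unit = {}
  override def close(): Unit = {}
  override def configure(configs: util.Map[String, _]): Unit = {}
}

object NoopReporterSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder bootstrap address; not taken from the patch.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    // The change the patch makes: register the no-op reporter class by its
    // canonical name under metric.reporters.
    props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
      classOf[NoopJMXReporter].getCanonicalName)

    // Constructing the consumer loads and configures the reporter; no broker
    // connection is required for this to succeed.
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.close()
  }
}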