Skip to content

Commit

Permalink
Use noop reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Mar 3, 2020
1 parent 4f3a999 commit e49c461
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.io.Closeable
import java.net.InetAddress
import java.nio.ByteBuffer
import java.time.Duration
import java.util
import java.util.Properties
import java.util.concurrent.{ConcurrentLinkedDeque, TimeUnit}

Expand Down Expand Up @@ -48,6 +49,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupDescription, DescribeConsumerGroupsOptions}
import org.apache.kafka.common.KafkaFuture.BiConsumer
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
import org.apache.kafka.common.utils.Time

/**
Expand All @@ -57,6 +59,17 @@ import kafka.manager.utils._

import scala.collection.JavaConverters._

class NoopJMXReporter extends MetricsReporter {
override def init(metrics: util.List[KafkaMetric]): Unit = {}

override def metricChange(metric: KafkaMetric): Unit = {}

override def metricRemoval(metric: KafkaMetric): Unit = {}

override def close(): Unit = {}

override def configure(configs: util.Map[String, _]): Unit = {}
}
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
case class KafkaAdminClientActorConfig(clusterContext: ClusterContext, longRunningPoolConfig: LongRunningPoolConfig, kafkaStateActorPath: ActorPath, consumerProperties: Option[Properties])
case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends BaseClusterQueryActor with LongRunningPoolActor {
Expand Down Expand Up @@ -257,7 +270,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(METRIC_REPORTER_CLASSES_CONFIG, "")
props.put(METRIC_REPORTER_CLASSES_CONFIG, classOf[NoopJMXReporter].getCanonicalName)
consumerProperties.foreach {
cp => props.putAll(cp.asMap)
}
Expand Down Expand Up @@ -1479,6 +1492,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
val port: Int = broker.endpoints(securityProtocol)
consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId)
consumerProperties.put(METRIC_REPORTER_CLASSES_CONFIG, classOf[NoopJMXReporter].getCanonicalName)
// Use secure endpoint if available
if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId)
Expand Down

0 comments on commit e49c461

Please sign in to comment.