diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 4fcd9116b..48bdff21f 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -344,47 +344,50 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() - readMessageKey(ByteBuffer.wrap(record.key())) match { - case OffsetKey(version, key) => - val value: OffsetAndMetadata = readOffsetMessageValue(ByteBuffer.wrap(record.value())) - val newKey = (key.group, key.topicPartition.topic, key.topicPartition.partition) - groupTopicPartitionOffsetMap.put(newKey, value) - groupTopicPartitionOffsetSet.add(newKey) - val topic = key.topicPartition.topic - val group = key.group - val consumerSet = { - if (topicConsumerSetMap.contains(topic)) { - topicConsumerSetMap(topic) - } else { - val s = new mutable.TreeSet[String]() - topicConsumerSetMap += topic -> s - s - } - } - consumerSet += group - - val topicSet = { - if (consumerTopicSetMap.contains(group)) { - consumerTopicSetMap(group) - } else { - val s = new mutable.TreeSet[String]() - consumerTopicSetMap += group -> s - s + if(record.value()!=null) { + readMessageKey(ByteBuffer.wrap(record.key())) match { + case OffsetKey(version, key) => + val value: OffsetAndMetadata = readOffsetMessageValue(ByteBuffer.wrap(record.value())) + val newKey = (key.group, key.topicPartition.topic, key.topicPartition.partition) + groupTopicPartitionOffsetMap.put(newKey, value) + groupTopicPartitionOffsetSet.add(newKey) + val topic = key.topicPartition.topic + val group = key.group + val consumerSet = { + if (topicConsumerSetMap.contains(topic)) { + topicConsumerSetMap(topic) + } else { + val s = new mutable.TreeSet[String]() + topicConsumerSetMap += topic -> s + s + } } - } - topicSet += topic - case GroupMetadataKey(version, key) => - val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()), Time.SYSTEM) - value.allMemberMetadata.foreach { - mm => - mm.assignment.foreach { - case (topic, part) => - val newKey = (key, topic, part) - groupTopicPartitionMemberMap.put(newKey, mm) - groupTopicPartitionMemberSet.add(newKey) + consumerSet += group + + val topicSet = { + if (consumerTopicSetMap.contains(group)) { + consumerTopicSetMap(group) + } else { + val s = new mutable.TreeSet[String]() + consumerTopicSetMap += group -> s + s } - } + } + topicSet += topic + case GroupMetadataKey(version, key) => + val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()), Time.SYSTEM) + value.allMemberMetadata.foreach { + mm => + mm.assignment.foreach { + case (topic, part) => + val newKey = (key, topic, part) + groupTopicPartitionMemberMap.put(newKey, mm) + groupTopicPartitionMemberSet.add(newKey) + } + } + } } + } lastUpdateTimeMillis = System.currentTimeMillis() } catch {