From 2a1690fb4b98a611a358375e213943e5e672d126 Mon Sep 17 00:00:00 2001 From: Tamalendu Nath Date: Thu, 10 Jan 2019 14:06:27 +0100 Subject: [PATCH] Search by Kafka message key - pom.xml -- Dependency added for Spark - message-inspector -- Made changes in Message-inspector UI. -- Added Search by dropdown and Message Key input field. - MessageVO -- Implement Serializable To implement the functionality added new config, service and util classes and made changes in controller cleasses. - Config (added) -- MessageConfiguration - Service (added) -- SparkMessageInspector - Util (added) -- SearchType - Controller (change) -- MessageController --- pom.xml | 38 +++++- .../kafdrop/config/MessageConfiguration.java | 48 +++++++ .../kafdrop/controller/MessageController.java | 82 +++++++++--- .../homeadvisor/kafdrop/model/MessageVO.java | 2 +- .../service/SparkMessageInspector.java | 122 ++++++++++++++++++ .../homeadvisor/kafdrop/util/SearchType.java | 5 + .../resources/static/js/message-inspector.js | 36 +++++- .../resources/templates/message-inspector.ftl | 60 ++++++--- 8 files changed, 342 insertions(+), 51 deletions(-) create mode 100644 src/main/java/com/homeadvisor/kafdrop/config/MessageConfiguration.java create mode 100644 src/main/java/com/homeadvisor/kafdrop/service/SparkMessageInspector.java create mode 100644 src/main/java/com/homeadvisor/kafdrop/util/SearchType.java diff --git a/pom.xml b/pom.xml index 1e73b39..ef83145 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,9 @@ 1.3.6.RELEASE -Xdoclint:none 2.10.0 - + 0.10.2.0 + 2.11 + 2.3.0 -Xdoclint:none @@ -71,7 +73,7 @@ org.apache.kafka - kafka_2.9.2 + kafka_${scala.version} 0.8.2.2 @@ -109,6 +111,36 @@ gson 2.8.5 + + + commons-codec + commons-codec + 1.9 + + + + org.apache.spark + spark-streaming-kafka-0-10_${scala.version} + ${spark.version} + + + + org.apache.spark + spark-streaming_${scala.version} + ${spark.version} + + + + org.apache.spark + spark-core_${scala.version} + ${spark.version} + + + + org.apache.spark + spark-sql_${scala.version} + ${spark.version} + @@ -336,7 +368,7 @@ org.apache.kafka - kafka_2.9.2 + kafka_${scala.version} 0.8.2.2 diff --git a/src/main/java/com/homeadvisor/kafdrop/config/MessageConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/MessageConfiguration.java new file mode 100644 index 0000000..0e0e544 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/MessageConfiguration.java @@ -0,0 +1,48 @@ +package com.homeadvisor.kafdrop.config; + +import javax.annotation.PostConstruct; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import com.homeadvisor.kafdrop.util.SearchType;; + + +@Configuration +public class MessageConfiguration { + + @Component + @ConfigurationProperties(prefix = "message") + public static class MessageProperties + { + + private SearchType type; + private static String consumerName = "kafka-test-ARRTP"; + + @PostConstruct + public void init() { + // Set a default message format if not configured. + if (type == null) { + type = SearchType.Offset; + } + } + + public SearchType getType() + { + return type; + } + + public void setType(SearchType type) + { + this.type = type; + } + + public String getConsumerName() + { + return this.consumerName; + } + + } + +} diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index 5325147..e25351c 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -22,15 +22,18 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.homeadvisor.kafdrop.config.MessageFormatConfiguration; import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration; +import com.homeadvisor.kafdrop.config.MessageConfiguration; import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.service.KafkaMonitor; import com.homeadvisor.kafdrop.service.MessageInspector; +import com.homeadvisor.kafdrop.service.SparkMessageInspector; import com.homeadvisor.kafdrop.service.TopicNotFoundException; import com.homeadvisor.kafdrop.util.AvroMessageDeserializer; import com.homeadvisor.kafdrop.util.DefaultMessageDeserializer; import com.homeadvisor.kafdrop.util.MessageDeserializer; import com.homeadvisor.kafdrop.util.MessageFormat; +import com.homeadvisor.kafdrop.util.SearchType; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -58,12 +61,18 @@ public class MessageController @Autowired private MessageInspector messageInspector; + @Autowired + private SparkMessageInspector sparkMessageInspector; + @Autowired private MessageFormatConfiguration.MessageFormatProperties messageFormatProperties; @Autowired private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties; + @Autowired + private MessageConfiguration.MessageProperties searchTypeProperties; + /** * Human friendly view of reading messages. * @param topicName Name of topic @@ -79,7 +88,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, Model model) { final MessageFormat defaultFormat = messageFormatProperties.getFormat(); - + final SearchType defaultSearchType = searchTypeProperties.getType(); if (messageForm.isEmpty()) { final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); @@ -98,21 +107,32 @@ public String viewMessageForm(@PathVariable("name") String topicName, model.addAttribute("defaultFormat", defaultFormat); model.addAttribute("messageFormats", MessageFormat.values()); - - if (!messageForm.isEmpty() && !errors.hasErrors()) - { - final MessageDeserializer deserializer = getDeserializer( - topicName, messageForm.getFormat()); - - model.addAttribute("messages", - messageInspector.getMessages(topicName, - messageForm.getPartition(), - messageForm.getOffset(), - messageForm.getCount(), - deserializer)); - + model.addAttribute("defaultSearchType", defaultSearchType); + model.addAttribute("searchType", SearchType.values()); + + if (messageForm.getSearchby() != null && messageForm.getSearchby().name() == SearchType.Key.toString()) { + if (!messageForm.isEmptyMessageKey() && !errors.hasErrors()) + { + model.addAttribute("messages", + sparkMessageInspector.getMessagesByKey(topicName, + messageForm.getMessageKey(), + messageForm.getCount())); + } + } else { + if (!messageForm.isEmpty() && !errors.hasErrors()) + { + final MessageDeserializer deserializer = getDeserializer( + topicName, messageForm.getFormat()); + + model.addAttribute("messages", + messageInspector.getMessages(topicName, + messageForm.getPartition(), + messageForm.getOffset(), + messageForm.getCount(), + deserializer)); + + } } - return "message-inspector"; } @@ -207,23 +227,24 @@ public static class PartitionOffsetInfo */ @NotNull @Min(1) - @Max(100) @JsonProperty("lastOffset") private Long count; private MessageFormat format; - - public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format) + private SearchType searchby; + private String messageKey; + public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format, SearchType searchby) { this.partition = partition; this.offset = offset; this.count = count; this.format = format; + this.searchby = searchby; } public PartitionOffsetInfo(int partition, long offset, long count) { - this(partition, offset, count, MessageFormat.DEFAULT); + this(partition, offset, count, MessageFormat.DEFAULT, SearchType.Offset); } public PartitionOffsetInfo() @@ -236,7 +257,10 @@ public boolean isEmpty() { return partition == null && offset == null && (count == null || count == 1); } - + public boolean isEmptyMessageKey() + { + return messageKey == null && (count == null || count == 0); + } public Integer getPartition() { return partition; @@ -276,5 +300,23 @@ public void setFormat(MessageFormat format) { this.format = format; } + public String getMessageKey() + { + return this.messageKey; + } + public void setMessageKey(String messageKey) + { + this.messageKey = messageKey; + } + + public SearchType getSearchby() + { + return searchby; + } + + public void setSearchby(SearchType searchby) + { + this.searchby = searchby; + } } } diff --git a/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java b/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java index 2fa863d..0778c94 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java @@ -18,7 +18,7 @@ package com.homeadvisor.kafdrop.model; -public class MessageVO +public class MessageVO implements java.io.Serializable { private String message; private String key; diff --git a/src/main/java/com/homeadvisor/kafdrop/service/SparkMessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/SparkMessageInspector.java new file mode 100644 index 0000000..c608a18 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/service/SparkMessageInspector.java @@ -0,0 +1,122 @@ +/* + * Copyright 2017 HomeAdvisor, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.homeadvisor.kafdrop.service; + +import com.homeadvisor.kafdrop.config.MessageConfiguration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.List; +import java.util.Collection; +import java.util.Map; +import java.util.HashMap; + +import com.homeadvisor.kafdrop.model.MessageVO; +import com.homeadvisor.kafdrop.model.TopicPartitionVO; +import com.homeadvisor.kafdrop.model.TopicVO; +import com.homeadvisor.kafdrop.model.BrokerVO; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + + +import org.apache.spark.streaming.kafka010.OffsetRange; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.spark.streaming.kafka010.LocationStrategies; + + +@Service +public class SparkMessageInspector +{ + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + @Autowired + private KafkaMonitor kafkaMonitor; + + @Autowired + private MessageConfiguration.MessageProperties messageProperties; + + public List getMessagesByKey(String topicName, String messageKey, long numberOfMessages) + { + final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); + Collection partitions = topic.getPartitions(); + List broker = kafkaMonitor.getBrokers(); + + String brokerList = ""; + for (BrokerVO aBroker : broker) { + if (brokerList.length()>0) { + brokerList += ","; + } + brokerList += aBroker.getHost() + ":" + aBroker.getPort(); + } + + Map param = new HashMap(); + param.put("bootstrap.servers", brokerList); + param.put("key.deserializer", StringDeserializer.class); + param.put("value.deserializer", StringDeserializer.class); + param.put("group.id", messageProperties.getConsumerName()); + param.put("auto.offset.reset", "earliest"); + param.put("enable.auto.commit", false); + + OffsetRange[] offsetRanges = new OffsetRange[partitions.size()]; + for (TopicPartitionVO aPart : partitions) { + long startOffset = aPart.getSize() - numberOfMessages; + if (startOffset <= aPart.getFirstOffset()) { + startOffset = aPart.getFirstOffset(); + } + offsetRanges[aPart.getId()] = OffsetRange.create(topic.getName(),aPart.getId(),startOffset,aPart.getSize()); + } + SparkSession spark = SparkSession.builder().appName(messageProperties.getConsumerName()).master("local[*]").getOrCreate(); + SparkContext sc = spark.sparkContext(); + JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); + JavaRDD> rdd =KafkaUtils.createRDD(jsc, param, offsetRanges, LocationStrategies.PreferConsistent()); + List newRdd = rdd.map( + (Function, MessageVO>) r -> { + MessageVO vo = new MessageVO(); + vo.setMessage(r.value()); + vo.setKey(r.key()); + vo.setComputedChecksum(r.checksum()); + vo.setChecksum(r.checksum()); + vo.setCompressionCodec("none"); + return vo; + }).filter( r -> { + if (r != null) { + String messageKey2 = messageKey; + return r.getKey().equals(messageKey2); + } + return false; + }).collect(); + + jsc.stop(); + List messages = new ArrayList<>(); + for (MessageVO aMessage : newRdd) { + messages.add(aMessage); + } + return messages; + } +} diff --git a/src/main/java/com/homeadvisor/kafdrop/util/SearchType.java b/src/main/java/com/homeadvisor/kafdrop/util/SearchType.java new file mode 100644 index 0000000..c12b96c --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/util/SearchType.java @@ -0,0 +1,5 @@ +package com.homeadvisor.kafdrop.util; + +public enum SearchType { + Offset, Key; +} diff --git a/src/main/resources/static/js/message-inspector.js b/src/main/resources/static/js/message-inspector.js index f51209d..3149613 100644 --- a/src/main/resources/static/js/message-inspector.js +++ b/src/main/resources/static/js/message-inspector.js @@ -16,11 +16,18 @@ * */ + jQuery(document).ready(function() { + var isFormLoad + $(function() { + isFormLoad = true; + $('#searchby').trigger('change'); + isFormLoad = false; + }); jQuery(document).on('click', '.toggle-msg', function(e) { - var link=jQuery(this), - linkIcon=link.find('.fa'), - body=link.parent().find('.message-body'); + var link = jQuery(this), + linkIcon = link.find('.fa'), + body = link.parent().find('.message-body'); e.preventDefault(); @@ -28,8 +35,7 @@ jQuery(document).ready(function() { if (true == body.data('expanded')) { body.text(JSON.stringify(JSON.parse(body.text()))); body.data('expanded', false); - } - else { + } else { body.text(JSON.stringify(JSON.parse(body.text()), null, 3)); body.data('expanded', true); } @@ -43,4 +49,22 @@ jQuery(document).ready(function() { jQuery('#lastOffset').text(lastOffset); jQuery('#partitionSize').text(lastOffset - firstOffset) }); -}) + + jQuery(document).on('change', '#searchby', function(e) { + var selectedOption = jQuery(this).children("option").filter(":selected").val(); + if (isFormLoad == false) { + jQuery('#message-display').empty(); + } + if (selectedOption.toLowerCase() == 'offset') { + jQuery('#partitionSizes').show(); + jQuery('#offset').show(); + jQuery('#dvMessageKey').hide(); + jQuery('#dvMessageFormat').show(); + } else { + jQuery('#partitionSizes').hide(); + jQuery('#offset').hide(); + jQuery('#dvMessageKey').show(); + jQuery('#dvMessageFormat').hide(); + } + }); +}) \ No newline at end of file diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index ed96755..40f307d 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -18,42 +18,51 @@ <@template.header "Topic: ${topic.name}: Messages"> + <#setting number_format="0"> - - -

Topic Messages: ${topic.name}

+

Topic Messages: ${topic.name}

<#assign selectedPartition=messageForm.partition!0?number> <#assign selectedFormat=messageForm.format!defaultFormat> - -
- <#assign curPartition=topic.getPartition(selectedPartition).get()> - First Offset: ${curPartition.firstOffset} - Last Offset: ${curPartition.size} - Size: ${curPartition.size - curPartition.firstOffset} -
- -
+<#assign selectedSearchby=messageForm.searchby!defaultSearchType>
- +
+ + +
+
- <#list topic.partitions as p> + <#assign curPartition=topic.getPartition(selectedPartition).get()> + First Offset: ${curPartition.firstOffset} + Last Offset: ${curPartition.size} + Size: ${curPartition.size - curPartition.firstOffset}
+
+ +
+ <@spring.bind path="messageForm.offset"/> -
+
<@spring.formInput path="messageForm.offset" attributes='class="form-control"'/> <#if spring.status.error> @@ -70,8 +79,17 @@
-
- + <@spring.bind path="messageForm.messageKey"/> + + +
+