From 987978e4a8dbd73ecf8bd606d6c3473d7c3d6fe7 Mon Sep 17 00:00:00 2001 From: Michael Pratt Date: Wed, 21 Jun 2017 11:24:15 -0600 Subject: [PATCH] Kafdrop 2.0.0 sync from HomeAdvisor --- pom.xml | 9 +- .../java/com/homeadvisor/kafdrop/KafDrop.java | 5 +- .../kafdrop/config/CorsConfiguration.java | 106 +++++++++++++++ .../kafdrop/config/CuratorConfiguration.java | 2 +- .../config/HealthCheckConfiguration.java | 2 +- .../config/ServiceDiscoveryConfiguration.java | 2 +- .../kafdrop/config/SwaggerConfiguration.java | 78 +++++++++++ .../kafdrop/config/ini/IniFileProperties.java | 2 +- .../config/ini/IniFilePropertySource.java | 2 +- .../kafdrop/config/ini/IniFileReader.java | 2 +- .../kafdrop/controller/BrokerController.java | 32 ++++- .../kafdrop/controller/ClusterController.java | 42 +++++- .../controller/ConsumerController.java | 23 +++- .../controller/KafkaExceptionHandler.java | 2 +- .../kafdrop/controller/MessageController.java | 122 +++++++++++++++--- .../kafdrop/controller/TopicController.java | 58 ++++++++- .../homeadvisor/kafdrop/model/BrokerVO.java | 2 +- .../kafdrop/model/ConsumerPartitionVO.java | 2 +- .../kafdrop/model/ConsumerRegistrationVO.java | 2 +- .../kafdrop/model/ConsumerTopicVO.java | 2 +- .../homeadvisor/kafdrop/model/ConsumerVO.java | 2 +- .../homeadvisor/kafdrop/model/MessageVO.java | 2 +- .../kafdrop/model/TopicPartitionStateVO.java | 2 +- .../kafdrop/model/TopicPartitionVO.java | 2 +- .../kafdrop/model/TopicRegistrationVO.java | 2 +- .../homeadvisor/kafdrop/model/TopicVO.java | 2 +- .../service/BrokerNotFoundException.java | 2 +- .../service/ConsumerNotFoundException.java | 2 +- .../kafdrop/service/CuratorKafkaMonitor.java | 2 +- .../CuratorKafkaMonitorProperties.java | 2 +- .../kafdrop/service/KafkaMonitor.java | 2 +- .../kafdrop/service/MessageInspector.java | 6 +- .../service/NotInitializedException.java | 2 +- .../service/PartitionNotFoundException.java | 2 +- .../service/TopicNotFoundException.java | 2 +- .../kafdrop/util/BrokerChannel.java | 2 +- .../homeadvisor/kafdrop/util/JmxUtils.java | 2 +- 37 files changed, 480 insertions(+), 55 deletions(-) create mode 100644 src/main/java/com/homeadvisor/kafdrop/config/CorsConfiguration.java create mode 100644 src/main/java/com/homeadvisor/kafdrop/config/SwaggerConfiguration.java diff --git a/pom.xml b/pom.xml index 8c23747..4893e50 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.homeadvisor.kafka kafdrop - 1.2.2-SNAPSHOT + 2.0.0 For when you have a Kaf(ka) cluster to monitor @@ -108,6 +108,13 @@ spring-beans + + + io.springfox + springfox-swagger2 + 2.7.0 + + junit diff --git a/src/main/java/com/homeadvisor/kafdrop/KafDrop.java b/src/main/java/com/homeadvisor/kafdrop/KafDrop.java index 767b48d..6b40509 100644 --- a/src/main/java/com/homeadvisor/kafdrop/KafDrop.java +++ b/src/main/java/com/homeadvisor/kafdrop/KafDrop.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import com.homeadvisor.kafdrop.config.ini.IniFileReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.Banner; import org.springframework.boot.actuate.autoconfigure.MetricFilterAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -44,7 +45,7 @@ public class KafDrop public static void main(String[] args) { new SpringApplicationBuilder(KafDrop.class) - .showBanner(false) + .bannerMode(Banner.Mode.OFF) .listeners(new EnvironmentSetupListener(), new LoggingConfigurationListener()) .run(args); diff --git a/src/main/java/com/homeadvisor/kafdrop/config/CorsConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/CorsConfiguration.java new file mode 100644 index 0000000..1d9f4d4 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/CorsConfiguration.java @@ -0,0 +1,106 @@ +/* + * Copyright 2017 HomeAdvisor, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.homeadvisor.kafdrop.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; + +import javax.servlet.*; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; + +/** + * Auto configuration for enabling CORS support. Can override behavior with + * various configs: + * + * + *
+ * To disable CORS entirely, set cors.enabled=false. All other configs are + * just Strings that get used as-is to set the corresponding CORS header. + */ +@Configuration +@ConditionalOnProperty(value = "cors.enabled", matchIfMissing = true) +public class CorsConfiguration +{ + @Value("${cors.allowOrigins:*}") + private String corsAllowOrigins; + + @Value("${cors.allowMethods:GET,POST,PUT,DELETE}") + private String corsAllowMethods; + + @Value("${cors.maxAge:3600}") + private String corsMaxAge; + + @Value("${cors.allowCredentials:true}") + private String corsAllowCredentials; + + @Value("${cors.allowHeaders:Origin,Accept,X-Requested-With,Content-Type,Access-Control-Request-Method,Access-Control-Request-Headers,Authorization}") + private String corsAllowHeaders; + + @Bean + @Order(Ordered.HIGHEST_PRECEDENCE) + public Filter corsFilter() + { + return new Filter() + { + @Override + public void init(FilterConfig filterConfig) throws ServletException + {} + + @Override + public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException + { + HttpServletResponse response = (HttpServletResponse) res; + HttpServletRequest request = (HttpServletRequest) req; + + response.setHeader("Access-Control-Allow-Origin", corsAllowOrigins); + response.setHeader("Access-Control-Allow-Methods", corsAllowMethods); + response.setHeader("Access-Control-Max-Age", corsMaxAge); + response.setHeader("Access-Control-Allow-Credentials", corsAllowCredentials); + response.setHeader("Access-Control-Allow-Headers", corsAllowHeaders); + + if(request.getMethod().equals(HttpMethod.OPTIONS.name())) + { + response.setStatus(HttpStatus.NO_CONTENT.value()); + } + else + { + chain.doFilter(req, res); + } + } + + @Override + public void destroy() {} + }; + } +} diff --git a/src/main/java/com/homeadvisor/kafdrop/config/CuratorConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/CuratorConfiguration.java index dec959f..8d63e47 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/CuratorConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/CuratorConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/config/HealthCheckConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/HealthCheckConfiguration.java index ea1c2a0..bf89647 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/HealthCheckConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/HealthCheckConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/config/ServiceDiscoveryConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/ServiceDiscoveryConfiguration.java index 224ed28..c982b17 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/ServiceDiscoveryConfiguration.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/ServiceDiscoveryConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/config/SwaggerConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/SwaggerConfiguration.java new file mode 100644 index 0000000..151a7b8 --- /dev/null +++ b/src/main/java/com/homeadvisor/kafdrop/config/SwaggerConfiguration.java @@ -0,0 +1,78 @@ +/* + * Copyright 2017 HomeAdvisor, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.homeadvisor.kafdrop.config; + +import com.google.common.base.Predicate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.MediaType; +import springfox.documentation.RequestHandler; +import springfox.documentation.builders.ApiInfoBuilder; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + +/** + * Auto configuration for Swagger. Can be disabled by setting swagger.enabled=false. + */ +@Configuration +@EnableSwagger2 +@ConditionalOnProperty(value = "swagger.enabled", matchIfMissing = true) +public class SwaggerConfiguration +{ + @Bean + public Docket swagger() + { + return new Docket(DocumentationType.SWAGGER_2) + .useDefaultResponseMessages(false) + .apiInfo(new ApiInfoBuilder() + .title("Kafdrop API") + .description("JSON APIs for Kafdrop") + .build()) + .select() + .apis(new JsonRequestHandlerPredicate()) + .paths(new IgnoreDebugPathPredicate()) + .build(); + } + + /** + * Swagger Predicate for only selecting JSON endpoints. + */ + public class JsonRequestHandlerPredicate implements Predicate + { + @Override + public boolean apply(RequestHandler input) + { + return input.produces().contains(MediaType.APPLICATION_JSON); + } + } + + /** + * Swagger Predicate for ignoring /debug endpoints. + */ + public class IgnoreDebugPathPredicate implements Predicate + { + @Override + public boolean apply(String input) + { + return !input.startsWith("/debug"); + } + } +} diff --git a/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileProperties.java b/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileProperties.java index 50c60c8..62f58d9 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileProperties.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFilePropertySource.java b/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFilePropertySource.java index fd99f26..5f4cfeb 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFilePropertySource.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFilePropertySource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileReader.java b/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileReader.java index fe16a07..d875226 100644 --- a/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileReader.java +++ b/src/main/java/com/homeadvisor/kafdrop/config/ini/IniFileReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/BrokerController.java b/src/main/java/com/homeadvisor/kafdrop/controller/BrokerController.java index 0156cfb..2ad7f23 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/BrokerController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/BrokerController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,13 +18,22 @@ package com.homeadvisor.kafdrop.controller; +import com.homeadvisor.kafdrop.model.BrokerVO; import com.homeadvisor.kafdrop.service.BrokerNotFoundException; import com.homeadvisor.kafdrop.service.KafkaMonitor; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +import java.util.List; @Controller public class BrokerController @@ -40,4 +49,25 @@ public String brokerDetails(@PathVariable("id") int brokerId, Model model) model.addAttribute("topics", kafkaMonitor.getTopics()); return "broker-detail"; } + + @ApiOperation(value = "getBroker", notes = "Get details for a specific Kafka broker") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = BrokerVO.class), + @ApiResponse(code = 404, message = "Invalid Broker ID") + }) + @RequestMapping(path = "/broker/{id}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody BrokerVO brokerDetailsJson(@PathVariable("id") int brokerId) + { + return kafkaMonitor.getBroker(brokerId).orElseThrow(() -> new BrokerNotFoundException(String.valueOf(brokerId))); + } + + @ApiOperation(value = "getAllBrokers", notes = "Get details for all known Kafka brokers") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = BrokerVO.class) + }) + @RequestMapping(path = "/broker", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody List brokerDetailsJson() + { + return kafkaMonitor.getBrokers(); + } } diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java b/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java index 519723e..3b85df8 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,16 +19,24 @@ package com.homeadvisor.kafdrop.controller; import com.homeadvisor.kafdrop.config.CuratorConfiguration; +import com.homeadvisor.kafdrop.model.BrokerVO; +import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.service.BrokerNotFoundException; import com.homeadvisor.kafdrop.service.KafkaMonitor; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; -import java.util.Collection; import java.util.Collections; +import java.util.List; @Controller public class ClusterController @@ -40,7 +48,7 @@ public class ClusterController private CuratorConfiguration.ZookeeperProperties zookeeperProperties; @RequestMapping("/") - public String allBrokers(Model model) + public String clusterInfo(Model model) { model.addAttribute("zookeeper", zookeeperProperties); model.addAttribute("brokers", kafkaMonitor.getBrokers()); @@ -48,6 +56,23 @@ public String allBrokers(Model model) return "cluster-overview"; } + @ApiOperation(value = "getCluster", notes = "Get high level broker, topic, and partition data for the Kafka cluster") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = ClusterInfoVO.class) + }) + @RequestMapping(path = "/", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody + ClusterInfoVO getCluster() throws Exception + { + ClusterInfoVO vo = new ClusterInfoVO(); + + vo.zookeeper = zookeeperProperties; + vo.brokers = kafkaMonitor.getBrokers(); + vo.topics = kafkaMonitor.getTopics(); + + return vo; + } + @ExceptionHandler(BrokerNotFoundException.class) private String brokerNotFound(Model model) { @@ -57,4 +82,15 @@ private String brokerNotFound(Model model) return "cluster-overview"; } + + /** + * Simple DTO to encapsulate the cluster state: ZK properties, broker list, + * and topic list. + */ + public static class ClusterInfoVO + { + public CuratorConfiguration.ZookeeperProperties zookeeper; + public List brokers; + public List topics; + } } diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/ConsumerController.java b/src/main/java/com/homeadvisor/kafdrop/controller/ConsumerController.java index 55e836a..fbead89 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/ConsumerController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/ConsumerController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,13 +18,20 @@ package com.homeadvisor.kafdrop.controller; +import com.homeadvisor.kafdrop.model.ConsumerVO; import com.homeadvisor.kafdrop.service.ConsumerNotFoundException; import com.homeadvisor.kafdrop.service.KafkaMonitor; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("/consumer") @@ -41,4 +48,18 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode return "consumer-detail"; } + @ApiOperation(value = "getConsumer", notes = "Get topic and partition details for a consumer group") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = ConsumerVO.class), + @ApiResponse(code = 404, message = "Invalid consumer group") + }) + @RequestMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws Exception + { + final ConsumerVO consumer = kafkaMonitor.getConsumer(groupId) + .orElseThrow(() -> new ConsumerNotFoundException(groupId)); + + return consumer; + } + } diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/KafkaExceptionHandler.java b/src/main/java/com/homeadvisor/kafdrop/controller/KafkaExceptionHandler.java index abc24d2..5f0d2c3 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/KafkaExceptionHandler.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/KafkaExceptionHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java index 75bbe11..b08fb09 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/MessageController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,23 +18,29 @@ package com.homeadvisor.kafdrop.controller; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.homeadvisor.kafdrop.model.MessageVO; import com.homeadvisor.kafdrop.model.TopicVO; import com.homeadvisor.kafdrop.service.KafkaMonitor; import com.homeadvisor.kafdrop.service.MessageInspector; import com.homeadvisor.kafdrop.service.TopicNotFoundException; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.validation.BindingResult; -import org.springframework.web.bind.annotation.ModelAttribute; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.*; import javax.validation.Valid; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; @Controller public class MessageController @@ -45,16 +51,24 @@ public class MessageController @Autowired private MessageInspector messageInspector; + /** + * Human friendly view of reading messages. + * @param topicName Name of topic + * @param messageForm Message form for submitting requests to view messages. + * @param errors + * @param model + * @return View for seeing messages in a partition. + */ @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/messages") public String viewMessageForm(@PathVariable("name") String topicName, - @Valid @ModelAttribute("messageForm") MessageInspectorForm messageForm, + @Valid @ModelAttribute("messageForm") PartitionOffsetInfo messageForm, BindingResult errors, Model model) { if (messageForm.isEmpty()) { - final MessageInspectorForm defaultForm = new MessageInspectorForm(); - defaultForm.setCount(1); + final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo(); + defaultForm.setCount(1l); model.addAttribute("messageForm", defaultForm); } @@ -74,21 +88,97 @@ public String viewMessageForm(@PathVariable("name") String topicName, return "message-inspector"; } - public static class MessageInspectorForm + /** + * Return a JSON list of all partition offset info for the given topic. If specific partition + * and offset parameters are given, then this returns actual kafka messages from that partition + * (if the offsets are valid; if invalid offsets are passed then the message list is empty). + * @param topicName Name of topic. + * @return Offset or message data. + */ + @ApiOperation(value = "getPartitionOrMessages", notes = "Get offset or message data for a topic. Without query params returns all partitions with offset data. With query params, returns actual messages (if valid offsets are provided).") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = List.class), + @ApiResponse(code = 404, message = "Invalid topic name") + }) + @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/messages", produces = MediaType.APPLICATION_JSON_VALUE) + public @ResponseBody List getPartitionOrMessages( + @PathVariable("name") String topicName, + @RequestParam(name = "partition", required = false) Integer partition, + @RequestParam(name = "offset", required = false) Long offset, + @RequestParam(name = "count", required = false) Long count + ) + { + if(partition == null || offset == null || count == null) + { + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + List partitionList = new ArrayList<>(); + topic.getPartitions().stream().forEach(vo -> partitionList.add(new PartitionOffsetInfo(vo.getId(), vo.getFirstOffset(), vo.getSize()))); + + return partitionList; + } + else + { + List messages = new ArrayList<>(); + List vos = messageInspector.getMessages( + topicName, + partition, + offset, + count); + + if(vos != null) + { + vos.stream().forEach(vo -> messages.add(vo)); + } + + return messages; + } + } + + /** + * Encapsulates offset data for a single partition. + */ + public static class PartitionOffsetInfo { @NotNull @Min(0) private Integer partition; + /** + * Need to clean this up. We're re-using this form for the JSON message API + * and it's a bit confusing to have the Java variable and JSON field named + * differently. + */ @NotNull @Min(0) - private Integer offset; - + @JsonProperty("firstOffset") + private Long offset; + + /** + * Need to clean this up. We're re-using this form for the JSON message API + * and it's a bit confusing to have the Java variable and JSON field named + * differently. + */ @NotNull @Min(1) @Max(100) - private Integer count; + @JsonProperty("lastOffset") + private Long count; + + public PartitionOffsetInfo(int partition, long offset, long count) + { + this.partition = partition; + this.offset = offset; + this.count = count; + } + + public PartitionOffsetInfo() + { + + } + @JsonIgnore public boolean isEmpty() { return partition == null && offset == null && (count == null || count == 1); @@ -104,22 +194,22 @@ public void setPartition(Integer partition) this.partition = partition; } - public Integer getOffset() + public Long getOffset() { return offset; } - public void setOffset(Integer offset) + public void setOffset(Long offset) { this.offset = offset; } - public Integer getCount() + public Long getCount() { return count; } - public void setCount(Integer count) + public void setCount(Long count) { this.count = count; } diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/TopicController.java b/src/main/java/com/homeadvisor/kafdrop/controller/TopicController.java index 1d9d6f1..c49b9e4 100644 --- a/src/main/java/com/homeadvisor/kafdrop/controller/TopicController.java +++ b/src/main/java/com/homeadvisor/kafdrop/controller/TopicController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,14 +18,26 @@ package com.homeadvisor.kafdrop.controller; +import com.homeadvisor.kafdrop.model.ConsumerVO; import com.homeadvisor.kafdrop.model.TopicVO; +import com.homeadvisor.kafdrop.service.ConsumerNotFoundException; import com.homeadvisor.kafdrop.service.KafkaMonitor; import com.homeadvisor.kafdrop.service.TopicNotFoundException; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +import java.util.List; +import java.util.stream.Collectors; @Controller @RequestMapping("/topic") @@ -45,4 +57,48 @@ public String topicDetails(@PathVariable("name") String topicName, Model model) return "topic-detail"; } + @ApiOperation(value = "getTopic", notes = "Get partition and consumer details for a topic") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = TopicVO.class), + @ApiResponse(code = 404, message = "Invalid topic name or consumer group") + }) + @RequestMapping(path = "/{name:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody TopicVO getTopic(@PathVariable("name") String topicName) throws Exception + { + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + return topic; + } + + @ApiOperation(value = "getTopicAndConsumer", notes = "Get partition details for a topic and consumer group") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = ConsumerVO.class), + @ApiResponse(code = 404, message = "Invalid topic name or consumer group") + }) + @RequestMapping(path = "/{name:.+}/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody + ConsumerVO getTopicAndConsumer( + @PathVariable("name") String topicName, + @PathVariable("groupId") String groupId) + throws Exception + { + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + final ConsumerVO consumer = kafkaMonitor.getConsumerByTopic(groupId, topic) + .orElseThrow(() -> new ConsumerNotFoundException(topicName)); + + return consumer; + } + + @ApiOperation(value = "getAllTopics", notes = "Get list of all topics") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = String.class, responseContainer = "List") + }) + @RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + public @ResponseBody List getAllTopics() throws Exception + { + return kafkaMonitor.getTopics().stream().map(TopicVO::getName).collect(Collectors.toList()); + } } diff --git a/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java b/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java index 77f0b2f..7e8ba7b 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerPartitionVO.java b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerPartitionVO.java index a38c6d0..fde8e20 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerPartitionVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerPartitionVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerRegistrationVO.java b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerRegistrationVO.java index 462f784..b505c48 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerRegistrationVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerRegistrationVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerTopicVO.java b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerTopicVO.java index ad97c9c..2926045 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerTopicVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerTopicVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerVO.java b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerVO.java index 017017e..3ffe92a 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/ConsumerVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/ConsumerVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java b/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java index 2608337..2fa863d 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/MessageVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionStateVO.java b/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionStateVO.java index feda9b7..1d709c3 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionStateVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionStateVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionVO.java b/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionVO.java index b1af87a..5a07158 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/TopicPartitionVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/TopicRegistrationVO.java b/src/main/java/com/homeadvisor/kafdrop/model/TopicRegistrationVO.java index 94920c1..870afe1 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/TopicRegistrationVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/TopicRegistrationVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java b/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java index 92c0127..e48f0a5 100644 --- a/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java +++ b/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/BrokerNotFoundException.java b/src/main/java/com/homeadvisor/kafdrop/service/BrokerNotFoundException.java index 9df7f8b..d5f7a00 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/BrokerNotFoundException.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/BrokerNotFoundException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/ConsumerNotFoundException.java b/src/main/java/com/homeadvisor/kafdrop/service/ConsumerNotFoundException.java index f26e8e4..cabca55 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/ConsumerNotFoundException.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/ConsumerNotFoundException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java index c7a8b4d..397e2f5 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitorProperties.java b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitorProperties.java index 5bb07be..f6cdd9d 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitorProperties.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitorProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/KafkaMonitor.java b/src/main/java/com/homeadvisor/kafdrop/service/KafkaMonitor.java index 0bccaf7..b414064 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/KafkaMonitor.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java index ebc7a09..5143bda 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/MessageInspector.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ public class MessageInspector @Autowired private KafkaMonitor kafkaMonitor; - public List getMessages(String topicName, int partitionId, int offset, int count) + public List getMessages(String topicName, int partitionId, long offset, long count) { final TopicVO topic = kafkaMonitor.getTopic(topicName).orElseThrow(TopicNotFoundException::new); final TopicPartitionVO partition = topic.getPartition(partitionId).orElseThrow(PartitionNotFoundException::new); @@ -64,7 +64,7 @@ public List getMessages(String topicName, int partitionId, int offset .maxWait(5000) // todo: make configurable .minBytes(1); - List messages = new ArrayList<>(count); + List messages = new ArrayList<>(); long currentOffset = offset; while (messages.size() < count) { diff --git a/src/main/java/com/homeadvisor/kafdrop/service/NotInitializedException.java b/src/main/java/com/homeadvisor/kafdrop/service/NotInitializedException.java index 3a50b4d..c0ef496 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/NotInitializedException.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/NotInitializedException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/PartitionNotFoundException.java b/src/main/java/com/homeadvisor/kafdrop/service/PartitionNotFoundException.java index 17f6a14..ff02d22 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/PartitionNotFoundException.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/PartitionNotFoundException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/service/TopicNotFoundException.java b/src/main/java/com/homeadvisor/kafdrop/service/TopicNotFoundException.java index 7ed3388..5a6f8f4 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/TopicNotFoundException.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/TopicNotFoundException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/util/BrokerChannel.java b/src/main/java/com/homeadvisor/kafdrop/util/BrokerChannel.java index 836de0c..0667d52 100644 --- a/src/main/java/com/homeadvisor/kafdrop/util/BrokerChannel.java +++ b/src/main/java/com/homeadvisor/kafdrop/util/BrokerChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/homeadvisor/kafdrop/util/JmxUtils.java b/src/main/java/com/homeadvisor/kafdrop/util/JmxUtils.java index dc76685..435c664 100644 --- a/src/main/java/com/homeadvisor/kafdrop/util/JmxUtils.java +++ b/src/main/java/com/homeadvisor/kafdrop/util/JmxUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 HomeAdvisor, Inc. + * Copyright 2017 HomeAdvisor, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.