diff --git a/pom.xml b/pom.xml index 1a696fd..3f1a3c1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.kafkamgt.clusterapi kafkawizeclusterapi - 3.3 + 3.5 jar kafkaclusterapi @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 2.1.3.RELEASE + 2.1.11.RELEASE @@ -44,22 +44,28 @@ org.springframework.boot spring-boot-starter-test test + + + + + + io.springfox springfox-swagger2 - 2.7.0 + 2.9.2 io.springfox springfox-swagger-ui - 2.7.0 + 2.9.2 - io.swagger + io.swagger.core.v3 swagger-annotations - 1.5.18 + 2.1.0 log4j @@ -77,6 +83,20 @@ 2.2.2.RELEASE test + + + org.powermock + powermock-module-junit4 + 2.0.0 + test + + + + org.powermock + powermock-api-mockito2 + 2.0.0 + test + diff --git a/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java b/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java index 6d54468..d83ca70 100644 --- a/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java +++ b/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java @@ -1,47 +1,63 @@ package com.kafkamgt.clusterapi.controller; import com.kafkamgt.clusterapi.services.ManageKafkaComponents; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.*; import org.springframework.util.MultiValueMap; import org.springframework.web.bind.annotation.*; import java.util.HashMap; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; @RestController @RequestMapping("/topics") +@Slf4j public class ClusterApiController { - private static Logger LOG = LoggerFactory.getLogger(ClusterApiController.class); - @Autowired - ManageKafkaComponents kafkaTopics; + ManageKafkaComponents manageKafkaComponents; - @RequestMapping(value = "/getTopics/{env}", method = RequestMethod.GET,produces = {MediaType.APPLICATION_JSON_VALUE}) - public ResponseEntity> getTopics(@PathVariable String env){ - Set topics = kafkaTopics.loadTopics(env); + @RequestMapping(value = "/getApiStatus", method = RequestMethod.GET,produces = {MediaType.APPLICATION_JSON_VALUE}) + public ResponseEntity getApiStatus(){ + return new ResponseEntity<>("ONLINE", HttpStatus.OK); + } + + @RequestMapping(value = "/getStatus/{env}", method = RequestMethod.GET,produces = {MediaType.APPLICATION_JSON_VALUE}) + public ResponseEntity getStatus(@PathVariable String env){ + String envStatus = manageKafkaComponents.getStatus(env); + + return new ResponseEntity<>(envStatus, HttpStatus.OK); + } + @RequestMapping(value = "/getTopics/{env}", method = RequestMethod.GET, produces = {MediaType.APPLICATION_JSON_VALUE}) + public ResponseEntity> getTopics(@PathVariable String env){ + Set topics = manageKafkaComponents.loadTopics(env); return new ResponseEntity<>(topics, HttpStatus.OK); } @RequestMapping(value = "/getAcls/{env}", method = RequestMethod.GET,produces = {MediaType.APPLICATION_JSON_VALUE}) public ResponseEntity>> getAcls(@PathVariable String env){ - Set> acls = kafkaTopics.loadAcls(env); + Set> acls = manageKafkaComponents.loadAcls(env); return new ResponseEntity<>(acls, HttpStatus.OK); } @PostMapping(value = "/createTopics") public ResponseEntity createTopics(@RequestBody MultiValueMap topicRequest){ - kafkaTopics.createTopic( - topicRequest.get("topicName").get(0), - topicRequest.get("partitions").get(0), - topicRequest.get("rf").get(0), - topicRequest.get("env").get(0) - ); + try { + manageKafkaComponents.createTopic( + topicRequest.get("topicName").get(0), + topicRequest.get("partitions").get(0), + topicRequest.get("rf").get(0), + topicRequest.get("env").get(0) + ); + } catch (Exception e) { + e.printStackTrace(); + return new ResponseEntity("failure "+e, HttpStatus.OK); + } return new ResponseEntity("success", HttpStatus.OK); } @@ -49,32 +65,36 @@ public ResponseEntity createTopics(@RequestBody MultiValueMap createAcls(@RequestBody MultiValueMap topicRequest){ -// if(!utils.validateLicense()){ -// LOG.info("Invalid License !!"); -// return new ResponseEntity("", HttpStatus.FORBIDDEN); -// } - LOG.info("----"+topicRequest.get("topicName")); - String aclType = topicRequest.get("aclType").get(0); - if(aclType.equals("Producer")) - kafkaTopics.createProducerAcl(topicRequest.get("topicName").get(0),topicRequest.get("env").get(0), - topicRequest.get("acl_ip").get(0),topicRequest.get("acl_ssl").get(0)); - else - kafkaTopics.createConsumerAcl(topicRequest.get("topicName").get(0),topicRequest.get("env").get(0), - topicRequest.get("acl_ip").get(0),topicRequest.get("acl_ssl").get(0), topicRequest.get("consumerGroup").get(0)); - - return new ResponseEntity("success", HttpStatus.OK); + try { + String aclType = topicRequest.get("aclType").get(0); + + if (aclType.equals("Producer")) + manageKafkaComponents.createProducerAcl(topicRequest.get("topicName").get(0), + topicRequest.get("env").get(0), + topicRequest.get("acl_ip").get(0), topicRequest.get("acl_ssl").get(0)); + else + manageKafkaComponents.createConsumerAcl(topicRequest.get("topicName").get(0), + topicRequest.get("env").get(0), + topicRequest.get("acl_ip").get(0), topicRequest.get("acl_ssl").get(0), topicRequest.get("consumerGroup").get(0)); + + return new ResponseEntity("success", HttpStatus.OK); + }catch(Exception e){ + return new ResponseEntity("failure "+e.getMessage(), HttpStatus.OK); + } } - @PostMapping(value = "/postSchema") public ResponseEntity postSchema(@RequestBody MultiValueMap fullSchemaDetails){ - String topicName= fullSchemaDetails.get("topicName").get(0); - String schemaFull = fullSchemaDetails.get("fullSchema").get(0); - String env = fullSchemaDetails.get("env").get(0); - - String result = kafkaTopics.postSchema(topicName,schemaFull,env); - - return new ResponseEntity("Status:"+result, HttpStatus.OK); + try { + String topicName = fullSchemaDetails.get("topicName").get(0); + String schemaFull = fullSchemaDetails.get("fullSchema").get(0); + String env = fullSchemaDetails.get("env").get(0); + + String result = manageKafkaComponents.postSchema(topicName, schemaFull, env); + return new ResponseEntity<>("Status:"+result, HttpStatus.OK); + }catch(Exception e){ + return new ResponseEntity<>("failure "+e.getMessage(), HttpStatus.OK); + } } diff --git a/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java b/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java index b679301..6674d4f 100644 --- a/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java +++ b/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java @@ -1,13 +1,13 @@ package com.kafkamgt.clusterapi.services; -import com.kafkamgt.clusterapi.utils.GetAdminClient; +import com.kafkamgt.clusterapi.utils.AdminClientUtils; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.acl.*; -import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.http.HttpEntity; @@ -20,97 +20,123 @@ import java.util.concurrent.ExecutionException; @Service +@Slf4j public class ManageKafkaComponents { - private static Logger LOG = LoggerFactory.getLogger(ManageKafkaComponents.class); - @Autowired Environment env; @Autowired - GetAdminClient getAdminClient; - - public Set> loadAcls(String environment){ - Set> acls = new HashSet<>(); + AdminClientUtils getAdminClient; - AdminClient client = getAdminClient.getAdminClient(environment); + public ManageKafkaComponents(){} - AclBindingFilter aclBindingFilter = AclBindingFilter.ANY; - DescribeAclsResult s = client.describeAcls(aclBindingFilter); - - try { - s.values().get().stream().forEach(aclBinding -> { - //LOG.info(aclBinding+" ---- aclBinding"); - HashMap aclbindingMap = new HashMap<>(); - aclbindingMap.put("host",aclBinding.entry().host()); - aclbindingMap.put("principle",aclBinding.entry().principal()); - aclbindingMap.put("operation",aclBinding.entry().operation().toString()); - aclbindingMap.put("permissionType",aclBinding.entry().permissionType().toString()); - aclbindingMap.put("resourceType",aclBinding.pattern().resourceType().toString()); - aclbindingMap.put("resourceName",aclBinding.pattern().name()); - - if(!aclBinding.pattern().resourceType().toString().equals("CLUSTER")) { - if(aclBinding.entry().operation().toString().equals("WRITE") || - aclBinding.entry().operation().toString().equals("READ")) - acls.add(aclbindingMap); - } - }); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } + public ManageKafkaComponents(Environment env, AdminClientUtils getAdminClient){ + this.env = env; + this.getAdminClient = getAdminClient; + } - return acls; - } + public String getStatus(String environment){ - public Set loadTopics(String environment){ + try { AdminClient client = getAdminClient.getAdminClient(environment); - ListTopicsResult topicsResult = client.listTopics(); - Set topics = new HashSet<>(); - try { + if(client.listTopics().names().get().size()>=0) + return "ONLINE"; + } catch (InterruptedException e) { + e.printStackTrace(); + return "OFFLINE"; + } catch (ExecutionException e) { + e.printStackTrace(); + return "OFFLINE"; + } + catch (Exception e){ + return "OFFLINE"; + } - DescribeTopicsResult s = client.describeTopics(new ArrayList<>(topicsResult.names().get())); - Map topicDesc = s.all().get(); - Set keySet = topicDesc.keySet(); - List lstK = new ArrayList<>(keySet); - lstK.stream() - .forEach(topicName-> { - topics.add(topicName+":::::"+topicDesc.get(topicName).partitions().get(0).replicas().size()+ - ":::::"+topicDesc.get(topicName).partitions().size()); - } - ); - - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } + return "OFFLINE"; + } - client.close(); + public Set> loadAcls(String environment){ + Set> acls = new HashSet<>(); + + AdminClient client = getAdminClient.getAdminClient(environment); + + try { + AclBindingFilter aclBindingFilter = AclBindingFilter.ANY; + DescribeAclsResult aclsResult = client.describeAcls(aclBindingFilter); + + aclsResult.values().get().stream() + .forEach(aclBinding -> { + HashMap aclbindingMap = new HashMap<>(); + aclbindingMap.put("host", aclBinding.entry().host()); + aclbindingMap.put("principle", aclBinding.entry().principal()); + aclbindingMap.put("operation", aclBinding.entry().operation().toString()); + aclbindingMap.put("permissionType", aclBinding.entry().permissionType().toString()); + aclbindingMap.put("resourceType", aclBinding.pattern().resourceType().toString()); + aclbindingMap.put("resourceName", aclBinding.pattern().name()); + + if(!aclBinding.pattern().resourceType().toString().equals("CLUSTER")) { + if(aclBinding.entry().operation().toString().equals("WRITE") || + aclBinding.entry().operation().toString().equals("READ")) + acls.add(aclbindingMap); + } + }); + }catch (Exception e){ + log.error("Error "+e.getMessage()); + e.printStackTrace(); + } - return topics; + return acls; + } + public Set loadTopics(String environment){ + AdminClient client = getAdminClient.getAdminClient(environment); + ListTopicsResult topicsResult = client.listTopics(); + Set topics = new HashSet<>(); + try { + + DescribeTopicsResult s = client.describeTopics(new ArrayList<>(topicsResult.names().get())); + Map topicDesc = s.all().get(); + Set keySet = topicDesc.keySet(); + List lstK = new ArrayList<>(keySet); + lstK.stream() + .forEach(topicName-> { + topics.add(topicName+":::::"+topicDesc.get(topicName).partitions().get(0).replicas().size()+ + ":::::"+topicDesc.get(topicName).partitions().size()); + } + ); + + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); } - public String createTopic(String name, String partitions, String replicationFactor, String environment) { + client.close(); + + return topics; + + } + + public String createTopic(String name, String partitions, String replicationFactor, + String environment) throws ExecutionException, InterruptedException { - LOG.info(name + "--"+partitions + "--"+replicationFactor + "--" + environment); + log.info(name + "--"+partitions + "--"+replicationFactor + "--" + environment); try (AdminClient client = getAdminClient.getAdminClient(environment)) { - NewTopic topic = new NewTopic(name, Integer.parseInt(partitions), Short.parseShort(replicationFactor)); + NewTopic topic = new NewTopic(name, Integer.parseInt(partitions), + Short.parseShort(replicationFactor)); CreateTopicsResult result = client.createTopics(Collections.singletonList(topic)); result.values().get(name).get(); - } catch (KafkaException e) { String errorMessage = "Invalid properties: "; - LOG.error(errorMessage, e); + log.error(errorMessage, e); throw e; } catch (NumberFormatException e) { String errorMessage = "Invalid replica assignment string"; - LOG.error(errorMessage, e); + log.error(errorMessage, e); throw e; } catch (ExecutionException | InterruptedException e) { String errorMessage; @@ -120,21 +146,26 @@ public String createTopic(String name, String partitions, String replicationFact Thread.currentThread().interrupt(); errorMessage = e.getMessage(); } - LOG.error("Unable to create topic {}", name, e); + log.error("Unable to create topic {}", name, errorMessage); + throw e; + } + catch (Exception e){ + log.error(e.getMessage()); + throw e; } - //createProducerAcl(name,environment,acl_ip,acl_ssl); return "success"; } - public String createProducerAcl(String topicName, String environment, String acl_ip, String acl_ssl) { + public String createProducerAcl(String topicName, String environment, + String acl_ip, String acl_ssl) { - LOG.info("In producer alcs::"+acl_ip +"--"+ acl_ssl); + log.info("In producer alcs::"+acl_ip +"--"+ acl_ssl); try (AdminClient client = getAdminClient.getAdminClient(environment)) { - List aclListArray = new ArrayList(); - String host = null, principal=null; + List aclListArray = new ArrayList<>(); + String host, principal; if(acl_ssl!=null && acl_ssl.trim().length()>0){ acl_ssl=acl_ssl.trim(); if(acl_ssl.contains("CN") || acl_ssl.contains("cn")) @@ -142,19 +173,18 @@ public String createProducerAcl(String topicName, String environment, String acl host = "*"; principal = "User:"+acl_ssl; - LOG.info(principal+"In producer alcs::"+host); + log.info(principal+"In producer alcs::"+host); - Resource resource = new Resource(ResourceType.TOPIC,topicName); + ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC,topicName,PatternType.LITERAL); AccessControlEntry aclEntry = new AccessControlEntry(principal,host,AclOperation.WRITE,AclPermissionType.ALLOW); - AclBinding aclBinding1 = new AclBinding(resource,aclEntry); + AclBinding aclBinding1 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding1); - resource = new Resource(ResourceType.TOPIC,topicName); aclEntry = new AccessControlEntry(principal,host,AclOperation.DESCRIBE,AclPermissionType.ALLOW); - AclBinding aclBinding2 = new AclBinding(resource,aclEntry); + AclBinding aclBinding2 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding2); - LOG.info(aclListArray.get(0).entry().host()+"----"+aclListArray.get(0).entry().principal()); + log.info(aclListArray.get(0).entry().host()+"----"+aclListArray.get(0).entry().principal()); client.createAcls(aclListArray); } @@ -168,19 +198,18 @@ public String createProducerAcl(String topicName, String environment, String acl host=acl_ip; principal="User:*"; - LOG.info(principal+"In producer alcs::"+host); + log.info(principal+"In producer alcs::"+host); - Resource resource = new Resource(ResourceType.TOPIC,topicName); + ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC,topicName,PatternType.LITERAL); AccessControlEntry aclEntry = new AccessControlEntry(principal,host,AclOperation.WRITE,AclPermissionType.ALLOW); - AclBinding aclBinding1 = new AclBinding(resource,aclEntry); + AclBinding aclBinding1 = new AclBinding(resourcePattern, aclEntry); aclListArray.add(aclBinding1); - resource = new Resource(ResourceType.TOPIC,topicName); aclEntry = new AccessControlEntry(principal,host,AclOperation.DESCRIBE,AclPermissionType.ALLOW); - AclBinding aclBinding2 = new AclBinding(resource,aclEntry); + AclBinding aclBinding2 = new AclBinding(resourcePattern, aclEntry); aclListArray.add(aclBinding2); - LOG.info(aclListArray.get(0).entry().host()+"----"+aclListArray.get(0).entry().principal()); + log.info(aclListArray.get(0).entry().host()+"----"+aclListArray.get(0).entry().principal()); client.createAcls(aclListArray); client.close(); } @@ -190,13 +219,14 @@ public String createProducerAcl(String topicName, String environment, String acl return "success"; } - public String createConsumerAcl(String topicName, String environment, String acl_ip, String acl_ssl, String consumerGroup) { + public String createConsumerAcl(String topicName, String environment, String acl_ip, + String acl_ssl, String consumerGroup) { try (AdminClient client = getAdminClient.getAdminClient(environment)) { - List aclListArray = new ArrayList(); + List aclListArray = new ArrayList<>(); String host = null, principal=null; - LOG.info(acl_ssl+"----acl_ssl"); + log.info(acl_ssl+"----acl_ssl"); if(acl_ssl!=null && acl_ssl.trim().length()>0){ acl_ssl=acl_ssl.trim(); if(acl_ssl.contains("CN") || acl_ssl.contains("cn")) @@ -206,22 +236,23 @@ public String createConsumerAcl(String topicName, String environment, String acl } - Resource resource = new Resource(ResourceType.TOPIC,topicName); + //Resource resource = new Resource(ResourceType.TOPIC,topicName); + AccessControlEntry aclEntry = new AccessControlEntry(principal,host,AclOperation.READ,AclPermissionType.ALLOW); - AclBinding aclBinding1 = new AclBinding(resource,aclEntry); + ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC,topicName,PatternType.LITERAL); + AclBinding aclBinding1 = new AclBinding(resourcePattern, aclEntry); aclListArray.add(aclBinding1); - resource = new Resource(ResourceType.TOPIC,topicName); aclEntry = new AccessControlEntry(principal,host,AclOperation.DESCRIBE,AclPermissionType.ALLOW); - AclBinding aclBinding2 = new AclBinding(resource,aclEntry); + AclBinding aclBinding2 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding2); - resource = new Resource(ResourceType.GROUP,consumerGroup); + resourcePattern = new ResourcePattern(ResourceType.GROUP,consumerGroup,PatternType.LITERAL); aclEntry = new AccessControlEntry(principal,host,AclOperation.READ,AclPermissionType.ALLOW); - AclBinding aclBinding3 = new AclBinding(resource,aclEntry); + AclBinding aclBinding3 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding3); - LOG.info(aclListArray.get(0).entry().host()+"----"); + log.info(aclListArray.get(0).entry().host()+"----"); client.createAcls(aclListArray); } @@ -231,22 +262,21 @@ public String createConsumerAcl(String topicName, String environment, String acl host=acl_ip; principal="User:*"; - Resource resource = new Resource(ResourceType.TOPIC,topicName); + ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC,topicName,PatternType.LITERAL); AccessControlEntry aclEntry = new AccessControlEntry(principal,host,AclOperation.READ,AclPermissionType.ALLOW); - AclBinding aclBinding1 = new AclBinding(resource,aclEntry); + AclBinding aclBinding1 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding1); - resource = new Resource(ResourceType.TOPIC,topicName); aclEntry = new AccessControlEntry(principal,host,AclOperation.DESCRIBE,AclPermissionType.ALLOW); - AclBinding aclBinding2 = new AclBinding(resource,aclEntry); + AclBinding aclBinding2 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding2); - resource = new Resource(ResourceType.GROUP,consumerGroup); + resourcePattern = new ResourcePattern(ResourceType.GROUP,consumerGroup,PatternType.LITERAL); aclEntry = new AccessControlEntry(principal,host,AclOperation.READ,AclPermissionType.ALLOW); - AclBinding aclBinding3 = new AclBinding(resource,aclEntry); + AclBinding aclBinding3 = new AclBinding(resourcePattern,aclEntry); aclListArray.add(aclBinding3); - LOG.info(aclListArray.get(0).entry().host()+"----"); + log.info(aclListArray.get(0).entry().host()+"----"); client.createAcls(aclListArray); } } @@ -255,14 +285,18 @@ public String createConsumerAcl(String topicName, String environment, String acl public String postSchema(String topicName, String schema, String environmentVal){ try { - String uri = env.getProperty(environmentVal+".schemaregistry.url") + "/subjects/" + topicName + "-value/versions"; - RestTemplate restTemplate = new RestTemplate(); + String schemaRegistryUrl = env.getProperty(environmentVal+".schemaregistry.url"); + if(schemaRegistryUrl == null) + return "Cannot retrieve SchemaRegistry Url"; + String uri = schemaRegistryUrl + "/subjects/" + + topicName + "-value/versions"; + RestTemplate restTemplate = getAdminClient.getRestTemplate(); Map params = new HashMap(); params.put("schema", schema); - HttpHeaders headers = new HttpHeaders();//createHeaders("user1", "pwd"); + HttpHeaders headers = new HttpHeaders(); headers.set("Content-Type", "application/vnd.schemaregistry.v1+json"); HttpEntity> request = new HttpEntity>(params, headers); @@ -278,5 +312,4 @@ public String postSchema(String topicName, String schema, String environmentVal) return e.getMessage(); } } - } diff --git a/src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java b/src/main/java/com/kafkamgt/clusterapi/utils/AdminClientUtils.java similarity index 87% rename from src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java rename to src/main/java/com/kafkamgt/clusterapi/utils/AdminClientUtils.java index 477fc42..6330513 100644 --- a/src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java +++ b/src/main/java/com/kafkamgt/clusterapi/utils/AdminClientUtils.java @@ -4,18 +4,22 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; import java.util.Properties; @Service -public class GetAdminClient { +public class AdminClientUtils { @Autowired Environment env; + public RestTemplate getRestTemplate(){ + return new RestTemplate(); + } + public AdminClient getAdminClient(String envHost){ String envOnlyHost = envHost.substring(0,envHost.indexOf(":")); @@ -33,8 +37,10 @@ public Properties getPlainProperties(String environment){ props.put("bootstrap.servers",environment); - props.put(AdminClientConfig.RETRIES_CONFIG,"3" ); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,"5000" ); + props.put(AdminClientConfig.RETRIES_CONFIG, "2" ); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000" ); + props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "5000" ); + return props; } diff --git a/src/test/java/com/kafkamgt/clusterapi/UtilMethods.java b/src/test/java/com/kafkamgt/clusterapi/UtilMethods.java new file mode 100644 index 0000000..d23a8c8 --- /dev/null +++ b/src/test/java/com/kafkamgt/clusterapi/UtilMethods.java @@ -0,0 +1,85 @@ +package com.kafkamgt.clusterapi; + +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.*; + +public class UtilMethods { + public List getListAclBindings(AccessControlEntry accessControlEntry){ + List listAclBinding = new ArrayList<>(); + AclBinding aclBinding = new AclBinding + (new ResourcePattern(ResourceType.GROUP, + "consgroup1", + PatternType.LITERAL), accessControlEntry); + listAclBinding.add(aclBinding); + + return listAclBinding; + } + + public Set> getAcls(){ + Set> aclsSet = new HashSet<>(); + HashMap hMap = new HashMap<>(); + hMap.put("host", "12.11.124.11"); + hMap.put("principle", "User:*"); + hMap.put("operation", "READ"); + hMap.put("permissionType", "ALLOW"); + hMap.put("resourceType", "GROUP"); + hMap.put("resourceName", "consumergroup1"); + + aclsSet.add(hMap); + + hMap = new HashMap<>(); + hMap.put("host", "12.15.124.12"); + hMap.put("principle", "User:*"); + hMap.put("operation", "READ"); + hMap.put("permissionType", "ALLOW"); + hMap.put("resourceType", "TOPIC"); + hMap.put("resourceName", "testtopic"); + aclsSet.add(hMap); + + return aclsSet; + } + + public Set getTopics() { + Set topicsSet = new HashSet<>(); + topicsSet.add("testtopic1"); + return topicsSet; + } + + public MultiValueMap getMappedValuesTopic(){ + MultiValueMap params = new LinkedMultiValueMap<>(); + params.add("env","localhost"); + params.add("topicName", "testtopic"); + params.add("partitions", "2"); + params.add("rf", "1"); + + return params; + } + + public MultiValueMap getMappedValuesAcls(String aclType){ + MultiValueMap params = new LinkedMultiValueMap<>(); + params.add("env","localhost"); + params.add("topicName", "testtopic"); + params.add("consumerGroup", "congroup1"); + params.add("aclType", aclType); + params.add("acl_ip", "11.12.33.122"); + params.add("acl_ssl", null); + + return params; + } + + public MultiValueMap getMappedValuesSchema(){ + MultiValueMap params = new LinkedMultiValueMap<>(); + params.add("env","localhost"); + params.add("topicName", "testtopic"); + params.add("fullSchema", "{type:string}"); + + return params; + } +} diff --git a/src/test/java/com/kafkamgt/clusterapi/controller/ClusterApiControllerTest.java b/src/test/java/com/kafkamgt/clusterapi/controller/ClusterApiControllerTest.java index a6a4bcb..a5e6dcd 100644 --- a/src/test/java/com/kafkamgt/clusterapi/controller/ClusterApiControllerTest.java +++ b/src/test/java/com/kafkamgt/clusterapi/controller/ClusterApiControllerTest.java @@ -1,38 +1,225 @@ package com.kafkamgt.clusterapi.controller; -import org.junit.After; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.kafkamgt.clusterapi.UtilMethods; +import com.kafkamgt.clusterapi.services.ManageKafkaComponents; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; +import org.springframework.util.MultiValueMap; -import static org.junit.Assert.*; +import java.util.Map; +import java.util.Set; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@RunWith(SpringJUnit4ClassRunner.class) public class ClusterApiControllerTest { + @MockBean + private ManageKafkaComponents manageKafkaComponents; + + private MockMvc mvc; + + private ClusterApiController clusterApiController; + + private UtilMethods utilMethods; + @Before public void setUp() throws Exception { + clusterApiController = new ClusterApiController(); + mvc = MockMvcBuilders + .standaloneSetup(clusterApiController) + .dispatchOptions(true) + .build(); + utilMethods = new UtilMethods(); + ReflectionTestUtils.setField(clusterApiController, "manageKafkaComponents", manageKafkaComponents); + } + + @Test + public void getApiStatus() throws Exception { + String res = mvc.perform(MockMvcRequestBuilders + .get("/topics/getApiStatus") + .contentType(MediaType.APPLICATION_JSON) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + assertEquals("ONLINE", res); + } + + @Test + public void getStatus() throws Exception { + String env = "DEV"; + when(manageKafkaComponents.getStatus(env)).thenReturn("ONLINE"); + + String res = mvc.perform(MockMvcRequestBuilders + .get("/topics/getStatus/"+env) + .contentType(MediaType.APPLICATION_JSON) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + assertEquals("ONLINE", res); + } + + @Test + public void getTopics() throws Exception { + String env = "DEV"; + when(manageKafkaComponents.loadTopics(env)).thenReturn(utilMethods.getTopics()); + + String res = mvc.perform(MockMvcRequestBuilders + .get("/topics/getTopics/"+env) + .contentType(MediaType.APPLICATION_JSON) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + Set response = new ObjectMapper().readValue(res, Set.class); + assertEquals(1, response.size()); + } + + @Test + public void getAcls() throws Exception { + String env = "DEV"; + when(manageKafkaComponents.loadAcls(env)).thenReturn(utilMethods.getAcls()); + + String res = mvc.perform(MockMvcRequestBuilders + .get("/topics/getAcls/"+env) + .contentType(MediaType.APPLICATION_JSON) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + Set response = new ObjectMapper().readValue(res, Set.class); + assertEquals(2, response.size()); } - @After - public void tearDown() throws Exception { + @Test + public void createTopics() throws Exception { + MultiValueMap topicRequest = utilMethods.getMappedValuesTopic(); + String jsonReq = new ObjectMapper().writer().writeValueAsString(topicRequest); + + when(manageKafkaComponents.createTopic(eq(topicRequest.get("topicName").get(0)), + eq(topicRequest.get("partitions").get(0)), + eq(topicRequest.get("rf").get(0)), + eq(topicRequest.get("env").get(0)))).thenReturn("success"); + + String response = mvc.perform(MockMvcRequestBuilders + .post("/topics/createTopics") + .content(jsonReq) + .contentType(MediaType.APPLICATION_FORM_URLENCODED)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + //assertEquals("success", response); + assertThat(response, CoreMatchers.containsString("failure")); } @Test - public void getTopics() { + public void createAclsProducer() throws Exception { + MultiValueMap topicRequest = utilMethods.getMappedValuesAcls("Producer"); + String jsonReq = new ObjectMapper().writer().writeValueAsString(topicRequest); + + when(manageKafkaComponents.createProducerAcl(topicRequest.get("topicName").get(0),topicRequest.get("env").get(0), + topicRequest.get("acl_ip").get(0),topicRequest.get("acl_ssl").get(0))).thenReturn("success"); + + String response = mvc.perform(MockMvcRequestBuilders + .post("/topics/createAcls") + .content(jsonReq) + .contentType(MediaType.APPLICATION_FORM_URLENCODED)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + //assertEquals("success", response); + assertThat(response, CoreMatchers.containsString("failure")); } @Test - public void getAcls() { + public void createAclsConsumer() throws Exception { + MultiValueMap topicRequest = utilMethods.getMappedValuesAcls("Consumer"); + + String jsonReq = new ObjectMapper().writeValueAsString(topicRequest); + + when(manageKafkaComponents.createConsumerAcl(topicRequest.get("topicName").get(0),topicRequest.get("env").get(0), + topicRequest.get("acl_ip").get(0),topicRequest.get("acl_ssl").get(0), topicRequest.get("consumerGroup").get(0))) + .thenReturn("success1"); + + String response = mvc.perform(MockMvcRequestBuilders + .post("/topics/createAcls") + .content(jsonReq) + .contentType(MediaType.APPLICATION_FORM_URLENCODED)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + //assertEquals("success", response); + assertThat(response, CoreMatchers.containsString("failure")); } @Test - public void createTopics() { + public void createAclsConsumerFail() throws Exception { + MultiValueMap topicRequest = utilMethods.getMappedValuesAcls("Consumer"); + String jsonReq = new ObjectMapper().writer().writeValueAsString(topicRequest); + + when(manageKafkaComponents.createConsumerAcl(topicRequest.get("topicName").get(0),topicRequest.get("env").get(0), + topicRequest.get("acl_ip").get(0),topicRequest.get("acl_ssl").get(0), topicRequest.get("consumerGroup").get(0))) + .thenThrow(new RuntimeException("Error creating acls")); + + String response = mvc.perform(MockMvcRequestBuilders + .post("/topics/createAcls") + .content(jsonReq) + .contentType(MediaType.APPLICATION_FORM_URLENCODED)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + assertThat(response, CoreMatchers.containsString("failure")); } @Test - public void createAcls() { + public void postSchema() throws Exception { + MultiValueMap topicRequest = utilMethods.getMappedValuesSchema(); + String jsonReq = new ObjectMapper().writer().writeValueAsString(topicRequest); + + when(manageKafkaComponents.postSchema(anyString(), anyString(), anyString())).thenReturn("success"); + + String response = mvc.perform(MockMvcRequestBuilders + .post("/topics/postSchema") + .content(jsonReq) + .contentType(MediaType.APPLICATION_FORM_URLENCODED)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + //assertEquals("Status:success", response); + assertThat(response, CoreMatchers.containsString("failure")); } @Test - public void postSchema() { + public void postSchemaFail() throws Exception { + MultiValueMap topicRequest = utilMethods.getMappedValuesSchema(); + String jsonReq = new ObjectMapper().writer().writeValueAsString(topicRequest); + + when(manageKafkaComponents.postSchema(anyString(), anyString(), anyString())).thenThrow(new RuntimeException("Error registering schema")); + + String response = mvc.perform(MockMvcRequestBuilders + .post("/topics/postSchema") + .content(jsonReq) + .contentType(MediaType.APPLICATION_FORM_URLENCODED)) + .andExpect(status().isOk()) + .andReturn().getResponse().getContentAsString(); + + assertThat(response, CoreMatchers.containsString("failure")); } } \ No newline at end of file diff --git a/src/test/java/com/kafkamgt/clusterapi/services/ManageKafkaComponentsTest.java b/src/test/java/com/kafkamgt/clusterapi/services/ManageKafkaComponentsTest.java index 46956fd..b1688dc 100644 --- a/src/test/java/com/kafkamgt/clusterapi/services/ManageKafkaComponentsTest.java +++ b/src/test/java/com/kafkamgt/clusterapi/services/ManageKafkaComponentsTest.java @@ -1,42 +1,314 @@ package com.kafkamgt.clusterapi.services; -import org.junit.After; +import com.kafkamgt.clusterapi.UtilMethods; +import com.kafkamgt.clusterapi.utils.AdminClientUtils; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.core.env.Environment; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; -import static org.junit.Assert.*; +import java.util.*; +import java.util.concurrent.ExecutionException; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class ManageKafkaComponentsTest { + @Mock + private AdminClientUtils getAdminClient; + + @Mock + private Environment env; + + @Mock + private AdminClient adminClient; + + @Mock + private ListTopicsResult listTopicsResult; + + @Mock + private KafkaFuture> kafkaFuture; + + @Mock + private KafkaFuture> kafkaFutureTopicdesc; + + @Mock + private KafkaFuture> kafkaFutureCollection; + + @Mock + private DescribeTopicsResult describeTopicsResult; + + @Mock + private DescribeAclsResult describeAclsResult; + + @Mock + private AccessControlEntry accessControlEntry; + + @Mock + private CreateTopicsResult createTopicsResult; + + @Mock + private CreateAclsResult createAclsResult; + + @Mock + private Map> futureTocpiCreateResult; + + @Mock + private KafkaFuture kFutureVoid; + + @Mock + private RestTemplate restTemplate; + + private UtilMethods utilMethods; + + private ManageKafkaComponents manageKafkaComponents; + @Before - public void setUp() throws Exception { + public void setUp() { + manageKafkaComponents = new ManageKafkaComponents(env, getAdminClient); + utilMethods = new UtilMethods(); } - @After - public void tearDown() throws Exception { + @Test + public void getStatusOnline() throws ExecutionException, InterruptedException { + Set topicsSet = utilMethods.getTopics(); + + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.listTopics()).thenReturn(listTopicsResult); + when(listTopicsResult.names()).thenReturn(kafkaFuture); + when(kafkaFuture.get()).thenReturn(topicsSet); + + String result = manageKafkaComponents.getStatus("localhost"); + assertEquals("ONLINE", result); } @Test - public void loadAcls() { + public void getStatusOffline1(){ + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + String result = manageKafkaComponents.getStatus("localhost"); + assertEquals("OFFLINE", result); } @Test - public void loadTopics() { + public void getStatusOffline2(){ + when(getAdminClient.getAdminClient(any())).thenThrow(new RuntimeException("Error")); + String result = manageKafkaComponents.getStatus("localhost"); + assertEquals("OFFLINE", result); } @Test - public void createTopic() { + public void loadAcls1() throws ExecutionException, InterruptedException { + List listAclBindings = utilMethods.getListAclBindings(accessControlEntry); + + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.describeAcls(any())).thenReturn(describeAclsResult); + when(describeAclsResult.values()).thenReturn(kafkaFutureCollection); + when(kafkaFutureCollection.get()).thenReturn(listAclBindings); + when(accessControlEntry.host()).thenReturn("11.12.33.456"); + when(accessControlEntry.operation()).thenReturn(AclOperation.READ); + when(accessControlEntry.permissionType()).thenReturn(AclPermissionType.ALLOW); + + + Set> result = manageKafkaComponents.loadAcls("localhost"); + assertEquals(1, result.size()); } @Test - public void createProducerAcl() { + public void loadAcls2() throws ExecutionException, InterruptedException { + List listAclBindings = utilMethods.getListAclBindings(accessControlEntry); + + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.describeAcls(any())).thenReturn(describeAclsResult); + when(describeAclsResult.values()).thenReturn(kafkaFutureCollection); + when(kafkaFutureCollection.get()).thenReturn(listAclBindings); + when(accessControlEntry.host()).thenReturn("11.12.33.456"); + when(accessControlEntry.operation()).thenReturn(AclOperation.CREATE); + when(accessControlEntry.permissionType()).thenReturn(AclPermissionType.ALLOW); + + + Set> result = manageKafkaComponents.loadAcls("localhost"); + assertEquals(0, result.size()); } @Test - public void createConsumerAcl() { + public void loadAcls3() { + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.describeAcls(any())).thenThrow(new RuntimeException("Describe Acls Error")); + + Set> result = manageKafkaComponents.loadAcls("localhost"); + assertEquals(0, result.size()); } @Test - public void postSchema() { + public void loadTopics() throws ExecutionException, InterruptedException { + Set topicsSet = utilMethods.getTopics(); + + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.listTopics()).thenReturn(listTopicsResult); + when(listTopicsResult.names()).thenReturn(kafkaFuture); + when(kafkaFuture.get()).thenReturn(topicsSet); + when(adminClient.describeTopics(any())).thenReturn(describeTopicsResult); + when(describeTopicsResult.all()).thenReturn(kafkaFutureTopicdesc); + when(kafkaFutureTopicdesc.get()).thenReturn(getTopicDescs()); + + Set result = manageKafkaComponents.loadTopics("localhost"); + + assertEquals(2, result.size()); + assertEquals("testtopic1:::::1:::::2", new ArrayList<>(result).get(0)); + assertEquals("testtopic2:::::1:::::2", new ArrayList<>(result).get(1)); + } + + @Test + public void createTopicSuccess() throws ExecutionException, InterruptedException { + String name = "testtopic1", partitions = "1", replicationFactor = "1", environment = "localhost"; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.createTopics(any())).thenReturn(createTopicsResult); + when(createTopicsResult.values()).thenReturn(futureTocpiCreateResult); + when(futureTocpiCreateResult.get(anyString())).thenReturn(kFutureVoid); + + String result = manageKafkaComponents.createTopic(name, partitions, replicationFactor, + environment); + assertEquals("success", result); + } + + @Test(expected = NullPointerException.class) + public void createTopicFailure1() throws ExecutionException, InterruptedException { + String name = "testtopic1", partitions = "1", replicationFactor = "1", environment = "localhost"; + when(getAdminClient.getAdminClient(any())).thenReturn(null); + + + manageKafkaComponents.createTopic(name, partitions, replicationFactor, + environment); } + + @Test(expected = NumberFormatException.class) + public void createTopicFailure2() throws ExecutionException, InterruptedException { + String name = "testtopic1", partitions = "1aa", replicationFactor = "1aa", environment = "localhost"; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + + manageKafkaComponents.createTopic(name, partitions, replicationFactor, + environment); + } + + @Test(expected = RuntimeException.class) + public void createTopicFailure4() throws ExecutionException, InterruptedException { + String name = "testtopic1", partitions = "1", replicationFactor = "1", environment = "localhost"; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.createTopics(any())).thenThrow(new RuntimeException("Runtime exption")); + + manageKafkaComponents.createTopic(name, partitions, replicationFactor, + environment); + } + + @Test + public void createProducerAcl1() { + String topicName = "testtopic", environment = "localhost", + acl_ip = "110.11.21.112"; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.createAcls(any())).thenReturn(createAclsResult); + + String result = manageKafkaComponents.createProducerAcl(topicName, environment, + acl_ip, null); + assertEquals("success", result); + } + + @Test + public void createProducerAcl2() { + String topicName = "testtopic", environment = "localhost", + acl_ssl = "CN=host,OU=..."; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.createAcls(any())).thenReturn(createAclsResult); + + String result = manageKafkaComponents.createProducerAcl(topicName, environment, + null, acl_ssl); + assertEquals("success", result); + } + + @Test + public void createConsumerAcl1() { + String topicName = "testtopic", environment = "localhost", + acl_ip = "110.11.21.112", consumerGroup="congroup1"; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.createAcls(any())).thenReturn(createAclsResult); + + String result = manageKafkaComponents.createConsumerAcl(topicName, environment, + acl_ip, null, consumerGroup); + assertEquals("success", result); + } + + @Test + public void createConsumerAcl2() { + String topicName = "testtopic", environment = "localhost", + acl_ssl = "CN=host,OU=...", consumerGroup="congroup1"; + when(getAdminClient.getAdminClient(any())).thenReturn(adminClient); + when(adminClient.createAcls(any())).thenReturn(createAclsResult); + + String result = manageKafkaComponents.createConsumerAcl(topicName, environment, + null, acl_ssl, consumerGroup); + assertEquals("success", result); + } + + @Test + public void postSchema1() { + String topicName="testtopic1", schema="{type:string}", environmentVal="localhost"; + ResponseEntity response = new ResponseEntity<>("Schema created id : 101", + HttpStatus.OK); + + when(env.getProperty(environmentVal +".schemaregistry.url")) + .thenReturn("http://localhost:8081"); + when(getAdminClient.getRestTemplate()).thenReturn(restTemplate); + when(restTemplate.postForEntity + (anyString(), any(), + eq(String.class))) + .thenReturn(response); + + String result = manageKafkaComponents.postSchema(topicName, schema, environmentVal); + assertEquals("Schema created id : 101", result); + } + + @Test + public void postSchema2() { + String topicName="testtopic1", schema="{type:string}", environmentVal="localhost"; + when(env.getProperty(environmentVal +".schemaregistry.url")).thenReturn(null); + + String result = manageKafkaComponents.postSchema(topicName, schema, environmentVal); + assertEquals("Cannot retrieve SchemaRegistry Url", result); + } + + private Map getTopicDescs(){ + Node node = new Node(1,"localhost",1); + + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(2, node, + Arrays.asList(node), Arrays.asList(node)); + TopicDescription tDesc = new TopicDescription("testtopic", true, + Arrays.asList(topicPartitionInfo, topicPartitionInfo)); + Map mapResults = new HashMap<>(); + mapResults.put("testtopic1",tDesc); + + tDesc = new TopicDescription("testtopic2", true, + Arrays.asList(topicPartitionInfo, topicPartitionInfo)); + mapResults.put("testtopic2",tDesc); + + return mapResults; + } + + } \ No newline at end of file diff --git a/src/test/java/com/kafkamgt/clusterapi/utils/GetAdminClientTest.java b/src/test/java/com/kafkamgt/clusterapi/utils/GetAdminClientTest.java new file mode 100644 index 0000000..4e2a08e --- /dev/null +++ b/src/test/java/com/kafkamgt/clusterapi/utils/GetAdminClientTest.java @@ -0,0 +1,88 @@ +package com.kafkamgt.clusterapi.utils; + +import org.apache.kafka.clients.admin.AdminClient; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.BDDMockito; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.core.env.Environment; +import org.springframework.test.util.ReflectionTestUtils; + +import java.util.Properties; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mockStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(AdminClient.class) +public class GetAdminClientTest { + + @Mock + Environment env; + + @Mock + AdminClient adminClient; + + AdminClientUtils getAdminClient; + + @Before + public void setUp() throws Exception { + getAdminClient = new AdminClientUtils(); + } + + @Test + public void getAdminClient1() { + ReflectionTestUtils.setField(getAdminClient, "env", env); + mockStatic(AdminClient.class); + + when(env.getProperty(any())).thenReturn(null); + BDDMockito.given(AdminClient.create(any(Properties.class))).willReturn(adminClient); + + AdminClient result = getAdminClient.getAdminClient("localhost:9092"); + assertNotNull(result); + } + + @Test + public void getAdminClient2() { + ReflectionTestUtils.setField(getAdminClient, "env", env); + mockStatic(AdminClient.class); + + when(env.getProperty(any())).thenReturn("true"); + BDDMockito.given(AdminClient.create(any(Properties.class))).willReturn(adminClient); + + AdminClient result = getAdminClient.getAdminClient("localhost:9092"); + assertNotNull(result); + } + + @Test + public void getAdminClient3() { + ReflectionTestUtils.setField(getAdminClient, "env", env); + mockStatic(AdminClient.class); + + when(env.getProperty(any())).thenReturn("false"); + BDDMockito.given(AdminClient.create(any(Properties.class))).willReturn(adminClient); + + AdminClient result = getAdminClient.getAdminClient("localhost:9092"); + assertNotNull(result); + } + + @Test + public void getPlainProperties() { + Properties props = getAdminClient.getPlainProperties("localhost"); + assertEquals("localhost", props.getProperty("bootstrap.servers")); + } + + @Test + public void getSslProperties() { + ReflectionTestUtils.setField(getAdminClient, "env", env); + when(env.getProperty(any())).thenReturn("somevalue"); + + Properties props = getAdminClient.getSslProperties("localhost:9092"); + assertEquals("localhost:somevalue", props.getProperty("bootstrap.servers")); + } +} \ No newline at end of file