diff --git a/pom.xml b/pom.xml index 89ef34d..e96d3fb 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.kafkamgt.clusterapi kafkawizeclusterapi - 3.1 + 3.2 jar kafkaclusterapi diff --git a/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java b/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java index 7d1782d..6d54468 100644 --- a/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java +++ b/src/main/java/com/kafkamgt/clusterapi/controller/ClusterApiController.java @@ -40,9 +40,7 @@ public ResponseEntity createTopics(@RequestBody MultiValueMap("success", HttpStatus.OK); diff --git a/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java b/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java index 6395c70..b679301 100644 --- a/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java +++ b/src/main/java/com/kafkamgt/clusterapi/services/ManageKafkaComponents.java @@ -1,5 +1,6 @@ package com.kafkamgt.clusterapi.services; +import com.kafkamgt.clusterapi.utils.GetAdminClient; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.acl.*; @@ -8,7 +9,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; @@ -27,12 +27,13 @@ public class ManageKafkaComponents { @Autowired Environment env; + @Autowired + GetAdminClient getAdminClient; + public Set> loadAcls(String environment){ Set> acls = new HashSet<>(); - Properties props = new Properties(); - props.put("bootstrap.servers",environment); - AdminClient client = AdminClient.create(props); + AdminClient client = getAdminClient.getAdminClient(environment); AclBindingFilter aclBindingFilter = AclBindingFilter.ANY; DescribeAclsResult s = client.describeAcls(aclBindingFilter); @@ -64,10 +65,7 @@ public Set> loadAcls(String environment){ } public Set loadTopics(String environment){ - Properties props = new Properties(); - props.put("bootstrap.servers",environment); - - AdminClient client = AdminClient.create(props); + AdminClient client = getAdminClient.getAdminClient(environment); ListTopicsResult topicsResult = client.listTopics(); Set topics = new HashSet<>(); try { @@ -95,17 +93,13 @@ public Set loadTopics(String environment){ } - public String createTopic(String name, String partitions, String replicationFactor, String environment, String acl_ip, String acl_ssl) { + public String createTopic(String name, String partitions, String replicationFactor, String environment) { - LOG.info(name + "--"+partitions + "--"+replicationFactor + "--" + environment); - Properties props = new Properties(); - //props.put("bootstrap.servers",env.getProperty(environment+BOOTSTRAP_SERVERS)); - props.put("bootstrap.servers",environment); - try (AdminClient client = AdminClient.create(props)) { + LOG.info(name + "--"+partitions + "--"+replicationFactor + "--" + environment); - NewTopic topic = null; + try (AdminClient client = getAdminClient.getAdminClient(environment)) { - topic = new NewTopic(name, Integer.parseInt(partitions), Short.parseShort(replicationFactor)); + NewTopic topic = new NewTopic(name, Integer.parseInt(partitions), Short.parseShort(replicationFactor)); CreateTopicsResult result = client.createTopics(Collections.singletonList(topic)); result.values().get(name).get(); @@ -129,19 +123,16 @@ public String createTopic(String name, String partitions, String replicationFact LOG.error("Unable to create topic {}", name, e); } - createProducerAcl(name,environment,acl_ip,acl_ssl); + //createProducerAcl(name,environment,acl_ip,acl_ssl); return "success"; } public String createProducerAcl(String topicName, String environment, String acl_ip, String acl_ssl) { - Properties props = new Properties(); LOG.info("In producer alcs::"+acl_ip +"--"+ acl_ssl); - //props.put("bootstrap.servers",env.getProperty(environment+BOOTSTRAP_SERVERS)); - props.put("bootstrap.servers",environment); - try (AdminClient client = AdminClient.create(props)) { + try (AdminClient client = getAdminClient.getAdminClient(environment)) { List aclListArray = new ArrayList(); String host = null, principal=null; if(acl_ssl!=null && acl_ssl.trim().length()>0){ @@ -201,11 +192,7 @@ public String createProducerAcl(String topicName, String environment, String acl public String createConsumerAcl(String topicName, String environment, String acl_ip, String acl_ssl, String consumerGroup) { - Properties props = new Properties(); - //props.put("bootstrap.servers",env.getProperty(environment+BOOTSTRAP_SERVERS)); - props.put("bootstrap.servers",environment); - - try (AdminClient client = AdminClient.create(props)) { + try (AdminClient client = getAdminClient.getAdminClient(environment)) { List aclListArray = new ArrayList(); String host = null, principal=null; diff --git a/src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java b/src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java new file mode 100644 index 0000000..d1d108a --- /dev/null +++ b/src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java @@ -0,0 +1,58 @@ +package com.kafkamgt.clusterapi.utils; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import java.util.Properties; + +@Service +public class GetAdminClient { + + @Autowired + Environment env; + + public AdminClient getAdminClient(String envHost){ + + String envOnlyHost = envHost.substring(0,envHost.indexOf(":")); + String ssl_acl_enabled = env.getProperty(envOnlyHost+".connect_with_ssl_kafkacluster"); + if(ssl_acl_enabled==null) + return AdminClient.create(getPlainProperties(envHost)); + else if(ssl_acl_enabled!=null && ssl_acl_enabled.equals("true")) + return AdminClient.create(getSslProperties(envHost)); + else + return AdminClient.create(getPlainProperties(envHost)); + } + + public Properties getPlainProperties(String environment){ + Properties props = new Properties(); + + props.put("bootstrap.servers",environment); + + return props; + } + + public Properties getSslProperties(String environment){ + Properties props = new Properties(); + + String envOnlyHost = environment.substring(0,environment.indexOf(":")); + String bootStrapServer = envOnlyHost +":" +env.getProperty("kafkassl."+envOnlyHost+".port"); + props.put("bootstrap.servers",bootStrapServer); + props.put("ssl.truststore.location",env.getProperty("kafkassl."+envOnlyHost+".truststore.location")); + props.put("ssl.truststore.password",env.getProperty("kafkassl."+envOnlyHost+".truststore.pwd")); + props.put("ssl.keystore.location",env.getProperty("kafkassl."+envOnlyHost+".keystore.location")); + props.put("ssl.keystore.password",env.getProperty("kafkassl."+envOnlyHost+".keystore.pwd")); + props.put("ssl.key.password",env.getProperty("kafkassl."+envOnlyHost+".key.pwd")); + props.put("ssl.keystore.type","JKS"); + props.put("ssl.truststore.type","JKS"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL"); + props.put(AdminClientConfig.CLIENT_ID_CONFIG,"testclient"); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,"1000000"); + + return props; + } +} diff --git a/src/main/java/com/kafkamgt/clusterapi/utils/LoadKafkaProperties.java b/src/main/java/com/kafkamgt/clusterapi/utils/LoadKafkaProperties.java deleted file mode 100644 index dc2ace1..0000000 --- a/src/main/java/com/kafkamgt/clusterapi/utils/LoadKafkaProperties.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.kafkamgt.clusterapi.utils; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.springframework.stereotype.Component; - -import java.util.Properties; - -@Component -public class LoadKafkaProperties { - public Properties getPlainProperties(String environment){ - Properties props = new Properties(); - - props.put("bootstrap.servers",environment); - - return props; - } - - public Properties getSslProperties(String environment){ - Properties props = new Properties(); - - props.put("bootstrap.servers","localhost:9093"); - props.put("ssl.truststore.location","C:/Software/certs_kafka_client/kafka.truststore.jks"); - props.put("ssl.truststore.password","pwd"); - props.put("ssl.keystore.location","C:/Software/certs_kafka_client/kafka.keystore.jks"); - props.put("ssl.keystore.password","pwd"); - props.put("ssl.keystore.type","JKS"); - props.put("ssl.truststore.type","JKS"); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL"); - props.put(AdminClientConfig.CLIENT_ID_CONFIG,"testclient"); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,"1000000"); - - return props; - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5af78a3..e52366d 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,20 @@ server.port:9343 +# kafka cluster env for DEV/localhost , add more for other envs +localhost.connect_with_ssl_kafkacluster=false + +# SSL properties for DEV/localhost +kafkassl.localhost.port=9093 +kafkassl.localhost.keystore.location=/path/server.keystore.jks +kafkassl.localhost.keystore.pwd=password +kafkassl.localhost.key.pwd=password +kafkassl.localhost.truststore.location=/path/server.truststore.jks +kafkassl.localhost.truststore.pwd=password + +# SSL properties for any other env/tst + + +# These are redundant for now. securityconfig.user1.username=user1 securityconfig.user1.pwd=pwd securityconfig.user1.role=USER