Skip to content

Commit

Permalink
Merge pull request #4 from muralibasani/improvements_3.2
Browse files Browse the repository at this point in the history
Improvements 3.2 - SSL connectivity
  • Loading branch information
muralibasani authored Sep 25, 2019
2 parents 3c11875 + 9d47afc commit b54fb61
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.kafkamgt.clusterapi</groupId>
<artifactId>kafkawizeclusterapi</artifactId>
<version>3.1</version>
<version>3.2</version>
<packaging>jar</packaging>

<name>kafkaclusterapi</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public ResponseEntity<String> createTopics(@RequestBody MultiValueMap<String, St
topicRequest.get("topicName").get(0),
topicRequest.get("partitions").get(0),
topicRequest.get("rf").get(0),
topicRequest.get("env").get(0),
topicRequest.get("acl_ip").get(0),
topicRequest.get("acl_ssl").get(0)
topicRequest.get("env").get(0)
);

return new ResponseEntity<String>("success", HttpStatus.OK);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.kafkamgt.clusterapi.services;

import com.kafkamgt.clusterapi.utils.GetAdminClient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.acl.*;
Expand All @@ -8,7 +9,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
Expand All @@ -27,12 +27,13 @@ public class ManageKafkaComponents {
@Autowired
Environment env;

@Autowired
GetAdminClient getAdminClient;

public Set<HashMap<String,String>> loadAcls(String environment){
Set<HashMap<String,String>> acls = new HashSet<>();
Properties props = new Properties();
props.put("bootstrap.servers",environment);

AdminClient client = AdminClient.create(props);
AdminClient client = getAdminClient.getAdminClient(environment);

AclBindingFilter aclBindingFilter = AclBindingFilter.ANY;
DescribeAclsResult s = client.describeAcls(aclBindingFilter);
Expand Down Expand Up @@ -64,10 +65,7 @@ public Set<HashMap<String,String>> loadAcls(String environment){
}

public Set<String> loadTopics(String environment){
Properties props = new Properties();
props.put("bootstrap.servers",environment);

AdminClient client = AdminClient.create(props);
AdminClient client = getAdminClient.getAdminClient(environment);
ListTopicsResult topicsResult = client.listTopics();
Set<String> topics = new HashSet<>();
try {
Expand Down Expand Up @@ -95,17 +93,13 @@ public Set<String> loadTopics(String environment){

}

public String createTopic(String name, String partitions, String replicationFactor, String environment, String acl_ip, String acl_ssl) {
public String createTopic(String name, String partitions, String replicationFactor, String environment) {

LOG.info(name + "--"+partitions + "--"+replicationFactor + "--" + environment);
Properties props = new Properties();
//props.put("bootstrap.servers",env.getProperty(environment+BOOTSTRAP_SERVERS));
props.put("bootstrap.servers",environment);
try (AdminClient client = AdminClient.create(props)) {
LOG.info(name + "--"+partitions + "--"+replicationFactor + "--" + environment);

NewTopic topic = null;
try (AdminClient client = getAdminClient.getAdminClient(environment)) {

topic = new NewTopic(name, Integer.parseInt(partitions), Short.parseShort(replicationFactor));
NewTopic topic = new NewTopic(name, Integer.parseInt(partitions), Short.parseShort(replicationFactor));

CreateTopicsResult result = client.createTopics(Collections.singletonList(topic));
result.values().get(name).get();
Expand All @@ -129,19 +123,16 @@ public String createTopic(String name, String partitions, String replicationFact
LOG.error("Unable to create topic {}", name, e);
}

createProducerAcl(name,environment,acl_ip,acl_ssl);
//createProducerAcl(name,environment,acl_ip,acl_ssl);
return "success";

}

public String createProducerAcl(String topicName, String environment, String acl_ip, String acl_ssl) {

Properties props = new Properties();
LOG.info("In producer alcs::"+acl_ip +"--"+ acl_ssl);
//props.put("bootstrap.servers",env.getProperty(environment+BOOTSTRAP_SERVERS));
props.put("bootstrap.servers",environment);

try (AdminClient client = AdminClient.create(props)) {
try (AdminClient client = getAdminClient.getAdminClient(environment)) {
List<AclBinding> aclListArray = new ArrayList<AclBinding>();
String host = null, principal=null;
if(acl_ssl!=null && acl_ssl.trim().length()>0){
Expand Down Expand Up @@ -201,11 +192,7 @@ public String createProducerAcl(String topicName, String environment, String acl

public String createConsumerAcl(String topicName, String environment, String acl_ip, String acl_ssl, String consumerGroup) {

Properties props = new Properties();
//props.put("bootstrap.servers",env.getProperty(environment+BOOTSTRAP_SERVERS));
props.put("bootstrap.servers",environment);

try (AdminClient client = AdminClient.create(props)) {
try (AdminClient client = getAdminClient.getAdminClient(environment)) {
List<AclBinding> aclListArray = new ArrayList<AclBinding>();
String host = null, principal=null;

Expand Down
58 changes: 58 additions & 0 deletions src/main/java/com/kafkamgt/clusterapi/utils/GetAdminClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.kafkamgt.clusterapi.utils;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import java.util.Properties;

@Service
public class GetAdminClient {

@Autowired
Environment env;

public AdminClient getAdminClient(String envHost){

String envOnlyHost = envHost.substring(0,envHost.indexOf(":"));
String ssl_acl_enabled = env.getProperty(envOnlyHost+".connect_with_ssl_kafkacluster");
if(ssl_acl_enabled==null)
return AdminClient.create(getPlainProperties(envHost));
else if(ssl_acl_enabled!=null && ssl_acl_enabled.equals("true"))
return AdminClient.create(getSslProperties(envHost));
else
return AdminClient.create(getPlainProperties(envHost));
}

public Properties getPlainProperties(String environment){
Properties props = new Properties();

props.put("bootstrap.servers",environment);

return props;
}

public Properties getSslProperties(String environment){
Properties props = new Properties();

String envOnlyHost = environment.substring(0,environment.indexOf(":"));
String bootStrapServer = envOnlyHost +":" +env.getProperty("kafkassl."+envOnlyHost+".port");
props.put("bootstrap.servers",bootStrapServer);
props.put("ssl.truststore.location",env.getProperty("kafkassl."+envOnlyHost+".truststore.location"));
props.put("ssl.truststore.password",env.getProperty("kafkassl."+envOnlyHost+".truststore.pwd"));
props.put("ssl.keystore.location",env.getProperty("kafkassl."+envOnlyHost+".keystore.location"));
props.put("ssl.keystore.password",env.getProperty("kafkassl."+envOnlyHost+".keystore.pwd"));
props.put("ssl.key.password",env.getProperty("kafkassl."+envOnlyHost+".key.pwd"));
props.put("ssl.keystore.type","JKS");
props.put("ssl.truststore.type","JKS");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");
props.put(AdminClientConfig.CLIENT_ID_CONFIG,"testclient");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,"1000000");

return props;
}
}

This file was deleted.

15 changes: 15 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
server.port:9343

# kafka cluster env for DEV/localhost , add more for other envs
localhost.connect_with_ssl_kafkacluster=false

# SSL properties for DEV/localhost
kafkassl.localhost.port=9093
kafkassl.localhost.keystore.location=/path/server.keystore.jks
kafkassl.localhost.keystore.pwd=password
kafkassl.localhost.key.pwd=password
kafkassl.localhost.truststore.location=/path/server.truststore.jks
kafkassl.localhost.truststore.pwd=password

# SSL properties for any other env/tst


# These are redundant for now.
securityconfig.user1.username=user1
securityconfig.user1.pwd=pwd
securityconfig.user1.role=USER
Expand Down

0 comments on commit b54fb61

Please sign in to comment.