Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka version #886

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ RUNNING_PID
.DS_Store
*.log
*.swp
.metals
.vscode
12 changes: 12 additions & 0 deletions app/controllers/Logkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
LogkafkaNewConfigs.configMaps(Kafka_3_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_3_1_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_3_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_3_1_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_3_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_3_2_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_3_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_3_2_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_3_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)

val defaultCreateForm = Form(
mapping(
Expand Down Expand Up @@ -199,6 +205,9 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext)
case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext)
case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext)
case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext)
case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext)
case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_2_1_Default), clusterContext)
}
}
}
Expand Down Expand Up @@ -317,6 +326,9 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_8_1 => LogkafkaNewConfigs.configNames(Kafka_2_8_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_3_0_0 => LogkafkaNewConfigs.configNames(Kafka_3_0_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_3_1_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_3_1_1 => LogkafkaNewConfigs.configNames(Kafka_3_1_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_3_2_0 => LogkafkaNewConfigs.configNames(Kafka_3_2_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_3_2_1 => LogkafkaNewConfigs.configNames(Kafka_3_2_1).map(n => (n,LKConfig(n,None))).toMap
}
val identityOption = li.identityMap.get(log_path)
if (identityOption.isDefined) {
Expand Down
9 changes: 9 additions & 0 deletions app/controllers/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
val kafka_2_8_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_3_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_3_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_3_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_3_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_3_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)

val defaultCreateForm = Form(
mapping(
Expand Down Expand Up @@ -197,6 +200,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext)
case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext)
case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext)
case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext)
case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext)
case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_2_1_Default), clusterContext)
}
}
}
Expand Down Expand Up @@ -461,6 +467,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_8_1 => TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_3_0_0 => TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_3_1_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_3_1_1 => TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_3_2_0 => TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_3_2_1 => TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
}
val updatedConfigMap = ti.config.toMap
val updatedConfigList = defaultConfigs.map {
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0)
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_1_1, Kafka_3_2_0, Kafka_3_2_1)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
Expand Down
17 changes: 16 additions & 1 deletion app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ case object Kafka_3_1_0 extends KafkaVersion {
override def toString = "3.1.0"
}

case object Kafka_3_1_1 extends KafkaVersion {
override def toString = "3.1.1"
}

case object Kafka_3_2_0 extends KafkaVersion {
override def toString = "3.2.0"
}

case object Kafka_3_2_1 extends KafkaVersion {
override def toString = "3.2.1"
}

object KafkaVersion {
val supportedVersions: Map[String,KafkaVersion] = Map(
"0.8.1.1" -> Kafka_0_8_1_1,
Expand Down Expand Up @@ -194,7 +206,10 @@ object KafkaVersion {
"2.8.0" -> Kafka_2_8_0,
"2.8.1" -> Kafka_2_8_1,
"3.0.0" -> Kafka_3_0_0,
"3.1.0" -> Kafka_3_1_0
"3.1.0" -> Kafka_3_1_0,
"3.1.1" -> Kafka_3_1_1,
"3.2.0" -> Kafka_3_2_0,
"3.2.1" -> Kafka_3_2_1
)

val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1))
Expand Down
5 changes: 4 additions & 1 deletion app/kafka/manager/utils/LogkafkaNewConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ object LogkafkaNewConfigs {
Kafka_2_8_0 -> logkafka82.LogConfig,
Kafka_2_8_1 -> logkafka82.LogConfig,
Kafka_3_0_0 -> logkafka82.LogConfig,
Kafka_3_1_0 -> logkafka82.LogConfig
Kafka_3_1_0 -> logkafka82.LogConfig,
Kafka_3_1_1 -> logkafka82.LogConfig,
Kafka_3_2_0 -> logkafka82.LogConfig,
Kafka_3_2_1 -> logkafka82.LogConfig
)

def configNames(version: KafkaVersion) : Set[String] = {
Expand Down
5 changes: 4 additions & 1 deletion app/kafka/manager/utils/TopicConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ object TopicConfigs {
Kafka_2_8_0 -> two40.LogConfig,
Kafka_2_8_1 -> two40.LogConfig,
Kafka_3_0_0 -> two40.LogConfig,
Kafka_3_1_0 -> two40.LogConfig
Kafka_3_1_0 -> two40.LogConfig,
Kafka_3_1_1 -> two40.LogConfig,
Kafka_3_2_0 -> two40.LogConfig,
Kafka_3_2_1 -> two40.LogConfig
)

def configNames(version: KafkaVersion): Seq[String] = {
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
name := """cmak"""

/* For packaging purposes, -SNAPSHOT MUST contain a digit */
version := "3.0.0.6"
version := "3.0.0.6-1"

scalaVersion := "2.12.10"

Expand Down
5 changes: 4 additions & 1 deletion sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ set -o pipefail
declare -r sbt_release_version="1.3.8"
declare -r sbt_unreleased_version="1.3.8"

declare -r latest_31="3.1.0"
declare -r latest_32="3.2.1"
declare -r latest_31="3.1.1"
declare -r latest_30="3.0.3"
declare -r latest_213="2.13.8"
declare -r latest_212="2.12.15"
Expand Down Expand Up @@ -400,6 +401,7 @@ are not special.
-213 use $latest_213
-30 use $latest_30
-31 use $latest_31
-31 use $latest_32
-scala-home <path> use the scala build at the specified directory
-scala-version <version> use the specified version of scala
-binary-version <version> use the specified scala version when searching for dependencies
Expand Down Expand Up @@ -475,6 +477,7 @@ process_args() {
-213) setScalaVersion "$latest_213" && shift ;;
-30) setScalaVersion "$latest_30" && shift ;;
-31) setScalaVersion "$latest_31" && shift ;;
-32) setScalaVersion "$latest_32" && shift ;;

-scala-version) require_arg version "$1" "$2" && setScalaVersion "$2" && shift 2 ;;
-binary-version) require_arg version "$1" "$2" && setThisBuild scalaBinaryVersion "\"$2\"" && shift 2 ;;
Expand Down
14 changes: 7 additions & 7 deletions test/kafka/manager/TestKafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("add cluster") {
val cc = ClusterConfig("dev","3.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val cc = ClusterConfig("dev","3.2.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult =>
result.result.get
Thread.sleep(1000)
Expand All @@ -80,7 +80,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster zkhost") {
val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand Down Expand Up @@ -112,7 +112,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster version") {
val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand All @@ -139,7 +139,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
println(result)
result.msg.contains("dev")
}
val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(1000)
Expand All @@ -156,7 +156,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster logkafka enabled") {
val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand All @@ -168,7 +168,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {

test("update cluster tuning") {
val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1)
val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
tuning = Option(newTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None
)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
Expand All @@ -185,7 +185,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster security protocol") {
val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand Down
10 changes: 8 additions & 2 deletions test/kafka/manager/model/KafkaVersionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ class KafkaVersionTest extends FunSuite {
"2.8.0" -> Kafka_2_8_0,
"2.8.1" -> Kafka_2_8_1,
"3.0.0" -> Kafka_3_0_0,
"3.1.0" -> Kafka_3_1_0
"3.1.0" -> Kafka_3_1_0,
"3.1.1" -> Kafka_3_1_1,
"3.2.0" -> Kafka_3_2_0,
"3.2.1" -> Kafka_3_2_1
)

test("apply method: supported version.") {
Expand Down Expand Up @@ -103,7 +106,10 @@ class KafkaVersionTest extends FunSuite {
("2.8.0","2.8.0"),
("2.8.1","2.8.1"),
("3.0.0","3.0.0"),
("3.1.0","3.1.0")
("3.1.0","3.1.0"),
("3.1.1","3.1.1"),
("3.2.0","3.2.0"),
("3.2.1","3.2.1")
)
assertResult(expected)(KafkaVersion.formSelectList)
}
Expand Down
24 changes: 24 additions & 0 deletions test/kafka/manager/utils/TestClusterConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,4 +341,28 @@ class TestClusterConfig extends FunSuite with Matchers {
assert(cc == deserialize.get)
}

test("serialize and deserialize 3.1.1") {
val cc = ClusterConfig("qa", "3.1.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

test("serialize and deserialize 3.2.0") {
val cc = ClusterConfig("qa", "3.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

test("serialize and deserialize 3.2.1") {
val cc = ClusterConfig("qa", "3.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

}