diff --git a/.gitignore b/.gitignore index bf996f21..4980384d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ RUNNING_PID .DS_Store *.log *.swp +.metals +.vscode \ No newline at end of file diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index cd755d17..f93305f5 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -125,6 +125,12 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_3_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_3_1_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_3_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_1_1_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_2_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_2_1_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -199,6 +205,9 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext) case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext) case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext) + case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext) + case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_2_1_Default), clusterContext) } } } @@ -317,6 +326,9 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_8_1 => LogkafkaNewConfigs.configNames(Kafka_2_8_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_0_0 => LogkafkaNewConfigs.configNames(Kafka_3_0_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_1_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_1_1 => LogkafkaNewConfigs.configNames(Kafka_3_1_1).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_2_0 => LogkafkaNewConfigs.configNames(Kafka_3_2_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_2_1 => LogkafkaNewConfigs.configNames(Kafka_3_2_1).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index de5e4f62..ab925b59 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -81,6 +81,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager val kafka_2_8_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val defaultCreateForm = Form( mapping( @@ -197,6 +200,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext) case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext) case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext) + case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext) + case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_2_1_Default), clusterContext) } } } @@ -461,6 +467,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_8_1 => TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_0_0 => TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_1_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_1_1 => TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_2_0 => TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_2_1 => TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } } val updatedConfigMap = ti.config.toMap val updatedConfigList = defaultConfigs.map { diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 95f614a0..682cc9f2 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_1_1, Kafka_3_2_0, Kafka_3_2_1) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 538da39f..585dfd0e 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -156,6 +156,18 @@ case object Kafka_3_1_0 extends KafkaVersion { override def toString = "3.1.0" } +case object Kafka_3_1_1 extends KafkaVersion { + override def toString = "3.1.1" +} + +case object Kafka_3_2_0 extends KafkaVersion { + override def toString = "3.2.0" +} + +case object Kafka_3_2_1 extends KafkaVersion { + override def toString = "3.2.1" +} + object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, @@ -194,7 +206,10 @@ object KafkaVersion { "2.8.0" -> Kafka_2_8_0, "2.8.1" -> Kafka_2_8_1, "3.0.0" -> Kafka_3_0_0, - "3.1.0" -> Kafka_3_1_0 + "3.1.0" -> Kafka_3_1_0, + "3.1.1" -> Kafka_3_1_1, + "3.2.0" -> Kafka_3_2_0, + "3.2.1" -> Kafka_3_2_1 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 29b57bd5..f6547042 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -53,7 +53,10 @@ object LogkafkaNewConfigs { Kafka_2_8_0 -> logkafka82.LogConfig, Kafka_2_8_1 -> logkafka82.LogConfig, Kafka_3_0_0 -> logkafka82.LogConfig, - Kafka_3_1_0 -> logkafka82.LogConfig + Kafka_3_1_0 -> logkafka82.LogConfig, + Kafka_3_1_1 -> logkafka82.LogConfig, + Kafka_3_2_0 -> logkafka82.LogConfig, + Kafka_3_2_1 -> logkafka82.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index 183c4e8e..f1b89db0 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -56,7 +56,10 @@ object TopicConfigs { Kafka_2_8_0 -> two40.LogConfig, Kafka_2_8_1 -> two40.LogConfig, Kafka_3_0_0 -> two40.LogConfig, - Kafka_3_1_0 -> two40.LogConfig + Kafka_3_1_0 -> two40.LogConfig, + Kafka_3_1_1 -> two40.LogConfig, + Kafka_3_2_0 -> two40.LogConfig, + Kafka_3_2_1 -> two40.LogConfig ) def configNames(version: KafkaVersion): Seq[String] = { diff --git a/build.sbt b/build.sbt index a9a33b0b..4612ff9e 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ name := """cmak""" /* For packaging purposes, -SNAPSHOT MUST contain a digit */ -version := "3.0.0.6" +version := "3.0.0.6-1" scalaVersion := "2.12.10" diff --git a/sbt b/sbt index 60b4daf8..b91f3b7e 100755 --- a/sbt +++ b/sbt @@ -9,7 +9,8 @@ set -o pipefail declare -r sbt_release_version="1.3.8" declare -r sbt_unreleased_version="1.3.8" -declare -r latest_31="3.1.0" +declare -r latest_32="3.2.1" +declare -r latest_31="3.1.1" declare -r latest_30="3.0.3" declare -r latest_213="2.13.8" declare -r latest_212="2.12.15" @@ -400,6 +401,7 @@ are not special. -213 use $latest_213 -30 use $latest_30 -31 use $latest_31 + -31 use $latest_32 -scala-home use the scala build at the specified directory -scala-version use the specified version of scala -binary-version use the specified scala version when searching for dependencies @@ -475,6 +477,7 @@ process_args() { -213) setScalaVersion "$latest_213" && shift ;; -30) setScalaVersion "$latest_30" && shift ;; -31) setScalaVersion "$latest_31" && shift ;; + -32) setScalaVersion "$latest_32" && shift ;; -scala-version) require_arg version "$1" "$2" && setScalaVersion "$2" && shift 2 ;; -binary-version) require_arg version "$1" "$2" && setThisBuild scalaBinaryVersion "\"$2\"" && shift 2 ;; diff --git a/test/kafka/manager/TestKafkaManagerActor.scala b/test/kafka/manager/TestKafkaManagerActor.scala index f82ad92e..a03498e0 100644 --- a/test/kafka/manager/TestKafkaManagerActor.scala +++ b/test/kafka/manager/TestKafkaManagerActor.scala @@ -69,7 +69,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("add cluster") { - val cc = ClusterConfig("dev","3.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc = ClusterConfig("dev","3.2.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -80,7 +80,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -112,7 +112,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -139,7 +139,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { println(result) result.msg.contains("dev") } - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -156,7 +156,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -168,7 +168,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { test("update cluster tuning") { val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1) - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(newTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None ) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => @@ -185,7 +185,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster security protocol") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) diff --git a/test/kafka/manager/model/KafkaVersionTest.scala b/test/kafka/manager/model/KafkaVersionTest.scala index dc1a5fd1..38c649f0 100644 --- a/test/kafka/manager/model/KafkaVersionTest.scala +++ b/test/kafka/manager/model/KafkaVersionTest.scala @@ -48,7 +48,10 @@ class KafkaVersionTest extends FunSuite { "2.8.0" -> Kafka_2_8_0, "2.8.1" -> Kafka_2_8_1, "3.0.0" -> Kafka_3_0_0, - "3.1.0" -> Kafka_3_1_0 + "3.1.0" -> Kafka_3_1_0, + "3.1.1" -> Kafka_3_1_1, + "3.2.0" -> Kafka_3_2_0, + "3.2.1" -> Kafka_3_2_1 ) test("apply method: supported version.") { @@ -103,7 +106,10 @@ class KafkaVersionTest extends FunSuite { ("2.8.0","2.8.0"), ("2.8.1","2.8.1"), ("3.0.0","3.0.0"), - ("3.1.0","3.1.0") + ("3.1.0","3.1.0"), + ("3.1.1","3.1.1"), + ("3.2.0","3.2.0"), + ("3.2.1","3.2.1") ) assertResult(expected)(KafkaVersion.formSelectList) } diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index 0109b0ee..cc83043b 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -341,4 +341,28 @@ class TestClusterConfig extends FunSuite with Matchers { assert(cc == deserialize.get) } + test("serialize and deserialize 3.1.1") { + val cc = ClusterConfig("qa", "3.1.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + + test("serialize and deserialize 3.2.0") { + val cc = ClusterConfig("qa", "3.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + + test("serialize and deserialize 3.2.1") { + val cc = ClusterConfig("qa", "3.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + }