[SPARK-26371][SS] Increase kafka ConfigUpdater test coverage.
## What changes were proposed in this pull request?

Since the Kafka delegation token work added logic to ConfigUpdater, it would be good to test it.
This PR contains the following changes:
* ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater
* mockito-core dependency added to kafka-0-10-sql
* Unit tests added

## How was this patch tested?

Existing + new unit tests + on cluster.

Closes apache#23321 from gaborgsomogyi/SPARK-26371.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
gaborgsomogyi authored and dongjoon-hyun committed Dec 17, 2018
1 parent 6d45e6e commit 5a116e6
Showing 6 changed files with 287 additions and 93 deletions.
5 changes: 5 additions & 0 deletions external/kafka-0-10-sql/pom.xml
@@ -106,6 +106,11 @@
       <version>${jetty.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
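A note on the added dependency: mockito-core declares no <version> here, so Maven resolves it through dependencyManagement in Spark's parent pom, keeping the Mockito version consistent with the rest of the build.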
74 changes: 74 additions & 0 deletions external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala (new file)
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka

/**
 * Class to conveniently update Kafka config params, while logging the changes
 */
private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
    extends Logging {
  private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

  def set(key: String, value: Object): this.type = {
    map.put(key, value)
    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
    this
  }

  def setIfUnset(key: String, value: Object): this.type = {
    if (!map.containsKey(key)) {
      map.put(key, value)
      logDebug(s"$module: Set $key to $value")
    }
    this
  }

  def setAuthenticationConfigIfNeeded(): this.type = {
    // There are multiple login possibilities, applied in the following order:
    // - JVM global security provided -> try to log in with the JVM global security
    //   configuration, which can be set for example with 'java.security.auth.login.config'.
    //   No additional parameter is needed for this.
    // - Token provided -> try to log in with the SCRAM module using Kafka's dynamic JAAS
    //   configuration.
    if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
      logDebug("JVM global security configuration detected, using it for login.")
    } else if (KafkaSecurityHelper.isTokenAvailable()) {
      logDebug("Delegation token detected, using it for login.")
      val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
      set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
      val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
      require(mechanism.startsWith("SCRAM"),
        "Delegation token works only with SCRAM mechanism.")
      set(SaslConfigs.SASL_MECHANISM, mechanism)
    }
    this
  }

  def build(): ju.Map[String, Object] = map
}
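For illustration, here is a minimal sketch of how a caller can use the extracted class's fluent API; the module name and config values are hypothetical, not taken from the patch. Note also that setIfUnset and setAuthenticationConfigIfNeeded now return this.type rather than the concrete class, so the chained type stays precise.

import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerConfig

// Hypothetical caller, mirroring how KafkaSourceProvider builds consumer params.
val kafkaParams: ju.Map[String, Object] =
  KafkaConfigUpdater("example", Map("bootstrap.servers" -> "broker:9092"))
    .set(ConsumerConfig.GROUP_ID_CONFIG, "example-group")           // always overwrites
    .setIfUnset(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")  // only when the key is absent
    .setAuthenticationConfigIfNeeded()  // adds SASL/JAAS params when a token or global JAAS config exists
    .build()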
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,13 +24,9 @@ import scala.collection.JavaConverters._

 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

-import org.apache.spark.SparkEnv
-import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -483,7 +479,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   }

   def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
-    ConfigUpdater("source", specifiedKafkaParams)
+    KafkaConfigUpdater("source", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
       .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)

@@ -506,7 +502,7 @@
   def kafkaParamsForExecutors(
       specifiedKafkaParams: Map[String, String],
       uniqueGroupId: String): ju.Map[String, Object] =
-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
       .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)

@@ -537,48 +533,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}

-  /** Class to conveniently update Kafka config params, while logging the changes */
-  private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
-    private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
-
-    def set(key: String, value: Object): this.type = {
-      map.put(key, value)
-      logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
-      this
-    }
-
-    def setIfUnset(key: String, value: Object): ConfigUpdater = {
-      if (!map.containsKey(key)) {
-        map.put(key, value)
-        logDebug(s"$module: Set $key to $value")
-      }
-      this
-    }
-
-    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
-      // There are multiple possibilities to log in and applied in the following order:
-      // - JVM global security provided -> try to log in with JVM global security configuration
-      //   which can be configured for example with 'java.security.auth.login.config'.
-      //   For this no additional parameter needed.
-      // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
-      //   configuration.
-      if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
-        logDebug("JVM global security configuration detected, using it for login.")
-      } else if (KafkaSecurityHelper.isTokenAvailable()) {
-        logDebug("Delegation token detected, using it for login.")
-        val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
-        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
-        require(mechanism.startsWith("SCRAM"),
-          "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_MECHANISM, mechanism)
-      }
-      this
-    }
-
-    def build(): ju.Map[String, Object] = map
-  }
-
   private[kafka010] def kafkaParamsForProducer(
       parameters: Map[String, String]): ju.Map[String, Object] = {
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
@@ -596,7 +550,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {

     val specifiedKafkaParams = convertToSpecifiedParams(parameters)

-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
       .setAuthenticationConfigIfNeeded()
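With the extraction in place, kafkaParamsForDriver, kafkaParamsForExecutors, and kafkaParamsForProducer all build their parameter maps through the shared KafkaConfigUpdater, so the delegation-token handling lives in one testable class instead of a private inner one.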
113 changes: 113 additions & 0 deletions external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala (new file)
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config._

class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
  private val testModule = "testModule"
  private val testKey = "testKey"
  private val testValue = "testValue"
  private val otherTestValue = "otherTestValue"

  test("set should always set value") {
    val params = Map.empty[String, String]

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .set(testKey, testValue)
      .build()

    assert(updatedParams.size() === 1)
    assert(updatedParams.get(testKey) === testValue)
  }

  test("setIfUnset without existing key should set value") {
    val params = Map.empty[String, String]

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setIfUnset(testKey, testValue)
      .build()

    assert(updatedParams.size() === 1)
    assert(updatedParams.get(testKey) === testValue)
  }

  test("setIfUnset with existing key should not set value") {
    val params = Map[String, String](testKey -> testValue)

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setIfUnset(testKey, otherTestValue)
      .build()

    assert(updatedParams.size() === 1)
    assert(updatedParams.get(testKey) === testValue)
  }

  test("setAuthenticationConfigIfNeeded with global security should not set values") {
    val params = Map.empty[String, String]
    setGlobalKafkaClientConfig()

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(updatedParams.size() === 0)
  }

  test("setAuthenticationConfigIfNeeded with token should set values") {
    val params = Map.empty[String, String]
    setSparkEnv(Map.empty)
    addTokenToUGI()

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(updatedParams.size() === 2)
    assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
    assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) ===
      Kafka.TOKEN_SASL_MECHANISM.defaultValueString)
  }

  test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") {
    val params = Map.empty[String, String]
    setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID"))
    addTokenToUGI()

    val e = intercept[IllegalArgumentException] {
      KafkaConfigUpdater(testModule, params)
        .setAuthenticationConfigIfNeeded()
        .build()
    }

    assert(e.getMessage.contains("Delegation token works only with SCRAM mechanism."))
  }

  test("setAuthenticationConfigIfNeeded without security should not set values") {
    val params = Map.empty[String, String]

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(updatedParams.size() === 0)
  }
}
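Each scenario above is staged through the helpers of the KafkaDelegationTokenTest trait shown below: setGlobalKafkaClientConfig() installs a JVM-global JAAS configuration, addTokenToUGI() plants a dummy delegation token into the current user's credentials, and setSparkEnv() swaps in a mocked SparkEnv so that SparkEnv.get.conf returns the test's settings. With Spark's sbt build, the suite should be runnable with something like build/sbt "sql-kafka-0-10/testOnly *KafkaConfigUpdaterSuite" (the sbt project name is an assumption here).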
90 changes: 90 additions & 0 deletions external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala (new file)
@@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import java.{util => ju}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.mockito.Mockito.{doReturn, mock}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier

/**
 * This is a trait which provides functionalities for Kafka delegation token related test suites.
 */
trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
  self: SparkFunSuite =>

  protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
  protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString

  private class KafkaJaasConfiguration extends Configuration {
    val entry =
      new AppConfigurationEntry(
        "DummyModule",
        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
        ju.Collections.emptyMap[String, Object]()
      )

    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
      if (name.equals("KafkaClient")) {
        Array(entry)
      } else {
        null
      }
    }
  }

  override def afterEach(): Unit = {
    try {
      Configuration.setConfiguration(null)
      UserGroupInformation.setLoginUser(null)
      SparkEnv.set(null)
    } finally {
      super.afterEach()
    }
  }

  protected def setGlobalKafkaClientConfig(): Unit = {
    Configuration.setConfiguration(new KafkaJaasConfiguration)
  }

  protected def addTokenToUGI(): Unit = {
    val token = new Token[KafkaDelegationTokenIdentifier](
      tokenId.getBytes,
      tokenPassword.getBytes,
      KafkaTokenUtil.TOKEN_KIND,
      KafkaTokenUtil.TOKEN_SERVICE
    )
    val creds = new Credentials()
    creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  }

  protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
    val conf = new SparkConf().setAll(settings)
    val env = mock(classOf[SparkEnv])
    doReturn(conf).when(env).conf
    SparkEnv.set(env)
  }
}
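As a usage sketch, a test mixes the trait into SparkFunSuite and combines the helpers per scenario, the same way KafkaConfigUpdaterSuite does above; the suite name below is hypothetical.

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkFunSuite

// Hypothetical suite; afterEach() in the trait resets the JAAS config,
// the UGI login user, and SparkEnv between tests.
class ExampleTokenSuite extends SparkFunSuite with KafkaDelegationTokenTest {
  test("token in UGI triggers SASL/JAAS settings") {
    setSparkEnv(Map.empty)  // mocked SparkEnv backing SparkEnv.get.conf
    addTokenToUGI()         // plant a dummy delegation token

    val params = KafkaConfigUpdater("example", Map.empty)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(params.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
  }
}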