Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make the topic field optional in the streaming KafkaOutput #5

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.Try
*/
case class KafkaOutput(
brokers: String,
topic: String,
topic: Option[String],
processingTimeTrigger: Trigger,
timeout: Long,
mode: String,
Expand All @@ -46,14 +46,15 @@ case class KafkaOutput(
val streamWriter = data.writeStream
.queryName(queryName)
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("topic", topic)
.options(options)
.option("kafka.bootstrap.servers", brokers)
.outputMode(mode)

val streamingQuery = streamWriter
.trigger(processingTimeTrigger)
.start()

val streamingQuery = topic match {
case Some(t) => streamWriter.option("topic", t).start()
case _ => streamWriter.start()
}

streamingQuery.awaitTermination(timeout)
streamingQuery.stop()
Expand All @@ -66,9 +67,11 @@ case class KafkaOutput(
*/
private[streaming] def createQueryName(): String = {

outputName match {
case Some(name) => s"QN_${name}_${topic}_${java.util.UUID.randomUUID}"
case _ => s"QN_${topic}_${java.util.UUID.randomUUID}"
(outputName, topic) match {
case (Some(name), Some(t)) => s"QN_${name}_${t}_${java.util.UUID.randomUUID}"
case (Some(name), None) => s"QN_${name}_${java.util.UUID.randomUUID}"
case (None, Some(t)) => s"QN_KafkaOutput_${t}_${java.util.UUID.randomUUID}"
case _ => s"QN_KafkaOutput_${java.util.UUID.randomUUID}"
}

}
Expand All @@ -87,10 +90,7 @@ object KafkaOutput {
*/
def apply(implicit config: Config): KafkaOutput = {
val brokers = getBroker
val topic = getTopic match {
case Some(topicToUse) if topicToUse.nonEmpty => topicToUse
case _ => throw new IllegalArgumentException("No topic specified for Kafka source")
}
val topic = getTopic

val duration = Duration(config.getString("Duration"))
val processingTimeTrigger = Trigger.ProcessingTime(duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual Some("my-test-kafka-output")
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
}

"be initialized with all optional properties" in {
Expand Down Expand Up @@ -56,7 +56,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual None
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
kafkaStreamOutput.options shouldEqual Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
Expand All @@ -65,60 +65,6 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
)
}

"throw an exception given missing topic but pattern" in {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
"Name" -> "my-test-kafka",
"Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
"Pattern" -> "test.pattern",
"Mode" -> "append",
"Duration" -> "60 seconds",
"Timeout" -> "24",
"Options" -> Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
"\"kafka.security.protocol\"" -> "SASL_PLAINTEXT",
"\"kafka.sasl.kerberos.service.name\"" -> "kafka"
)
)
)
)

intercept[IllegalArgumentException] {
KafkaOutput(config.getConfig("Output"))
}
}

"throw an exception given missing topic but assign" in {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
"Name" -> "my-test-kafka",
"Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
"Assign" -> "test.assign",
"Mode" -> "append",
"Duration" -> "60 seconds",
"Timeout" -> "24",
"Options" -> Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
"\"kafka.security.protocol\"" -> "SASL_PLAINTEXT",
"\"kafka.sasl.kerberos.service.name\"" -> "kafka"
)
)
)
)

intercept[IllegalArgumentException] {
KafkaOutput(config.getConfig("Output"))
}
}

"throw an exception given missing brokers" in {

val config = ConfigFactory.parseMap(
Expand Down Expand Up @@ -155,7 +101,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = "test.topic",
topic = Some("test.topic"),
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
Expand All @@ -164,7 +110,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_test.topic_" + uuidPattern + "$"
queryName should fullyMatch regex "^QN_KafkaOutput_test.topic_" + uuidPattern + "$"

}

Expand All @@ -173,7 +119,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = "test.topic",
topic = Some("test.topic"),
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
Expand All @@ -185,5 +131,41 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
queryName should fullyMatch regex "^QN_myTestOutput_test.topic_" + uuidPattern + "$"

}

"return a query name based on output name" in {

val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = None,
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
outputName = Some("myTestOutput")
)

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_myTestOutput_" + uuidPattern + "$"

}

"return a query name based without name or topic" in {

val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = None,
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
outputName = None
)

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_KafkaOutput_" + uuidPattern + "$"

}
}
}
Loading