From 8134a0685584e5f924866d1eb2e590f7a51b1321 Mon Sep 17 00:00:00 2001 From: Marc LAMY Date: Wed, 10 Apr 2024 23:35:46 +0200 Subject: [PATCH 1/2] make the topic field optional in the streaming KafkaOutput --- .../pipes/kafka/streaming/KafkaOutput.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala index b10be45..71cfb5c 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala @@ -22,7 +22,7 @@ import scala.util.Try */ case class KafkaOutput( brokers: String, - topic: String, + topic: Option[String], processingTimeTrigger: Trigger, timeout: Long, mode: String, @@ -46,14 +46,15 @@ case class KafkaOutput( val streamWriter = data.writeStream .queryName(queryName) .format("kafka") - .option("kafka.bootstrap.servers", brokers) - .option("topic", topic) .options(options) + .option("kafka.bootstrap.servers", brokers) .outputMode(mode) - - val streamingQuery = streamWriter .trigger(processingTimeTrigger) - .start() + + val streamingQuery = topic match { + case Some(t) => streamWriter.option("topic", t).start() + case _ => streamWriter.start() + } streamingQuery.awaitTermination(timeout) streamingQuery.stop() @@ -87,10 +88,7 @@ object KafkaOutput { */ def apply(implicit config: Config): KafkaOutput = { val brokers = getBroker - val topic = getTopic match { - case Some(topicToUse) if topicToUse.nonEmpty => topicToUse - case _ => throw new IllegalArgumentException("No topic specified for Kafka source") - } + val topic = getTopic val duration = Duration(config.getString("Duration")) val processingTimeTrigger = Trigger.ProcessingTime(duration) From 00db9649265c9ac7797b392433ece7c1bb0e607e Mon Sep 17 00:00:00 2001 From: Marc LAMY Date: Thu, 11 Apr 2024 00:07:26 +0200 Subject: [PATCH 2/2] update the streaming KafkaOutput createQueryName function and tests --- .../pipes/kafka/streaming/KafkaOutput.scala | 8 +- .../kafka/streaming/KafkaOutputTest.scala | 100 +++++++----------- 2 files changed, 46 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala index 71cfb5c..33e8aaf 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala @@ -67,9 +67,11 @@ case class KafkaOutput( */ private[streaming] def createQueryName(): String = { - outputName match { - case Some(name) => s"QN_${name}_${topic}_${java.util.UUID.randomUUID}" - case _ => s"QN_${topic}_${java.util.UUID.randomUUID}" + (outputName, topic) match { + case (Some(name), Some(t)) => s"QN_${name}_${t}_${java.util.UUID.randomUUID}" + case (Some(name), None) => s"QN_${name}_${java.util.UUID.randomUUID}" + case (None, Some(t)) => s"QN_KafkaOutput_${t}_${java.util.UUID.randomUUID}" + case _ => s"QN_KafkaOutput_${java.util.UUID.randomUUID}" } } diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala index 045e5fe..04c35ae 100644 --- a/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala +++ b/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala @@ -28,7 +28,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { kafkaStreamOutput.outputName shouldEqual Some("my-test-kafka-output") kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000" - kafkaStreamOutput.topic shouldEqual "test.topic" + kafkaStreamOutput.topic shouldEqual Some("test.topic") } "be initialized with all optional properties" in { @@ -56,7 +56,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { kafkaStreamOutput.outputName shouldEqual None kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000" - kafkaStreamOutput.topic shouldEqual "test.topic" + kafkaStreamOutput.topic shouldEqual Some("test.topic") kafkaStreamOutput.options shouldEqual Map( "failOnDataLoss" -> "false", "maxOffsetsPerTrigger" -> "20000000", @@ -65,60 +65,6 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { ) } - "throw an exception given missing topic but pattern" in { - - val config = ConfigFactory.parseMap( - Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput", - "Name" -> "my-test-kafka", - "Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000", - "Pattern" -> "test.pattern", - "Mode" -> "append", - "Duration" -> "60 seconds", - "Timeout" -> "24", - "Options" -> Map( - "failOnDataLoss" -> "false", - "maxOffsetsPerTrigger" -> "20000000", - "\"kafka.security.protocol\"" -> "SASL_PLAINTEXT", - "\"kafka.sasl.kerberos.service.name\"" -> "kafka" - ) - ) - ) - ) - - intercept[IllegalArgumentException] { - KafkaOutput(config.getConfig("Output")) - } - } - - "throw an exception given missing topic but assign" in { - - val config = ConfigFactory.parseMap( - Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput", - "Name" -> "my-test-kafka", - "Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000", - "Assign" -> "test.assign", - "Mode" -> "append", - "Duration" -> "60 seconds", - "Timeout" -> "24", - "Options" -> Map( - "failOnDataLoss" -> "false", - "maxOffsetsPerTrigger" -> "20000000", - "\"kafka.security.protocol\"" -> "SASL_PLAINTEXT", - "\"kafka.sasl.kerberos.service.name\"" -> "kafka" - ) - ) - ) - ) - - intercept[IllegalArgumentException] { - KafkaOutput(config.getConfig("Output")) - } - } - "throw an exception given missing brokers" in { val config = ConfigFactory.parseMap( @@ -155,7 +101,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { val kafkaOutput = KafkaOutput( brokers = "bktv001:9000, bktv002.amadeus.net:8000", - topic = "test.topic", + topic = Some("test.topic"), processingTimeTrigger = Trigger.Once(), timeout = 0, mode = "append", @@ -164,7 +110,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { val queryName = kafkaOutput.createQueryName() - queryName should fullyMatch regex "^QN_test.topic_" + uuidPattern + "$" + queryName should fullyMatch regex "^QN_KafkaOutput_test.topic_" + uuidPattern + "$" } @@ -173,7 +119,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { val kafkaOutput = KafkaOutput( brokers = "bktv001:9000, bktv002.amadeus.net:8000", - topic = "test.topic", + topic = Some("test.topic"), processingTimeTrigger = Trigger.Once(), timeout = 0, mode = "append", @@ -185,5 +131,41 @@ class KafkaOutputTest extends AnyWordSpec with Matchers { queryName should fullyMatch regex "^QN_myTestOutput_test.topic_" + uuidPattern + "$" } + + "return a query name based on output name" in { + + val kafkaOutput = + KafkaOutput( + brokers = "bktv001:9000, bktv002.amadeus.net:8000", + topic = None, + processingTimeTrigger = Trigger.Once(), + timeout = 0, + mode = "append", + outputName = Some("myTestOutput") + ) + + val queryName = kafkaOutput.createQueryName() + + queryName should fullyMatch regex "^QN_myTestOutput_" + uuidPattern + "$" + + } + + "return a query name based without name or topic" in { + + val kafkaOutput = + KafkaOutput( + brokers = "bktv001:9000, bktv002.amadeus.net:8000", + topic = None, + processingTimeTrigger = Trigger.Once(), + timeout = 0, + mode = "append", + outputName = None + ) + + val queryName = kafkaOutput.createQueryName() + + queryName should fullyMatch regex "^QN_KafkaOutput_" + uuidPattern + "$" + + } } }