From b98f78bcf365b2be7cb8415acde0410e248e6819 Mon Sep 17 00:00:00 2001 From: amitco1 Date: Tue, 16 Apr 2019 16:36:26 +0300 Subject: [PATCH 01/10] wip(multitopic_writing): add kafka multitopic reading --- .../configuration/job/input/Kafka.scala | 8 ++-- .../input/readers/kafka/KafkaInput.scala | 47 ++++++++++++++++--- .../input/readers/kafka/KafkaLagWriter.scala | 2 +- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala index b83ba4b40..779052e58 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala @@ -5,14 +5,14 @@ import com.yotpo.metorikku.input.Reader import com.yotpo.metorikku.input.readers.kafka.KafkaInput case class Kafka(servers: Seq[String], - topic: String, + topic: Option[String], + topicPattern: Option[String], consumerGroup: Option[String], options: Option[Map[String, String]], schemaRegistryUrl: Option[String], schemaSubject: Option[String] ) extends InputConfig { - require(Option(servers).isDefined, "Servers Must be Defined") - require(Option(topic).isDefined, "Topic must be defined") + require(topic.isDefined && !topicPattern.isDefined || !topic.isDefined && topicPattern.isDefined, "Exactly one of (topic, topicPattern) must be defined") - override def getReader(name: String): Reader = KafkaInput(name, servers, topic, consumerGroup, options, schemaRegistryUrl, schemaSubject) + override def getReader(name: String): Reader = KafkaInput(name, servers, topic, topicPattern, consumerGroup, options, schemaRegistryUrl, schemaSubject) } diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala index dcea70c4a..aba26d42b 100644 --- a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala +++ b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala @@ -8,24 +8,47 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.spark.sql.{DataFrame, SparkSession} -case class KafkaInput(name: String, servers: Seq[String], topic: String, consumerGroup: Option[String], +case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], topicPattern: Option[String], consumerGroup: Option[String], options: Option[Map[String, String]], schemaRegistryUrl: Option[String], schemaSubject: Option[String]) extends Reader { @transient lazy val log = org.apache.log4j.LogManager.getLogger(this.getClass) + def read(sparkSession: SparkSession): DataFrame = { consumerGroup match { case Some(group) => log.info(s"creating consumer group with id $group") val consumer = createKafkaConsumer(group) - val lagWriter = new KafkaLagWriter(consumer, topic) - sparkSession.streams.addListener(lagWriter) + topic match { + case Some(regular_topic) => + val lagWriter = new KafkaLagWriter(consumer, regular_topic) + sparkSession.streams.addListener(lagWriter) + case _ => + } + topicPattern match { + case Some(regex_topic) => + val lagWriter = new KafkaLagWriter(consumer, regex_topic) + sparkSession.streams.addListener(lagWriter) + case _ => + } case _ => } val bootstrapServers = servers.mkString(",") val inputStream = sparkSession.readStream.format("kafka") - .option("kafka.bootstrap.servers", bootstrapServers) - .option("subscribe", topic) + topic match { + case Some(regular_topic) => + inputStream + .option("kafka.bootstrap.servers", bootstrapServers) + .option("subscribe", regular_topic) + case _ => + } + topicPattern match { + case Some(regex_topic) => + inputStream + .option("kafka.bootstrap.servers", bootstrapServers) + .option("subscribePattern", regex_topic) + case _ => + } if (options.nonEmpty) { inputStream.options(options.get) @@ -34,8 +57,18 @@ case class KafkaInput(name: String, servers: Seq[String], topic: String, consume val kafkaDataFrame = inputStream.load() schemaRegistryUrl match { case Some(url) => { - val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, topic, schemaSubject) - schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) + topic match { + case Some(regular_topic) => + val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, regular_topic, schemaSubject) + schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) + case _ => kafkaDataFrame + } + topicPattern match { + case Some(regex_topic) => + val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, regex_topic, schemaSubject) + schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) + case _ => kafkaDataFrame + } } case None => kafkaDataFrame } diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaLagWriter.scala b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaLagWriter.scala index 62f8742ed..2f84ab731 100644 --- a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaLagWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaLagWriter.scala @@ -28,7 +28,7 @@ class KafkaLagWriter(kafkaConsumer: KafkaConsumer[String, String], topic: String om.registerModule(DefaultScalaModule) event.progress.sources.foreach(source => { val jsonOffsets = om.readValue(source.endOffset, classOf[Map[String, Map[String, Long]]]) - jsonOffsets.keys.filter(key => key == topic) + jsonOffsets.keys .foreach(topic => { log.debug(s"committing offsets for topic $topic") val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]() From bc93ca69accb71007668e7d8cffa9cc7ca6814c5 Mon Sep 17 00:00:00 2001 From: amitco1 Date: Sun, 21 Apr 2019 17:01:52 +0300 Subject: [PATCH 02/10] wip(multitopic_writing) --- .../configuration/job/input/Kafka.scala | 1 + .../input/readers/kafka/KafkaInput.scala | 18 ++++-------------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala index 779052e58..3ef41b21d 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/job/input/Kafka.scala @@ -12,6 +12,7 @@ case class Kafka(servers: Seq[String], schemaRegistryUrl: Option[String], schemaSubject: Option[String] ) extends InputConfig { + require(Option(servers).isDefined, "Servers Must be Defined") require(topic.isDefined && !topicPattern.isDefined || !topic.isDefined && topicPattern.isDefined, "Exactly one of (topic, topicPattern) must be defined") override def getReader(name: String): Reader = KafkaInput(name, servers, topic, topicPattern, consumerGroup, options, schemaRegistryUrl, schemaSubject) diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala index aba26d42b..2cec25c60 100644 --- a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala +++ b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], topicPattern: Option[String], consumerGroup: Option[String], - options: Option[Map[String, String]], schemaRegistryUrl: Option[String], schemaSubject: Option[String]) extends Reader { + options: Option[Map[String, String]], schemaRegistryUrl: Option[String], schemaSubject: Option[String]) extends Reader { @transient lazy val log = org.apache.log4j.LogManager.getLogger(this.getClass) @@ -57,22 +57,12 @@ case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], val kafkaDataFrame = inputStream.load() schemaRegistryUrl match { case Some(url) => { - topic match { - case Some(regular_topic) => - val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, regular_topic, schemaSubject) - schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) - case _ => kafkaDataFrame - } - topicPattern match { - case Some(regex_topic) => - val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, regex_topic, schemaSubject) - schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) - case _ => kafkaDataFrame - } + val amit = topic.getOrElse(topicPattern.getOrElse("")) + val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, amit, schemaSubject) + schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) } case None => kafkaDataFrame } - } private def createKafkaConsumer(consumerGroupID: String) = { From 9739b9b56702c737e7e913ef277a9c2363f1dd3a Mon Sep 17 00:00:00 2001 From: amitco1 Date: Sun, 21 Apr 2019 17:24:04 +0300 Subject: [PATCH 03/10] feat(multitopic_writing): add topicPattern, read topics from kafka by regex --- .../com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala index 2cec25c60..80692899e 100644 --- a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala +++ b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala @@ -57,8 +57,8 @@ case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], val kafkaDataFrame = inputStream.load() schemaRegistryUrl match { case Some(url) => { - val amit = topic.getOrElse(topicPattern.getOrElse("")) - val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, amit, schemaSubject) + val chosen_topic = topic.getOrElse(topicPattern.getOrElse("")) + val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, chosen_topic, schemaSubject) schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) } case None => kafkaDataFrame From 2c8a382e669cc8d6097d2ec7ca989c1e57587efa Mon Sep 17 00:00:00 2001 From: amitco1 Date: Sun, 21 Apr 2019 17:35:04 +0300 Subject: [PATCH 04/10] feat(multitopic_writing): fix chosen_topic --- .../input/readers/kafka/KafkaInput.scala | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala index 80692899e..2e721a9a0 100644 --- a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala +++ b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala @@ -18,18 +18,9 @@ case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], case Some(group) => log.info(s"creating consumer group with id $group") val consumer = createKafkaConsumer(group) - topic match { - case Some(regular_topic) => - val lagWriter = new KafkaLagWriter(consumer, regular_topic) - sparkSession.streams.addListener(lagWriter) - case _ => - } - topicPattern match { - case Some(regex_topic) => - val lagWriter = new KafkaLagWriter(consumer, regex_topic) - sparkSession.streams.addListener(lagWriter) - case _ => - } + val chosen_topic = topic.getOrElse(topicPattern.getOrElse("")) + val lagWriter = new KafkaLagWriter(consumer, chosen_topic) + sparkSession.streams.addListener(lagWriter) case _ => } From dd458fbd7cd08c3ae86b53bc8dc4bda92080f30a Mon Sep 17 00:00:00 2001 From: amitco1 Date: Mon, 22 Apr 2019 10:41:29 +0300 Subject: [PATCH 05/10] feat(multitopic_writing): SchemaRegistryDeserializer send regular topic --- .../metorikku/input/readers/kafka/KafkaInput.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala index 2e721a9a0..34ceddb6d 100644 --- a/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala +++ b/src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala @@ -26,18 +26,15 @@ case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], val bootstrapServers = servers.mkString(",") val inputStream = sparkSession.readStream.format("kafka") + .option("kafka.bootstrap.servers", bootstrapServers) topic match { case Some(regular_topic) => - inputStream - .option("kafka.bootstrap.servers", bootstrapServers) - .option("subscribe", regular_topic) + inputStream.option("subscribe", regular_topic) case _ => } topicPattern match { case Some(regex_topic) => - inputStream - .option("kafka.bootstrap.servers", bootstrapServers) - .option("subscribePattern", regex_topic) + inputStream.option("subscribePattern", regex_topic) case _ => } @@ -48,8 +45,7 @@ case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], val kafkaDataFrame = inputStream.load() schemaRegistryUrl match { case Some(url) => { - val chosen_topic = topic.getOrElse(topicPattern.getOrElse("")) - val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, chosen_topic, schemaSubject) + val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, topic.getOrElse(""), schemaSubject) schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame) } case None => kafkaDataFrame From 63bf4eded9a84e86e24e595c59f5f1f335cc303d Mon Sep 17 00:00:00 2001 From: amitco1 Date: Mon, 29 Apr 2019 16:15:30 +0300 Subject: [PATCH 06/10] feat(multitopic_writing): wip --- .../metorikku/code/steps/Watermark.scala | 20 ++++++++++ .../InstrumentationOutputWriter.scala | 37 ++++++++++++++++++- 2 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/yotpo/metorikku/code/steps/Watermark.scala diff --git a/src/main/scala/com/yotpo/metorikku/code/steps/Watermark.scala b/src/main/scala/com/yotpo/metorikku/code/steps/Watermark.scala new file mode 100644 index 000000000..43f38f894 --- /dev/null +++ b/src/main/scala/com/yotpo/metorikku/code/steps/Watermark.scala @@ -0,0 +1,20 @@ +package com.yotpo.metorikku.code.steps + +import com.yotpo.metorikku.exceptions.MetorikkuException + +object Watermark { + val message = "You need to send 3 parameters: table, eventTime, delayThreshold" + + def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = { + params match { + case Some(paramaters) => { + val table = paramaters.get("table").get + val eventTime = paramaters.get("eventTime").get + val delayThreshold = paramaters.get("delayThreshold").get + + ss.table(table).withWatermark(eventTime, delayThreshold).createOrReplaceTempView(dataFrameName) + } + case None => throw MetorikkuException(message) + } + } +} diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala index 33ac064a2..b7a9c39f0 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala @@ -4,6 +4,7 @@ import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException import com.yotpo.metorikku.instrumentation.InstrumentationFactory import com.yotpo.metorikku.output.Writer import org.apache.log4j.{LogManager, Logger} +import org.apache.spark.sql.types.StructField import org.apache.spark.sql.{DataFrame, Row} @@ -13,12 +14,18 @@ class InstrumentationOutputWriter(props: Map[String, String], instrumentationFactory: InstrumentationFactory) extends Writer { @transient lazy val log: Logger = LogManager.getLogger(this.getClass) - val keyColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("keyColumn") + // val keyColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("keyColumn") + val valueColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("valueColumn") val timeColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("timeColumn") override def write(dataFrame: DataFrame): Unit = { val columns = dataFrame.schema.fields.zipWithIndex - val indexOfKeyCol = keyColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) + // val valueColumn or last + // val indexOfKeyCol = keyColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) + if (columns.length == 2){ + val indexOfValCol = 1 + } + val indexOfValCol = valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) val indexOfTimeCol = timeColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) log.info(s"Starting to write Instrumentation of data frame: $dataFrameName on metric: $metricName") @@ -26,6 +33,32 @@ class InstrumentationOutputWriter(props: Map[String, String], val client = instrumentationFactory.create() p.foreach(row => { + val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ + columns.filter{ + case (column, index) => index != indexOfValCol + }.map{ + case (column, index) => + column.name -> row.get(index).asInstanceOf[AnyVal].toString + }.toMap + + + val valueOfValueColumn = row.get(indexOfValCol.getOrElse(null)) + val longValue = valueOfValueColumn.asInstanceOf[Number].longValue() + val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ keyColumnTags + + if (valueOfRowAtCurrentCol != null && classOf[Number].isAssignableFrom(valueOfRowAtCurrentCol.getClass)) { + val longValue = valueOfRowAtCurrentCol.asInstanceOf[Number].longValue() + val keyColumnTags = getTagsForKeyColumn(indexOfKeyCol, row) + val time = getTime(indexOfTimeCol, row) + val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ keyColumnTags + + client.gauge(name = column.name, value = longValue, tags = tags, time = time) + } else { + throw MetorikkuWriteFailedException("Value column doesn't contain a number") + } + + + for ((column, i) <- columns) { try { // Don't write key/time column to metric From e7c8a0283c402a693316d6b27508aaa27dfcf3ae Mon Sep 17 00:00:00 2001 From: amitco1 Date: Wed, 1 May 2019 14:29:33 +0300 Subject: [PATCH 07/10] feat(multitopic_writing): add watermark, add instrumentation writing with tags --- .../InstrumentationOutputWriter.scala | 84 +++++-------------- 1 file changed, 21 insertions(+), 63 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala index b7a9c39f0..463cc3eb3 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala @@ -4,7 +4,6 @@ import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException import com.yotpo.metorikku.instrumentation.InstrumentationFactory import com.yotpo.metorikku.output.Writer import org.apache.log4j.{LogManager, Logger} -import org.apache.spark.sql.types.StructField import org.apache.spark.sql.{DataFrame, Row} @@ -14,17 +13,11 @@ class InstrumentationOutputWriter(props: Map[String, String], instrumentationFactory: InstrumentationFactory) extends Writer { @transient lazy val log: Logger = LogManager.getLogger(this.getClass) - // val keyColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("keyColumn") val valueColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("valueColumn") val timeColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("timeColumn") override def write(dataFrame: DataFrame): Unit = { val columns = dataFrame.schema.fields.zipWithIndex - // val valueColumn or last - // val indexOfKeyCol = keyColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) - if (columns.length == 2){ - val indexOfValCol = 1 - } val indexOfValCol = valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) val indexOfTimeCol = timeColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col))) @@ -32,73 +25,38 @@ class InstrumentationOutputWriter(props: Map[String, String], dataFrame.foreachPartition(p => { val client = instrumentationFactory.create() - p.foreach(row => { - val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ - columns.filter{ - case (column, index) => index != indexOfValCol - }.map{ - case (column, index) => - column.name -> row.get(index).asInstanceOf[AnyVal].toString - }.toMap + val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1) + p.foreach(row => { + try { - val valueOfValueColumn = row.get(indexOfValCol.getOrElse(null)) - val longValue = valueOfValueColumn.asInstanceOf[Number].longValue() - val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ keyColumnTags + val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ + columns.filter { + case (column, index) => index != actualIndexOfValCol || (!row.isNullAt(indexOfTimeCol.get) && index != indexOfValCol.get) + }.map { + case (column, index) => + column.name -> row.get(index).asInstanceOf[AnyVal].toString + }.toMap - if (valueOfRowAtCurrentCol != null && classOf[Number].isAssignableFrom(valueOfRowAtCurrentCol.getClass)) { - val longValue = valueOfRowAtCurrentCol.asInstanceOf[Number].longValue() - val keyColumnTags = getTagsForKeyColumn(indexOfKeyCol, row) val time = getTime(indexOfTimeCol, row) - val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ keyColumnTags - - client.gauge(name = column.name, value = longValue, tags = tags, time = time) - } else { - throw MetorikkuWriteFailedException("Value column doesn't contain a number") - } - - - - for ((column, i) <- columns) { - try { - // Don't write key/time column to metric - if ((!indexOfKeyCol.isDefined || i != indexOfKeyCol.get) && (!indexOfTimeCol.isDefined || i != indexOfTimeCol.get)) { - val valueOfRowAtCurrentCol = row.get(i) - // Only if value is numeric - if (valueOfRowAtCurrentCol != null && classOf[Number].isAssignableFrom(valueOfRowAtCurrentCol.getClass)) { - val longValue = valueOfRowAtCurrentCol.asInstanceOf[Number].longValue() - val keyColumnTags = getTagsForKeyColumn(indexOfKeyCol, row) - val time = getTime(indexOfTimeCol, row) - val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ keyColumnTags + val metricValue = row.get(actualIndexOfValCol) - client.gauge(name = column.name, value = longValue, tags = tags, time = time) - } else { - throw MetorikkuWriteFailedException("Value column doesn't contain a number") - } - } - } catch { - case ex: Throwable => - throw MetorikkuWriteFailedException(s"failed to write instrumentation on data frame: $dataFrameName " + - s"for row: ${row.toString()} on column: ${column.name}", ex) + if (metricValue != null && classOf[Number].isAssignableFrom(metricValue.getClass)) { + val longValue = metricValue.asInstanceOf[Number].longValue() + val valueColumnName = row.schema.fieldNames(actualIndexOfValCol) + client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time) } + + }catch { + case ex: Throwable => + throw MetorikkuWriteFailedException(s"failed to write instrumentation on data frame: $dataFrameName " + + s"for row: ${row.toString()}", ex) } }) - client.close() }) } - def getTagsForKeyColumn(indexOfKeyCol: Option[Int], row: Row): Map[String, String] = { - if (indexOfKeyCol.isDefined) { - if (!row.isNullAt(indexOfKeyCol.get)) { - return Map(keyColumnProperty.get -> row.get(indexOfKeyCol.get).asInstanceOf[AnyVal].toString) - } else { - throw MetorikkuWriteFailedException("Defined key column is null for row") - } - } - Map() - } - def getTime(indexOfTimeCol: Option[Int], row: Row): Long = { if (indexOfTimeCol.isDefined) { if (!row.isNullAt(indexOfTimeCol.get)) { @@ -113,4 +71,4 @@ class InstrumentationOutputWriter(props: Map[String, String], } System.currentTimeMillis() } -} +} \ No newline at end of file From e64b702f9ce8ed13d96c3e483f31e0225a8efbcd Mon Sep 17 00:00:00 2001 From: amitco1 Date: Wed, 1 May 2019 16:45:48 +0300 Subject: [PATCH 08/10] feat(multitopic_writing): add watermark to README. fix filtering in instrumentationOutputWriter --- README.md | 13 +++++++++++++ config/metric_config_sample.yaml | 7 ++++--- .../InstrumentationOutputWriter.scala | 6 ++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 2eedb5e97..6b9fac5ff 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,7 @@ Metorikku sends automatically on top of what spark is already sending the follow * In streaming: number of processed records in batch You can also send any information you like to the instrumentation output within a metric. +by default the last column of the schema will be the field value. all other columns will be tags Check out the [example](examples/movies_metric.yaml) for further details. @@ -306,6 +307,18 @@ Check out the built-in code steps [here](src/main/scala/com/yotpo/metorikku/code *NOTE: If you added some dependencies to your custom JAR build.sbt you have to either use [sbt-assembly](https://github.com/sbt/sbt-assembly) to add them to the JAR or you can use the ```--packages``` when running the spark-submit command* +#### Watermark +Metorikku supports Watermark method which helps a stream processing engine to deal with latenes. +You can use watermarking by adding a new udf step in your metric: +```yaml +- dataFrameName: dataframe + classpath: com.yotpo.metorikku.code.steps.Watermark + params: + table: my_table + eventTime: event + delayThreshold: 2 hours +``` + #### Apache Hive metastore Metorikku supports reading and saving tables with Apache hive metastore. To enable hive support via spark-submit (assuming you're using MySQL as Hive's DB but any backend can work) send the following configurations: diff --git a/config/metric_config_sample.yaml b/config/metric_config_sample.yaml index 9b60f020b..080d1c080 100644 --- a/config/metric_config_sample.yaml +++ b/config/metric_config_sample.yaml @@ -65,8 +65,9 @@ output: - dataFrameName: table1 outputType: Instrumentation outputOptions: - # This will be used to specify a column that will be treated as a string and be used as a key - keyColumn: col1 + # This will be used to specify a column that will be treated as a string and be used as a value. + # by default valuecolumn will be the last column of the schema. + valueColumn: col1 # When using influx instrumentation, use the time column in the table to set the time (by default it's now) timeColumn: col3 - dataFrameName: table1 @@ -98,7 +99,7 @@ output: keyColumn: col1 # This column will be sent as the payload of the kafka message valueColumn: col2 - # In streaming mode set to append/replace/complete default is append + # In streaming mode set to append/update/complete default is append outputMode: append # In streaming mode set the trigger type (Once/Processing time) triggerType: Once diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala index 463cc3eb3..7dbfccfc8 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala @@ -25,14 +25,16 @@ class InstrumentationOutputWriter(props: Map[String, String], dataFrame.foreachPartition(p => { val client = instrumentationFactory.create() + // use last column if valueColumn is missing val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1) + val actualIndexOfTimeCol = indexOfTimeCol.getOrElse(columns.length) p.foreach(row => { try { val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ columns.filter { - case (column, index) => index != actualIndexOfValCol || (!row.isNullAt(indexOfTimeCol.get) && index != indexOfValCol.get) + case (column, index) => index != actualIndexOfValCol && index != actualIndexOfTimeCol }.map { case (column, index) => column.name -> row.get(index).asInstanceOf[AnyVal].toString @@ -71,4 +73,4 @@ class InstrumentationOutputWriter(props: Map[String, String], } System.currentTimeMillis() } -} \ No newline at end of file +} From 5ad1bdf7d5827f5af38e61c1f10ae22ec5dd415e Mon Sep 17 00:00:00 2001 From: amitco1 Date: Thu, 2 May 2019 10:51:02 +0300 Subject: [PATCH 09/10] feat(multitopic_writing): fix README, fix TimeCol treatment --- README.md | 9 +++++++-- examples/influxdb/movies_metric.yaml | 2 +- .../instrumentation/InstrumentationOutputWriter.scala | 3 +-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6b9fac5ff..b6c49d04a 100644 --- a/README.md +++ b/README.md @@ -247,7 +247,9 @@ Metorikku sends automatically on top of what spark is already sending the follow * In streaming: number of processed records in batch You can also send any information you like to the instrumentation output within a metric. -by default the last column of the schema will be the field value. all other columns will be tags +by default the last column of the schema will be the field value. +Other columns that are not value or time columns will be merged together as the name of the metric. +If writing directly to influxDB these will become tags. Check out the [example](examples/movies_metric.yaml) for further details. @@ -308,13 +310,16 @@ Check out the built-in code steps [here](src/main/scala/com/yotpo/metorikku/code *NOTE: If you added some dependencies to your custom JAR build.sbt you have to either use [sbt-assembly](https://github.com/sbt/sbt-assembly) to add them to the JAR or you can use the ```--packages``` when running the spark-submit command* #### Watermark -Metorikku supports Watermark method which helps a stream processing engine to deal with latenes. +Metorikku supports Watermark method which helps a stream processing engine to deal with late data. You can use watermarking by adding a new udf step in your metric: ```yaml +# This will become the new watermarked dataframe name. - dataFrameName: dataframe classpath: com.yotpo.metorikku.code.steps.Watermark params: + # Watermark table my_table table: my_table + # The column representing the event time (needs to be a TIMESTAMP or DATE column) eventTime: event delayThreshold: 2 hours ``` diff --git a/examples/influxdb/movies_metric.yaml b/examples/influxdb/movies_metric.yaml index 7236f2ad3..ef64e45a0 100644 --- a/examples/influxdb/movies_metric.yaml +++ b/examples/influxdb/movies_metric.yaml @@ -29,4 +29,4 @@ output: outputType: Instrumentation outputOptions: timeColumn: month_ts - keyColumn: genre \ No newline at end of file + valueColumn: number_of_ratings \ No newline at end of file diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala index 7dbfccfc8..8bb23b111 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/instrumentation/InstrumentationOutputWriter.scala @@ -27,14 +27,13 @@ class InstrumentationOutputWriter(props: Map[String, String], // use last column if valueColumn is missing val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1) - val actualIndexOfTimeCol = indexOfTimeCol.getOrElse(columns.length) p.foreach(row => { try { val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ columns.filter { - case (column, index) => index != actualIndexOfValCol && index != actualIndexOfTimeCol + case (column, index) => index != actualIndexOfValCol && (!indexOfTimeCol.isDefined || index != indexOfTimeCol.get) }.map { case (column, index) => column.name -> row.get(index).asInstanceOf[AnyVal].toString From b577338b7abc32c89f8006cc591b18d4ab760f9e Mon Sep 17 00:00:00 2001 From: amitco1 Date: Thu, 2 May 2019 11:45:37 +0300 Subject: [PATCH 10/10] feat(multitopic_writing): add topic pattern description, fix watermark description --- README.md | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index b6c49d04a..875b12ef0 100644 --- a/README.md +++ b/README.md @@ -229,6 +229,37 @@ This will commit the offsets to kafka, as a new dummy consumer group. * If your subject schema name is not ```-value``` (e.g. if the topic is a regex pattern) you can specify the schema subject in the ```schemaSubject``` section +###### Topic Pattern +Kafka input also allows reading messages from multiple topics by using subscribe pattern: +```yaml +inputs: + testStream: + kafka: + servers: + - 127.0.0.1:9092 + # topicPattern can be any Java regex string + topicPattern: my_topics_regex.* + consumerGroup: testConsumerGroupID # optional + schemaRegistryUrl: https://schema-registry-url # optional + schemaSubject: subject # optional +``` +* While using topicPattern, consider using ```schemaRegistryUrl``` and ```schemaSubject``` in case your topics have different schemas. + +##### Watermark +Metorikku supports Watermark method which helps a stream processing engine to deal with late data. +You can use watermarking by adding a new udf step in your metric: +```yaml +# This will become the new watermarked dataframe name. +- dataFrameName: dataframe + classpath: com.yotpo.metorikku.code.steps.Watermark + params: + # Watermark table my_table + table: my_table + # The column representing the event time (needs to be a TIMESTAMP or DATE column) + eventTime: event + delayThreshold: 2 hours +``` + #### Instrumentation One of the most useful features in Metorikku is it's instrumentation capabilities. @@ -309,21 +340,6 @@ Check out the built-in code steps [here](src/main/scala/com/yotpo/metorikku/code *NOTE: If you added some dependencies to your custom JAR build.sbt you have to either use [sbt-assembly](https://github.com/sbt/sbt-assembly) to add them to the JAR or you can use the ```--packages``` when running the spark-submit command* -#### Watermark -Metorikku supports Watermark method which helps a stream processing engine to deal with late data. -You can use watermarking by adding a new udf step in your metric: -```yaml -# This will become the new watermarked dataframe name. -- dataFrameName: dataframe - classpath: com.yotpo.metorikku.code.steps.Watermark - params: - # Watermark table my_table - table: my_table - # The column representing the event time (needs to be a TIMESTAMP or DATE column) - eventTime: event - delayThreshold: 2 hours -``` - #### Apache Hive metastore Metorikku supports reading and saving tables with Apache hive metastore. To enable hive support via spark-submit (assuming you're using MySQL as Hive's DB but any backend can work) send the following configurations: