Merge pull request #174 from YotpoLtd/multitopic_writing

feat(kafka): add support for regex topic

amitco1 authored May 2, 2019
2 parents 7540d49 + b577338 commit 2903df4
Showing 8 changed files with 106 additions and 48 deletions.
34 changes: 34 additions & 0 deletions README.md
@@ -229,6 +229,37 @@ This will commit the offsets to kafka, as a new dummy consumer group.

* If your subject schema name is not ```<TOPIC NAME>-value``` (e.g. if the topic is a regex pattern) you can specify the schema subject in the ```schemaSubject``` section

###### Topic Pattern
Kafka input also allows reading messages from multiple topics by using a subscribe pattern:
```yaml
inputs:
testStream:
kafka:
servers:
- 127.0.0.1:9092
# topicPattern can be any Java regex string
topicPattern: my_topics_regex.*
consumerGroup: testConsumerGroupID # optional
schemaRegistryUrl: https://schema-registry-url # optional
schemaSubject: subject # optional
```
* When using ```topicPattern```, consider setting ```schemaRegistryUrl``` and ```schemaSubject```, since the matched topics may have different schemas and the schema subject cannot be derived from a topic regex (see the sketch below).
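
Internally, ```topicPattern``` is handed to Spark's Kafka source as the ```subscribePattern``` option instead of ```subscribe``` (this is what the ```KafkaInput.scala``` change further down in this diff does). A minimal sketch of that mapping, with the server address and regex taken from the example above as illustrative values:
```scala
// Sketch only: mirrors what KafkaInput.scala in this commit does with topicPattern.
// Server address and regex are illustrative values from the README example above.
val stream = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribePattern", "my_topics_regex.*") // regex subscription instead of a fixed topic
  .load()
```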

##### Watermark
Metorikku supports the watermark method, which helps a stream processing engine deal with late data.
You can use watermarking by adding a new UDF step to your metric:
```yaml
# This will become the new watermarked dataframe name.
- dataFrameName: dataframe
classpath: com.yotpo.metorikku.code.steps.Watermark
params:
# Watermark table my_table
table: my_table
# The column representing the event time (needs to be a TIMESTAMP or DATE column)
eventTime: event
delayThreshold: 2 hours
```
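
For reference, this step boils down to Spark's ```withWatermark``` call followed by re-registering the result under the new dataframe name (this is what ```Watermark.scala``` in this commit does); a minimal sketch using the values from the example above:
```scala
// Sketch of what the Watermark step does, assuming an existing SparkSession `ss`
// and the table/eventTime/delayThreshold values from the YAML example above.
ss.table("my_table")
  .withWatermark("event", "2 hours")    // `event` must be a TIMESTAMP or DATE column
  .createOrReplaceTempView("dataframe") // exposed under the new dataframe name
```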

#### Instrumentation
One of the most useful features in Metorikku is its instrumentation capabilities.

@@ -247,6 +278,9 @@ Metorikku sends automatically on top of what spark is already sending the following:
* In streaming: number of processed records in batch

You can also send any information you like to the instrumentation output within a metric.
By default, the last column of the schema is used as the field value (this can be overridden with ```valueColumn```).
Other columns that are neither the value column nor the time column are merged together into the name of the metric.
When writing directly to InfluxDB, these columns become tags.
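
For example, an Instrumentation output along the lines of the sample config and the InfluxDB movies example in this commit could look like the following sketch (the dataframe name is illustrative):
```yaml
output:
- dataFrameName: movies_agg            # illustrative dataframe name
  outputType: Instrumentation
  outputOptions:
    # number_of_ratings is the numeric field value; without valueColumn the last column is used
    valueColumn: number_of_ratings
    # month_ts sets the point's time when writing to InfluxDB (defaults to now)
    timeColumn: month_ts
    # any remaining columns (e.g. genre) become tags when writing to InfluxDB
```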

Check out the [example](examples/movies_metric.yaml) for further details.

7 changes: 4 additions & 3 deletions config/metric_config_sample.yaml
@@ -65,8 +65,9 @@ output:
- dataFrameName: table1
outputType: Instrumentation
outputOptions:
# This will be used to specify a column that will be treated as a string and be used as a key
keyColumn: col1
# This will be used to specify a column that will be used as the (numeric) metric value.
# By default, valueColumn will be the last column of the schema.
valueColumn: col1
# When using influx instrumentation, use the time column in the table to set the time (by default it's now)
timeColumn: col3
- dataFrameName: table1
@@ -98,7 +99,7 @@ output:
keyColumn: col1
# This column will be sent as the payload of the kafka message
valueColumn: col2
# In streaming mode set to append/replace/complete default is append
# In streaming mode set to append/update/complete default is append
outputMode: append
# In streaming mode set the trigger type (Once/Processing time)
triggerType: Once
2 changes: 1 addition & 1 deletion examples/influxdb/movies_metric.yaml
@@ -29,4 +29,4 @@ output:
outputType: Instrumentation
outputOptions:
timeColumn: month_ts
keyColumn: genre
valueColumn: number_of_ratings
20 changes: 20 additions & 0 deletions src/main/scala/com/yotpo/metorikku/code/steps/Watermark.scala
@@ -0,0 +1,20 @@
package com.yotpo.metorikku.code.steps

import com.yotpo.metorikku.exceptions.MetorikkuException

object Watermark {
val message = "You need to send 3 parameters: table, eventTime, delayThreshold"

def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
params match {
case Some(paramaters) => {
val table = paramaters.get("table").get
val eventTime = paramaters.get("eventTime").get
val delayThreshold = paramaters.get("delayThreshold").get

ss.table(table).withWatermark(eventTime, delayThreshold).createOrReplaceTempView(dataFrameName)
}
case None => throw MetorikkuException(message)
}
}
}
@@ -5,14 +5,15 @@ import com.yotpo.metorikku.input.Reader
import com.yotpo.metorikku.input.readers.kafka.KafkaInput

case class Kafka(servers: Seq[String],
topic: String,
topic: Option[String],
topicPattern: Option[String],
consumerGroup: Option[String],
options: Option[Map[String, String]],
schemaRegistryUrl: Option[String],
schemaSubject: Option[String]
) extends InputConfig {
require(Option(servers).isDefined, "Servers Must be Defined")
require(Option(topic).isDefined, "Topic must be defined")
require(topic.isDefined && !topicPattern.isDefined || !topic.isDefined && topicPattern.isDefined, "Exactly one of (topic, topicPattern) must be defined")

override def getReader(name: String): Reader = KafkaInput(name, servers, topic, consumerGroup, options, schemaRegistryUrl, schemaSubject)
override def getReader(name: String): Reader = KafkaInput(name, servers, topic, topicPattern, consumerGroup, options, schemaRegistryUrl, schemaSubject)
}
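
As a consequence of the new ```require```, a Kafka input must define exactly one of ```topic``` and ```topicPattern```; a config like the following sketch (illustrative names) would now fail validation:
```yaml
inputs:
  badStream:
    kafka:
      servers:
      - 127.0.0.1:9092
      # Invalid: topic and topicPattern are mutually exclusive; set exactly one of them
      topic: my_topic
      topicPattern: my_topics_regex.*
```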
@@ -8,24 +8,35 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.spark.sql.{DataFrame, SparkSession}


case class KafkaInput(name: String, servers: Seq[String], topic: String, consumerGroup: Option[String],
options: Option[Map[String, String]], schemaRegistryUrl: Option[String], schemaSubject: Option[String]) extends Reader {
case class KafkaInput(name: String, servers: Seq[String], topic: Option[String], topicPattern: Option[String], consumerGroup: Option[String],
options: Option[Map[String, String]], schemaRegistryUrl: Option[String], schemaSubject: Option[String]) extends Reader {
@transient lazy val log = org.apache.log4j.LogManager.getLogger(this.getClass)


def read(sparkSession: SparkSession): DataFrame = {
consumerGroup match {
case Some(group) =>
log.info(s"creating consumer group with id $group")
val consumer = createKafkaConsumer(group)
val lagWriter = new KafkaLagWriter(consumer, topic)
val chosen_topic = topic.getOrElse(topicPattern.getOrElse(""))
val lagWriter = new KafkaLagWriter(consumer, chosen_topic)
sparkSession.streams.addListener(lagWriter)
case _ =>
}

val bootstrapServers = servers.mkString(",")
val inputStream = sparkSession.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
topic match {
case Some(regular_topic) =>
inputStream.option("subscribe", regular_topic)
case _ =>
}
topicPattern match {
case Some(regex_topic) =>
inputStream.option("subscribePattern", regex_topic)
case _ =>
}

if (options.nonEmpty) {
inputStream.options(options.get)
@@ -34,12 +45,11 @@ case class KafkaInput(name: String, servers: Seq[String], topic: String, consume
val kafkaDataFrame = inputStream.load()
schemaRegistryUrl match {
case Some(url) => {
val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, topic, schemaSubject)
val schemaRegistryDeserializer = new SchemaRegistryDeserializer(url, topic.getOrElse(""), schemaSubject)
schemaRegistryDeserializer.getDeserializedDataframe(sparkSession, kafkaDataFrame)
}
case None => kafkaDataFrame
}

}

private def createKafkaConsumer(consumerGroupID: String) = {
@@ -28,7 +28,7 @@ class KafkaLagWriter(kafkaConsumer: KafkaConsumer[String, String], topic: String
om.registerModule(DefaultScalaModule)
event.progress.sources.foreach(source => {
val jsonOffsets = om.readValue(source.endOffset, classOf[Map[String, Map[String, Long]]])
jsonOffsets.keys.filter(key => key == topic)
jsonOffsets.keys
.foreach(topic => {
log.debug(s"committing offsets for topic $topic")
val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]()
@@ -13,59 +13,51 @@ class InstrumentationOutputWriter(props: Map[String, String],
instrumentationFactory: InstrumentationFactory) extends Writer {
@transient lazy val log: Logger = LogManager.getLogger(this.getClass)

val keyColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("keyColumn")
val valueColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("valueColumn")
val timeColumnProperty: Option[String] = Option(props).getOrElse(Map()).get("timeColumn")

override def write(dataFrame: DataFrame): Unit = {
val columns = dataFrame.schema.fields.zipWithIndex
val indexOfKeyCol = keyColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col)))
val indexOfValCol = valueColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col)))
val indexOfTimeCol = timeColumnProperty.flatMap(col => Option(dataFrame.schema.fieldNames.indexOf(col)))

log.info(s"Starting to write Instrumentation of data frame: $dataFrameName on metric: $metricName")
dataFrame.foreachPartition(p => {
val client = instrumentationFactory.create()

// use last column if valueColumn is missing
val actualIndexOfValCol = indexOfValCol.getOrElse(columns.length - 1)

p.foreach(row => {
for ((column, i) <- columns) {
try {
// Don't write key/time column to metric
if ((!indexOfKeyCol.isDefined || i != indexOfKeyCol.get) && (!indexOfTimeCol.isDefined || i != indexOfTimeCol.get)) {
val valueOfRowAtCurrentCol = row.get(i)
// Only if value is numeric
if (valueOfRowAtCurrentCol != null && classOf[Number].isAssignableFrom(valueOfRowAtCurrentCol.getClass)) {
val longValue = valueOfRowAtCurrentCol.asInstanceOf[Number].longValue()
val keyColumnTags = getTagsForKeyColumn(indexOfKeyCol, row)
val time = getTime(indexOfTimeCol, row)
val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++ keyColumnTags
try {

val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName) ++
columns.filter {
case (column, index) => index != actualIndexOfValCol && (!indexOfTimeCol.isDefined || index != indexOfTimeCol.get)
}.map {
case (column, index) =>
column.name -> row.get(index).asInstanceOf[AnyVal].toString
}.toMap

val time = getTime(indexOfTimeCol, row)
val metricValue = row.get(actualIndexOfValCol)

client.gauge(name = column.name, value = longValue, tags = tags, time = time)
} else {
throw MetorikkuWriteFailedException("Value column doesn't contain a number")
}
}
} catch {
case ex: Throwable =>
throw MetorikkuWriteFailedException(s"failed to write instrumentation on data frame: $dataFrameName " +
s"for row: ${row.toString()} on column: ${column.name}", ex)
if (metricValue != null && classOf[Number].isAssignableFrom(metricValue.getClass)) {
val longValue = metricValue.asInstanceOf[Number].longValue()
val valueColumnName = row.schema.fieldNames(actualIndexOfValCol)
client.gauge(name = valueColumnName, value = longValue, tags = tags, time = time)
}

}catch {
case ex: Throwable =>
throw MetorikkuWriteFailedException(s"failed to write instrumentation on data frame: $dataFrameName " +
s"for row: ${row.toString()}", ex)
}
})

client.close()
})
}

def getTagsForKeyColumn(indexOfKeyCol: Option[Int], row: Row): Map[String, String] = {
if (indexOfKeyCol.isDefined) {
if (!row.isNullAt(indexOfKeyCol.get)) {
return Map(keyColumnProperty.get -> row.get(indexOfKeyCol.get).asInstanceOf[AnyVal].toString)
} else {
throw MetorikkuWriteFailedException("Defined key column is null for row")
}
}
Map()
}

def getTime(indexOfTimeCol: Option[Int], row: Row): Long = {
if (indexOfTimeCol.isDefined) {
if (!row.isNullAt(indexOfTimeCol.get)) {
