Skip to content

Commit

Permalink
feat(udf): add align dataframe code step
Browse files Browse the repository at this point in the history
  • Loading branch information
lyogev committed Feb 12, 2019
1 parent 3724892 commit f80dff3
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 7 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ Check out the [UDF examples directory](examples/udf) for a very simple example o
The only thing important in this JAR is that you have an object with the following method:
```scala
object SomeObject {
def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String): Unit = {}
def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {}
}
```
Inside the run function you can do whatever you like; in the example folder you'll see that we registered a new UDF.
Expand All @@ -269,8 +269,11 @@ Now all that's left is to add it as a new step in your metric:
```yaml
- dataFrameName: dataframe
classpath: com.example.SomeObject
params:
param1: value1
```
This will trigger your ```run``` method with the above dataFrameName.
Check out the built-in code steps [here](src/main/scala/com/yotpo/metorikku/code/steps).

*NOTE: If you added dependencies to your custom JAR's build.sbt, you have to either use [sbt-assembly](https://github.com/sbt/sbt-assembly) to bundle them into the JAR or pass the ```--packages``` option when running the spark-submit command*

Expand Down
2 changes: 1 addition & 1 deletion examples/udf/Example.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object TestUDF {
"Z" + s
}

def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String): Unit = {
def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
ss.udf.register(dataFrameName, udf[String, String](addZPrefix))
}

Expand Down
34 changes: 34 additions & 0 deletions src/main/scala/com/yotpo/metorikku/code/steps/AlignTables.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.yotpo.metorikku.code.steps

import com.yotpo.metorikku.exceptions.MetorikkuException
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

/**
 * Built-in code step that aligns one dataframe's schema to another's.
 *
 * Expects two parameters in the step's `params` map:
 *   - `from`: name of the registered table to read rows from
 *   - `to`:   name of the registered table whose column set should be matched
 *
 * The result — `from`'s rows projected onto `to`'s columns (missing columns
 * filled with nulls) — is registered as a temp view under `dataFrameName`.
 */
object AlignTables {
  val message = "You need to send 2 parameters with the names of the dataframes to align: from, to"

  /**
   * For every column in `toCols`, select it from the source when present;
   * otherwise produce a null literal aliased to that column name so both
   * schemas line up positionally by name.
   */
  private def align(fromCols: Array[String], toCols: Array[String]): Array[Column] = {
    toCols.map( {
      case x if fromCols.contains(x) => col(x)
      // scalastyle:off null
      case y => lit(null).as(y)
      // scalastyle:on null
    })
  }

  /**
   * Entry point invoked by the Code step runner.
   *
   * @param ss            active Spark session used to resolve the tables
   * @param metricName    name of the enclosing metric (unused here)
   * @param dataFrameName name under which the aligned result is registered
   * @param params        must contain the `from` and `to` table names
   * @throws MetorikkuException when `params` is absent or missing a key
   */
  def run(ss: org.apache.spark.sql.SparkSession, metricName: String, dataFrameName: String, params: Option[Map[String, String]]): Unit = {
    params match {
      case Some(parameters) => {
        // Use getOrElse so a missing key raises a descriptive MetorikkuException
        // instead of a bare NoSuchElementException from Option.get.
        val fromName = parameters.getOrElse("from", throw MetorikkuException(message))
        val toName = parameters.getOrElse("to", throw MetorikkuException(message))

        val from = ss.table(fromName)
        val to = ss.table(toName)

        val aligned = from.select(align(from.columns, to.columns): _*)
        aligned.createOrReplaceTempView(dataFrameName)
      }
      case None => throw MetorikkuException(message)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ case class Step(sql: Option[String],
file: Option[String],
classpath: Option[String],
dataFrameName: String,
params: Option[Map[String, String]],
var ignoreOnFailures: Option[Boolean]) {

ignoreOnFailures = Option(ignoreOnFailures.getOrElse(false))
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/yotpo/metorikku/metric/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
val errorMessage = s"Failed to calculate dataFrame: ${step.dataFrameName} on metric: ${metricName}"
session.instrumentationClient.count(name="failedSteps", value=1, tags=tags)
if (stepConfig.ignoreOnFailures.get || session.config.continueOnFailedStep.get) {
log.error(errorMessage, ex)
log.error(errorMessage + " - " + ex.getMessage)
} else {
throw MetorikkuFailedStepException(errorMessage, ex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object StepFactory {
case None => {
configuration.classpath match {
case Some(cp) => {
Code(cp, metricName, configuration.dataFrameName)
Code(cp, metricName, configuration.dataFrameName, configuration.params)
}
case None => throw MetorikkuException("Each step requires an SQL query or a path to a file (SQL/Scala)")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import com.yotpo.metorikku.metric.StepAction
import org.apache.spark.sql.SparkSession
import scala.reflect.runtime.universe._

case class Code(objectClassPath: String, metricName: String, dataFrameName: String) extends StepAction[Unit] {
case class Code(objectClassPath: String, metricName: String, dataFrameName: String, params: Option[Map[String, String]]) extends StepAction[Unit] {
type MetorikkuCustomCode = {
def run(sparkSession: SparkSession, metric: String, step: String): Unit
def run(sparkSession: SparkSession, metric: String, step: String, params: Option[Map[String, String]]): Unit
}

val rm = runtimeMirror(getClass.getClassLoader)
Expand All @@ -15,6 +15,6 @@ case class Code(objectClassPath: String, metricName: String, dataFrameName: Stri
val obj = rm.reflectModule(module).instance.asInstanceOf[MetorikkuCustomCode]

override def run(sparkSession: SparkSession): Unit = {
obj.run(sparkSession, metricName, dataFrameName)
obj.run(sparkSession, metricName, dataFrameName, params)
}
}

0 comments on commit f80dff3

Please sign in to comment.