Skip to content

Commit

Permalink
fix(Metric.scala):close_instrumentation_client (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
Irenez753 authored Aug 10, 2020
1 parent e6daaed commit 7467fa6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
12 changes: 11 additions & 1 deletion src/main/scala/com/yotpo/metorikku/Metorikku.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@ object Metorikku extends App {
case Some(periodic) => {
executePeriodicTask(periodic)
}
case _ => runMetrics(session)
case _ => {
runMetrics(session)
try
{
session.instrumentationClient.close()
}
catch
{
case e: Throwable => log.error(s"Got exception while closing connection to instrumentationClient", e)
}
}
}

private def executePeriodicTask(periodic: Periodic) = {
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/com/yotpo/metorikku/metric/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ case class Metric(configuration: Configuration, metricDir: Option[File], metricN
val dataFrameName = outputConfig.dataFrameName
val dataFrame = repartition(outputConfig, job.sparkSession.table(dataFrameName))


val outputOptions = Option(outputConfig.outputOptions).getOrElse(Map())
outputOptions.get("protectFromEmptyOutput").asInstanceOf[Option[Boolean]] match {
case Some(true) => {
Expand All @@ -158,7 +157,6 @@ case class Metric(configuration: Configuration, metricDir: Option[File], metricN
job.config.cacheCountOnOutput)
}
})

for ((dataFrameName, streamingConfig) <- streamingWriterList) writeStream(dataFrameName,
streamingConfig.streamingWritingConfiguration, job.config.streaming, job.instrumentationClient)

Expand All @@ -167,3 +165,4 @@ case class Metric(configuration: Configuration, metricDir: Option[File], metricN
}
}
}

0 comments on commit 7467fa6

Please sign in to comment.