Skip to content

Commit

Permalink
fix(scheduler): add fair scheduler and a new parallel mode to metorikku
Browse files Browse the repository at this point in the history
  • Loading branch information
lyogev committed May 27, 2020
1 parent 6164df1 commit da451a2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 38 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ libraryDependencies ++= Seq(
"org.apache.commons" % "commons-text" % "1.6",
"org.influxdb" % "influxdb-java" % "2.14",
"org.apache.kafka" %% "kafka" % "2.2.0" % "provided",
"za.co.absa" % "abris_2.11" % "3.1.1" % "provided" excludeAll(excludeAvro, excludeSpark),
"za.co.absa" % "abris_2.11" % "3.2.0" % "provided" excludeAll(excludeAvro, excludeSpark),
"org.apache.hudi" %% "hudi-spark-bundle" % "0.5.2-incubating" % "provided" excludeAll excludeFasterXML,
"org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
"org.apache.avro" % "avro" % "1.8.2" % "provided",
Expand Down
61 changes: 30 additions & 31 deletions src/main/scala/com/yotpo/metorikku/Metorikku.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,51 @@ import org.apache.log4j.LogManager
object Metorikku extends App {
val log = LogManager.getLogger(this.getClass)
log.info("Starting Metorikku - Parsing configuration")
val session = Job(ConfigurationParser.parse(args))

session.config.periodic match {
case Some(periodic) => {
executePeriodicTask(periodic)
val configurations = ConfigurationParser.parse(args)

val jobs = configurations.map(config =>
new Runnable {
def run(): Unit = {
val job = Job(config)

job.config.periodic match {
case Some(periodic) => {
executePeriodicTask(job, periodic)
}
case _ => runMetrics(job)
}
}
}
case _ => runMetrics(session)
)

// With more than one job, give each its own thread, start them all, then wait for
// every one to finish; with zero or one job, run it inline on the current thread.
if (jobs.length > 1) {
  val workers = jobs.map(runnable => new Thread(runnable))
  workers.foreach(_.start())
  workers.foreach(_.join())
} else {
  jobs.foreach(_.run())
}

// Schedules a recurring task on a ScheduledThreadPoolExecutor with one thread.
// Each execution clears the session catalog's cache and then calls runMetrics.
// The first execution has no initial delay; the period is
// periodic.getTriggerDurationInSeconds(), interpreted in seconds.
//
// NOTE(review): two signature lines follow (one taking only `periodic`, one also
// taking `job`), and the task body contains both a `session`-based and a
// `job`-based pair of calls. This looks like the before/after lines of a diff
// displayed together - confirm against the actual file which set is current.
private def executePeriodicTask(periodic: Periodic) = {
private def executePeriodicTask(job: Job, periodic: Periodic) = {
val task = new Runnable {
def run() = {
session.sparkSession.catalog.clearCache()
runMetrics(session)
job.sparkSession.catalog.clearCache()
runMetrics(job)
}
}
// The executor is never shut down here, so it keeps running after this method returns.
val ex = new ScheduledThreadPoolExecutor(1)
val initialDelay = 0
// NOTE(review): per the JDK contract of scheduleAtFixedRate, an exception thrown by
// one execution suppresses all later executions - confirm this is acceptable.
ex.scheduleAtFixedRate(task, initialDelay, periodic.getTriggerDurationInSeconds(), TimeUnit.SECONDS)
}

/**
 * Runs the given metric sets concurrently, one thread per metric-set path, and
 * blocks until every thread has finished.
 *
 * NOTE(review): an exception thrown inside a worker thread ends that thread only;
 * `join()` still returns normally, so the caller is not told about the failure.
 *
 * @param job     the job handed to each `MetricSet.run` call
 * @param metrics metric-set paths; each one is passed to the `MetricSet` constructor
 */
def runMetricsInParallel(job: Job, metrics: Seq[String]): Unit = {
  // toList materializes the threads up front, so all of them exist before any is
  // started even if `metrics` is a lazy sequence.
  val threads = metrics.map { metricSetPath =>
    new Thread(new Runnable {
      // Explicit `: Unit =` replaces the deprecated procedure syntax `def run() { ... }`.
      override def run(): Unit = {
        val metricSet = new MetricSet(metricSetPath)
        metricSet.run(job)
      }
    })
  }.toList

  // Start everything first, then wait; joining inside the start loop would serialize the work.
  threads.foreach(_.start())
  threads.foreach(_.join())
}

/**
 * Runs the metric sets configured for the given job.
 *
 * Reads `job.config.metrics`: when it is defined, a `MetricSet` is built from each
 * path and run against the job; when it is `None`, a warning is logged and nothing runs.
 *
 * NOTE(review): two `case Some(metrics)` arms appear below, and as written the second
 * can never match. This looks like the before/after lines of a diff displayed
 * together - confirm against the actual file which arm is current.
 *
 * @param job the job whose configured metric sets are executed
 */
def runMetrics(job: Job): Unit = {
job.config.metrics match {
case Some(metrics) => {
// NOTE(review): this reads `session`, not the `job` parameter - confirm which is intended.
session.config.parallel match {
// Only an explicit `Some(true)` selects the parallel path; `Some(false)` and `None` run sequentially.
case Some(true) => runMetricsInParallel(job, metrics)
case _ => {
metrics.foreach(metricSetPath => {
val metricSet = new MetricSet(metricSetPath)
metricSet.run(job)
})
}
}
}
// Sequential-only variant: each metric set runs in order on the calling thread.
case Some(metrics) => metrics.foreach(metricSetPath => {
val metricSet = new MetricSet(metricSetPath)
metricSet.run(job)
})
// NOTE(review): "mertics" in the message below is a typo for "metrics".
case None => log.warn("No mertics were defined, exiting")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,32 @@ import scopt.OptionParser
object ConfigurationParser {
val log: Logger = LogManager.getLogger(this.getClass)

case class ConfigFileName(job: Option[String] = None, filename: Option[String] = None)
case class ConfigFileName(job: Option[String] = None, filename: Option[Seq[String]] = None)

// Command-line parser (scopt) that fills a ConfigFileName from the program arguments.
//
// NOTE(review): the 'c'/"config" option is declared twice below, once as
// opt[String] with no action and once as opt[Seq[String]]. This looks like the
// before/after lines of a diff displayed together - confirm against the actual
// file which declaration is current.
val CLIparser: OptionParser[ConfigFileName] = new scopt.OptionParser[ConfigFileName]("Metorikku") {
head("Metorikku", "1.0")
// -j / --job: the value is stored in ConfigFileName.job.
opt[String]('j', "job")
.action((x, c) => c.copy(job = Option(x)))
.text("Job configuration JSON")
// -c / --config as a single string; no action is attached to this declaration.
opt[String]('c', "config")
.text("Path to the job config file (YAML/JSON)")
// -c / --config as a sequence of strings; the value is stored in ConfigFileName.filename.
opt[Seq[String]]('c', "config")
.text("Path to the job config file (YAML/JSON), you can add multiple files by concatenating the file names with ,")
.action((x, c) => c.copy(filename = Option(x)))
help("help") text "use command line arguments to specify the configuration file path or content"
}

def parse(args: Array[String]): Configuration = {
def parse(args: Array[String]): Seq[Configuration] = {
log.info("Starting Metorikku - Parsing configuration")

CLIparser.parse(args, ConfigFileName()) match {
case Some(arguments) =>
arguments.job match {
case Some(job) => parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json"))
case Some(job) => Seq(parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json")))
case None => arguments.filename match {
case Some(filename) => parseConfigurationFile(FileUtils.readConfigurationFile(filename), FileUtils.getObjectMapperByFileName(filename))
case Some(filenames) =>
filenames.
map(filename =>
parseConfigurationFile(FileUtils.readConfigurationFile(filename),
FileUtils.getObjectMapperByFileName(filename))).toList
case None => throw new MetorikkuException("Failed to parse config file")
}
}
Expand Down

0 comments on commit da451a2

Please sign in to comment.