diff --git a/build.sbt b/build.sbt index b6473688f..8478ab642 100644 --- a/build.sbt +++ b/build.sbt @@ -63,7 +63,7 @@ libraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.6", "org.influxdb" % "influxdb-java" % "2.14", "org.apache.kafka" %% "kafka" % "2.2.0" % "provided", - "za.co.absa" % "abris_2.11" % "3.1.1" % "provided" excludeAll(excludeAvro, excludeSpark), + "za.co.absa" % "abris_2.11" % "3.2.0" % "provided" excludeAll(excludeAvro, excludeSpark), "org.apache.hudi" %% "hudi-spark-bundle" % "0.5.2-incubating" % "provided" excludeAll excludeFasterXML, "org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided", "org.apache.avro" % "avro" % "1.8.2" % "provided", diff --git a/src/main/scala/com/yotpo/metorikku/Metorikku.scala b/src/main/scala/com/yotpo/metorikku/Metorikku.scala index a12943610..8d1662467 100644 --- a/src/main/scala/com/yotpo/metorikku/Metorikku.scala +++ b/src/main/scala/com/yotpo/metorikku/Metorikku.scala @@ -9,20 +9,38 @@ import org.apache.log4j.LogManager object Metorikku extends App { val log = LogManager.getLogger(this.getClass) log.info("Starting Metorikku - Parsing configuration") - val session = Job(ConfigurationParser.parse(args)) - session.config.periodic match { - case Some(periodic) => { - executePeriodicTask(periodic) + val configurations = ConfigurationParser.parse(args) + + val jobs = configurations.map(config => + new Runnable { + def run(): Unit = { + val job = Job(config) + + job.config.periodic match { + case Some(periodic) => { + executePeriodicTask(job, periodic) + } + case _ => runMetrics(job) + } + } } - case _ => runMetrics(session) + ) + + jobs match { + case s if s.length > 1 => { + val threads = jobs.map(r => new Thread(r)) + threads.foreach(t => t.start()) + threads.foreach(t => t.join()) + } + case _ => jobs.foreach(r => r.run()) } - private def executePeriodicTask(periodic: Periodic) = { + private def executePeriodicTask(job: Job, periodic: Periodic) = { val task = new Runnable { def run() = { - session.sparkSession.catalog.clearCache() - runMetrics(session) + job.sparkSession.catalog.clearCache() + runMetrics(job) } } val ex = new ScheduledThreadPoolExecutor(1) @@ -30,31 +48,12 @@ object Metorikku extends App { ex.scheduleAtFixedRate(task, initialDelay, periodic.getTriggerDurationInSeconds(), TimeUnit.SECONDS) } - def runMetricsInParallel(job: Job, metrics: Seq[String]): Unit = { - val threads = metrics.map(metricSetPath => new Thread(new Runnable { - def run() { - val metricSet = new MetricSet(metricSetPath) - metricSet.run(job) - } - })).toList - - threads.foreach(t => t.start()) - threads.foreach(t => t.join()) - } - def runMetrics(job: Job): Unit = { job.config.metrics match { - case Some(metrics) => { - session.config.parallel match { - case Some(true) => runMetricsInParallel(job, metrics) - case _ => { - metrics.foreach(metricSetPath => { - val metricSet = new MetricSet(metricSetPath) - metricSet.run(job) - }) - } - } - } + case Some(metrics) => metrics.foreach(metricSetPath => { + val metricSet = new MetricSet(metricSetPath) + metricSet.run(job) + }) case None => log.warn("No mertics were defined, exiting") } } diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala index 41e410ac8..5f3cebf4c 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala @@ -10,28 +10,32 @@ import scopt.OptionParser object ConfigurationParser { val log: Logger = LogManager.getLogger(this.getClass) - case class ConfigFileName(job: Option[String] = None, filename: Option[String] = None) + case class ConfigFileName(job: Option[String] = None, filename: Option[Seq[String]] = None) val CLIparser: OptionParser[ConfigFileName] = new scopt.OptionParser[ConfigFileName]("Metorikku") { head("Metorikku", "1.0") opt[String]('j', "job") .action((x, c) => c.copy(job = Option(x))) .text("Job configuration JSON") - opt[String]('c', "config") - .text("Path to the job config file (YAML/JSON)") + opt[Seq[String]]('c', "config") + .text("Path to the job config file (YAML/JSON), you can add multiple files by concatenating the file names with ,") .action((x, c) => c.copy(filename = Option(x))) help("help") text "use command line arguments to specify the configuration file path or content" } - def parse(args: Array[String]): Configuration = { + def parse(args: Array[String]): Seq[Configuration] = { log.info("Starting Metorikku - Parsing configuration") CLIparser.parse(args, ConfigFileName()) match { case Some(arguments) => arguments.job match { - case Some(job) => parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json")) + case Some(job) => Seq(parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json"))) case None => arguments.filename match { - case Some(filename) => parseConfigurationFile(FileUtils.readConfigurationFile(filename), FileUtils.getObjectMapperByFileName(filename)) + case Some(filenames) => + filenames. + map(filename => + parseConfigurationFile(FileUtils.readConfigurationFile(filename), + FileUtils.getObjectMapperByFileName(filename))).toList case None => throw new MetorikkuException("Failed to parse config file") } }