From 0a7e29722c9490b0c89ec21da99cf7c47a6bb0ad Mon Sep 17 00:00:00 2001 From: Liran Yogev <23477645+lyogev@users.noreply.github.com> Date: Mon, 9 Aug 2021 12:53:56 +0300 Subject: [PATCH] fix(periodic): fix periodic not working (#448) --- .../scala/com/yotpo/metorikku/Metorikku.scala | 23 +++++++++++-------- .../configuration/job/Periodic.scala | 8 +++++++ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/Metorikku.scala b/src/main/scala/com/yotpo/metorikku/Metorikku.scala index 0a2288c2a..e3cb3e366 100644 --- a/src/main/scala/com/yotpo/metorikku/Metorikku.scala +++ b/src/main/scala/com/yotpo/metorikku/Metorikku.scala @@ -1,7 +1,5 @@ package com.yotpo.metorikku -import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit} - import com.yotpo.metorikku.configuration.job.{ConfigurationParser, Periodic} import com.yotpo.metorikku.metric.MetricSet import org.apache.log4j.LogManager @@ -34,15 +32,22 @@ object Metorikku extends App { } private def executePeriodicTask(periodic: Periodic, job: Job) = { - val task = new Runnable { - def run() = { - sparkSession.catalog.clearCache() - runMetrics(job) + val duration = periodic.getTriggerDurationInMillis() + + while(true) { + val start = System.currentTimeMillis + log.info(s"Starting a periodic task at ${start}") + sparkSession.catalog.clearCache() + runMetrics(job) + + val period = System.currentTimeMillis - start + + if (period < duration) { + val sleepTime = duration - period + log.info(s"Waiting for ${sleepTime} milliseconds before starting next run") + Thread.sleep(duration - period) } } - val ex = new ScheduledThreadPoolExecutor(1) - val initialDelay = 0 - ex.scheduleAtFixedRate(task, initialDelay, periodic.getTriggerDurationInSeconds(), TimeUnit.SECONDS) } def runMetrics(job: Job): Unit = { diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/Periodic.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/Periodic.scala index f20d2503e..1bdcbc76f 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/job/Periodic.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/job/Periodic.scala @@ -12,4 +12,12 @@ case class Periodic(triggerDuration: Option[String]) { case e: Exception => throw MetorikkuException("Invaiid periodic trigger duration", e) } } + + def getTriggerDurationInMillis(): Long = { + try { + Duration(triggerDuration.get).toMillis + } catch { + case e: Exception => throw MetorikkuException("Invaiid periodic trigger duration", e) + } + } }