Skip to content

Commit

Permalink
add get all undone task method
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Nov 25, 2023
1 parent 5c204b2 commit fe4f1f6
Showing 1 changed file with 94 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.linkis.entrance

import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException}
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
Expand All @@ -29,7 +28,7 @@ import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
import org.apache.linkis.entrance.exception.{EntranceErrorException, SubmitFailedException}
import org.apache.linkis.entrance.execute.EntranceJob
import org.apache.linkis.entrance.job.EntranceExecutionJob
import org.apache.linkis.entrance.log.{Cache, CacheLogWriter, HDFSCacheLogWriter, LogReader}
import org.apache.linkis.entrance.log.LogReader
import org.apache.linkis.entrance.parser.ParserUtils
import org.apache.linkis.entrance.timeout.JobTimeoutManager
import org.apache.linkis.entrance.utils.JobHistoryHelper
Expand All @@ -44,16 +43,14 @@ import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.conf.RPCConfiguration
import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
import org.apache.linkis.server.conf.ServerConfiguration
import org.apache.linkis.storage.utils.StorageUtils

import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.springframework.beans.BeanUtils

import java.{lang, util}
import java.text.{MessageFormat, SimpleDateFormat}
import java.util.Date
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

Expand All @@ -63,6 +60,8 @@ abstract class EntranceServer extends Logging {

private val jobTimeoutManager: JobTimeoutManager = new JobTimeoutManager()

private val timeoutCheck = EntranceConfiguration.ENABLE_JOB_TIMEOUT_CHECK.getValue

def init(): Unit

def getName: String
Expand Down Expand Up @@ -126,6 +125,17 @@ abstract class EntranceServer extends Logging {
* this to trigger JobListener.onJobinit()
*/
Utils.tryAndWarn(job.getJobListener.foreach(_.onJobInited(job)))
if (logger.isDebugEnabled()) {
logger.debug(
s"After code preprocessing, the real execution code is:${jobRequest.getExecutionCode}"
)
}
if (StringUtils.isBlank(jobRequest.getExecutionCode)) {
throw new SubmitFailedException(
SUBMIT_CODE_ISEMPTY.getErrorCode,
SUBMIT_CODE_ISEMPTY.getErrorDesc
)
}
getEntranceContext.getOrCreateScheduler().submit(job)
val msg = LogUtils.generateInfo(
s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted "
Expand All @@ -135,7 +145,7 @@ abstract class EntranceServer extends Logging {
job match {
case entranceJob: EntranceJob =>
entranceJob.getJobRequest.setReqId(job.getId())
if (jobTimeoutManager.timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob)) {
if (timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob)) {
jobTimeoutManager.add(job.getId(), entranceJob)
}
entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg))
Expand Down Expand Up @@ -492,14 +502,14 @@ abstract class EntranceServer extends Logging {
job.setProgressListener(getEntranceContext.getOrCreatePersistenceManager())
job.setJobListener(getEntranceContext.getOrCreatePersistenceManager())
job match {
case entranceJob: EntranceJob => {
case entranceJob: EntranceJob =>
entranceJob.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus)
}
case _ =>
}
Utils.tryCatch {
if (logAppender.length() > 0)
if (logAppender.length() > 0) {
job.getLogListener.foreach(_.onLogUpdate(job, logAppender.toString.trim))
}
} { t =>
logger.error("Failed to write init JobRequest log, reason: ", t)
}
Expand All @@ -510,6 +520,17 @@ abstract class EntranceServer extends Logging {
* this to trigger JobListener.onJobinit()
*/
Utils.tryAndWarn(job.getJobListener.foreach(_.onJobInited(job)))
if (logger.isDebugEnabled()) {
logger.debug(
s"After code preprocessing, the real execution code is:${jobRequest.getExecutionCode}"
)
}
if (StringUtils.isBlank(jobRequest.getExecutionCode)) {
throw new SubmitFailedException(
SUBMIT_CODE_ISEMPTY.getErrorCode,
SUBMIT_CODE_ISEMPTY.getErrorDesc
)
}
getEntranceContext.getOrCreateScheduler().submit(job)
val msg = LogUtils.generateInfo(
s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted, success to failover"
Expand All @@ -519,8 +540,9 @@ abstract class EntranceServer extends Logging {
job match {
case entranceJob: EntranceJob =>
entranceJob.getJobRequest.setReqId(job.getId())
if (jobTimeoutManager.timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob))
if (timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob)) {
jobTimeoutManager.add(job.getId(), entranceJob)
}
entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg))
case _ =>
}
Expand Down Expand Up @@ -621,6 +643,68 @@ abstract class EntranceServer extends Logging {
logger.info(s"job ${jobRequest.getId} success to initialize the properties")
}

def getAllUndoneTask(filterWords: String, ecType: String = null): Array[EntranceJob] = {
val consumers = getEntranceContext
.getOrCreateScheduler()
.getSchedulerContext
.getOrCreateConsumerManager
.listConsumers()
.toSet
val filterConsumer = if (StringUtils.isNotBlank(filterWords)) {
if (StringUtils.isNotBlank(ecType)) {
consumers.filter(consumer =>
consumer.getGroup.getGroupName.contains(filterWords) && consumer.getGroup.getGroupName
.contains(ecType)
)
} else {
consumers.filter(_.getGroup.getGroupName.contains(filterWords))
}
} else {
consumers
}
filterConsumer
.flatMap { consumer =>
consumer.getRunningEvents ++ consumer.getConsumeQueue.getWaitingEvents
}
.filter(job => job != null && job.isInstanceOf[EntranceJob])
.map(_.asInstanceOf[EntranceJob])
.toArray
}

/**
* to check timeout task,and kill timeout task timeout: default > 48h
*/
def startTimeOutCheck(): Unit = {
Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable() {
override def run(): Unit = {
Utils.tryCatch {

val timeoutType = EntranceConfiguration.ENTRANCE_TASK_TIMEOUT.getHotValue()
logger.info(s"Start to check timeout Job, timout is ${timeoutType}")
val timeoutTime = System.currentTimeMillis() - timeoutType.toLong
getAllUndoneTask(null, null).filter(job => job.createTime < timeoutTime).foreach {
job =>
job.onFailure(s"Job has run for longer than the maximum time $timeoutType", null)
}
logger.info(s"Finished to check timeout Job, timout is ${timeoutType}")
} { case t: Throwable =>
logger.warn(s"TimeoutDetective Job failed. ${t.getMessage}", t)
}
}

},
EntranceConfiguration.ENTRANCE_TASK_TIMEOUT_SCAN.getValue.toLong,
EntranceConfiguration.ENTRANCE_TASK_TIMEOUT_SCAN.getValue.toLong,
TimeUnit.MILLISECONDS
)
}

if (timeoutCheck) {
logger.info("Job time check is enabled")
startTimeOutCheck()
}

}

object EntranceServer {
Expand Down

0 comments on commit fe4f1f6

Please sign in to comment.