diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
index d38c497279..ad7b97ced6 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
@@ -27,10 +27,7 @@
 import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender;
 import org.apache.linkis.monitor.jobhistory.index.JobIndexRule;
 import org.apache.linkis.monitor.jobhistory.index.JobIndexSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedAlertSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedRule;
-import org.apache.linkis.monitor.jobhistory.jobtime.StarrocksTimeExceedAlterSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.StarrocksTimeExceedRule;
+import org.apache.linkis.monitor.jobhistory.jobtime.*;
 import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsAlertSender;
 import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsRule;
 import org.apache.linkis.monitor.jobhistory.runtime.CommonJobRunTimeRule;
@@ -100,7 +97,7 @@ public void jobHistoryFinishedScan() {
       logger.info("Get JobHistoryId from cache ID:" + id);
     }
     List<DataFetcher> fetchers =
-        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "updated_time");
+        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "finished_job");
     if (fetchers.isEmpty()) {
       logger.warn("generated 0 dataFetchers, plz check input");
       return;
@@ -178,7 +175,7 @@ public void jobHistoryFinishedScan() {
     JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender());
     scannerIndex.addScanRule(jobIndexRule);
     List<DataFetcher> createFetcher =
-        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "department");
+        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "");
     JobMonitorUtils.run(scannerIndex, createFetcher, true);
   }
@@ -195,7 +192,7 @@ public void jobHistoryUnfinishedScan() {
     AnomalyScanner scanner = new DefaultScanner();
     boolean shouldStart = false;
     List<DataFetcher> fetchers =
-        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "created_time");
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "unfinished_job");
     if (fetchers.isEmpty()) {
       logger.warn("generated 0 dataFetchers, plz check input");
       return;
@@ -215,9 +212,52 @@ public void jobHistoryUnfinishedScan() {
           jobTimeAlerts.keySet(), new JobTimeExceedAlertSender(jobTimeAlerts));
       scanner.addScanRule(jobTimeExceedRule);
     }
+    JobMonitorUtils.run(scanner, fetchers, shouldStart);
+  }
+
+  /** Scans every 10 minutes, covering tasks from the last two hours. Alerting requires the alert parameters to be configured in the management console. */
+  @Scheduled(cron = "${linkis.monitor.jdbc.timeout.alert.cron:0 0/10 0 * * ?}")
+  public void jdbcUnfinishedAlertScan() {
+    long id =
+        Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedAlertScan"))
+            .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+    long intervalMs = 7200 * 1000;
+    long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
+    long endTime = System.currentTimeMillis();
+    long startTime = endTime - intervalMs;
+    AnomalyScanner scanner = new DefaultScanner();
+    List<DataFetcher> fetchers =
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "");
+    if (fetchers.isEmpty()) {
+      logger.warn("jdbcUnfinishedAlertScan generated 0 dataFetchers, plz check input");
+      return;
+    }
     StarrocksTimeExceedRule starrocksTimeExceedRule =
-        new StarrocksTimeExceedRule(new StarrocksTimeExceedAlterSender());
+        new StarrocksTimeExceedRule(new StarrocksTimeExceedAlertSender());
     scanner.addScanRule(starrocksTimeExceedRule);
-    JobMonitorUtils.run(scanner, fetchers, shouldStart);
+    JobMonitorUtils.run(scanner, fetchers, true);
+  }
+
+  /** Scans every 10 minutes, covering tasks from the last two hours; a kill is triggered once the criteria are met. Requires the kill parameters to be configured on the data source. */
+  @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}")
+  public void jdbcUnfinishedKillScan() {
+    long id =
+        Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan"))
+            .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+    long intervalMs = 7200 * 1000;
+    long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
+    long endTime = System.currentTimeMillis();
+    long startTime = endTime - intervalMs;
+    AnomalyScanner scanner = new DefaultScanner();
+    List<DataFetcher> fetchers =
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "");
+    if (fetchers.isEmpty()) {
+      logger.warn("jdbcUnfinishedKillScan generated 0 dataFetchers, plz check input");
+      return;
+    }
+    StarrocksTimeKillRule starrocksTimeKillRule =
+        new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender());
+    scanner.addScanRule(starrocksTimeKillRule);
+    JobMonitorUtils.run(scanner, fetchers, true);
+  }
 }
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
index 3366014c89..66dd4a3b68 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
@@ -44,14 +44,14 @@ public static void run(AnomalyScanner scanner, List<DataFetcher> fetchers, Boole
   }
 
   public static List<DataFetcher> generateFetchers(
-      long startTime, long endTime, long maxIntervalMs, long id, String timeType) {
+      long startTime, long endTime, long maxIntervalMs, long id, String jobStatus) {
     List<DataFetcher> ret = new ArrayList<>();
     long pe = endTime;
     long ps;
     while (pe > startTime) {
       ps = Math.max(pe - maxIntervalMs, startTime);
       String[] fetcherArgs =
-          new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), timeType};
+          new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), jobStatus};
       ret.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper()));
       logger.info(
           "Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime: " + new Date(pe));
@@ -61,11 +61,11 @@ public static List<DataFetcher> generateFetchers(
   }
 
   public static List<DataFetcher> generateFetchersfortime(
-      long startTime, long endTime, long id, String timeType) {
+      long startTime, long endTime, long id, String jobStatus) {
     List<DataFetcher> fetchers = new ArrayList<>();
     String[] fetcherArgs =
         new String[] {
-          String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), timeType
+          String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), jobStatus
         };
     fetchers.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper()));
     logger.info(
diff --git a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
index f6807b8857..c711af0f5d 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
+++ b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
@@ -127,7 +127,7 @@
     FROM linkis_ps_job_history_group_history job
     JOIN linkis_org_user org ON job.submit_user = org.user_name
-    job.id > #{id}
+    job.id >= #{id}
     and job.submit_user = #{umUser}
     and job.engine_type = #{engineType}
     and job.created_time >= #{startDate} AND job.created_time &lt;= #{endDate}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
index 4f43d86d40..4f553847f6 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
@@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper: JobHistoryMapper)
         "Wrong input for JobHistoryDataFetcher. DataType: " + args.getClass.getCanonicalName
       )
     }
-    if (args != null && args.length == 2) {
-      val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      mapper
-        .search(null, null, null, new Date(start), new Date(end), null)
-        .asInstanceOf[util.List[scala.Any]]
-    } else if (args != null && args.length == 4) {
-      val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      if (
-        StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3)
-          .asInstanceOf[String]
-          .equals("updated_time")
-      ) {
-        val list = new util.ArrayList[String]()
-        Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
-        mapper
-          .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null)
-          .asInstanceOf[util.List[scala.Any]]
-      } else {
-        var list = new util.ArrayList[String]()
-        Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
-        if (args(3).asInstanceOf[String].equals("department")) {
-          list = null;
-        }
-        mapper
-          .searchByCache(id, null, list, new Date(start), new Date(end), null)
-          .asInstanceOf[util.List[scala.Any]]
+    if (args != null) {
+      val start = args(0).asInstanceOf[String].toLong
+      val end = args(1).asInstanceOf[String].toLong
+      // Dispatch on the number of arguments
+      args.length match {
+        // With 2 arguments, the query filters only by start and end time
+        case 2 =>
+          mapper
+            .search(null, null, null, new Date(start), new Date(end), null)
+            .asInstanceOf[util.List[scala.Any]]
+        // With 4 arguments, the fourth argument selects the query
+        case 4 =>
+          val id = args(2).asInstanceOf[String].toLong
+          val param = args(3).asInstanceOf[String]
+          param match {
+            // Filter by id, time range, and finished job statuses
+            case "finished_job" =>
+              val list = new util.ArrayList[String]()
+              Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
+              mapper
+                .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null)
+                .asInstanceOf[util.List[scala.Any]]
+            // Filter by id, time range, and unfinished job statuses
+            case "unfinished_job" =>
+              val list = new util.ArrayList[String]()
+              Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
+              mapper
+                .searchByCache(id, null, list, new Date(start), new Date(end), null)
+                .asInstanceOf[util.List[scala.Any]]
+            // Filter by id and time range only
+            case _ =>
+              mapper
+                .searchByCache(id, null, null, new Date(start), new Date(end), null)
+                .asInstanceOf[util.List[scala.Any]]
+          }
+        case _ =>
+          throw new AnomalyScannerException(
+            21304,
+            "Wrong input for JobHistoryDataFetcher. Data: " + args
+          )
       }
     } else {
       throw new AnomalyScannerException(
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
index 9e633496a7..0367bfc05e 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
@@ -73,7 +73,6 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer)
     val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]()
     for (sd <- data.asScala) {
       if (sd != null && sd.getData() != null) {
-        var idLong = 0L
         for (d <- sd.getData().asScala) {
           if (d.isInstanceOf[JobHistory]) {
             val jobHistory = d.asInstanceOf[JobHistory]
@@ -84,24 +83,11 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer)
                 alertData.add(d.asInstanceOf[JobHistory])
               }
             }
-            if (idLong == 0L || jobHistory.getId < idLong) {
-              idLong = jobHistory.getId
-            }
+            scanRuleList.put("jobhistoryScan", jobHistory.getId)
           } else {
             logger.warn("Ignored wrong input data Type : " + d + ", " + d.getClass.getCanonicalName)
           }
         }
-        if (idLong > 0L) {
-          val id = Optional
-            .ofNullable(CacheUtils.cacheBuilder.getIfPresent("jobhistoryScan"))
-            .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue)
-          if (id == 0) {
-            scanRuleList.put("jobhistoryScan", idLong)
-          }
-          if (id > idLong) {
-            scanRuleList.put("jobhistoryScan", idLong)
-          }
-        }
       } else {
         logger.warn("Ignored null scanned data")
       }
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
similarity index 98%
rename from linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala
rename to linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
index 492d1f0a23..4e6e707335 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
@@ -32,7 +32,7 @@ import java.util
 
 import scala.collection.JavaConverters.asScalaBufferConverter
 
-class StarrocksTimeExceedAlterSender extends Observer with Logging {
+class StarrocksTimeExceedAlertSender extends Observer with Logging {
 
   /**
    * Observer Pattern
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
index 8521af6b57..b616c5c02c 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
@@ -37,6 +37,8 @@ class StarrocksTimeExceedRule(hitObserver: Observer)
     extends AbstractScanRule(event = new StarrocksTimeExceedHitEvent, observer = hitObserver)
     with Logging {
 
+  private val scanRuleList = CacheUtils.cacheBuilder
+
   /**
    * if data match the pattern, return true and trigger observer should call isMatched()
    *
@@ -52,6 +54,7 @@ class StarrocksTimeExceedRule(hitObserver: Observer)
     val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]()
     for (scannedData <- data.asScala) {
       if (scannedData != null && scannedData.getData() != null) {
+        var taskMinID = 0L
         for (jobHistory <- scannedData.getData().asScala) {
           jobHistory match {
             case job: JobHistory =>
@@ -80,27 +83,10 @@
                   alertData.add(job)
                 }
               }
-              // Read the timeout-kill configuration
-              if (StringUtils.isNotBlank(job.getParams)) {
-                val connectParamsMap = MapUtils.getMap(
-                  datasourceConfMap,
-                  "connectParams",
-                  new util.HashMap[AnyRef, AnyRef]
-                )
-                val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "")
-                logger.info("starock killTime: {}", killTime)
-                if (StringUtils.isNotBlank(killTime) && elapse > killTime.toLong * 60 * 1000) {
-                  if (StringUtils.isNotBlank(killTime)) {
-                    val timeoutInSeconds = timeValue.toDouble
-                    val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong
-                    if (elapse > timeoutInMillis) {
-                      // Trigger the task kill
-                      HttpsUntils.killJob(job)
-                    }
-                  }
-                }
-              }
-// }
+              }
+              if (taskMinID == 0L || taskMinID > job.getId) {
+                taskMinID = job.getId
+                scanRuleList.put("jdbcUnfinishedAlertScan", taskMinID)
               }
             case _ =>
               logger.warn(
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
new file mode 100644
index 0000000000..67cbadd2e8
--- /dev/null
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
+
+class StarrocksTimeKillAlertSender extends Observer with Logging {
+
+  /**
+   * Observer Pattern
+   */
+  override def update(e: Event, jobHistoryList: scala.Any): Unit = {}
+
+}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala
new file mode 100644
index 0000000000..7cdabf33a8
--- /dev/null
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
+
+class StarrocksTimeKillHitEvent extends SingleObserverEvent
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
new file mode 100644
index 0000000000..dfcd934908
--- /dev/null
+++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils}
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import java.util
+import java.util.Locale
+import scala.collection.JavaConverters._
+
+class StarrocksTimeKillRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new StarrocksTimeKillHitEvent, observer = hitObserver)
+    with Logging {
+
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * if data match the pattern, return true and trigger observer should call isMatched()
+   *
+   * @param data
+   * @return
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+
+    if (!getHitEvent().isRegistered || data == null) {
+      logger.error("ScanRule is not bind with an observer. Will not be triggered")
+      return false
+    }
+    for (scannedData <- data.asScala) {
+      if (scannedData != null && scannedData.getData() != null) {
+        var taskMinID = 0L
+        for (jobHistory <- scannedData.getData().asScala) {
+          jobHistory match {
+            case job: JobHistory =>
+              val status = job.getStatus.toUpperCase(Locale.getDefault)
+              val engineType = job.getEngineType.toUpperCase(Locale.getDefault)
+              if (
+                Constants.UNFINISHED_JOB_STATUS
+                  .contains(status) && engineType.equals(
+                  Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault)
+                )
+              ) {
+                // Compute how long the task has been running
+                val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime
+                // Read the timeout-kill configuration
+                if (StringUtils.isNotBlank(job.getParams)) {
+                  val connectParamsMap = MapUtils.getMap(
+                    getDatasourceConf(job),
+                    "connectParams",
+                    new util.HashMap[AnyRef, AnyRef]
+                  )
+                  val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "")
+                  logger.info("starrocks killTime: {}", killTime)
+                  if (StringUtils.isNotBlank(killTime)) {
+                    // kill_task_time is configured in minutes; convert to milliseconds
+                    val timeoutInMillis = (killTime.toDouble * 60 * 1000).toLong
+                    if (elapse > timeoutInMillis) {
+                      // Trigger the task kill
+                      HttpsUntils.killJob(job)
+                    }
+                  }
+                }
+              }
+              if (taskMinID == 0L || taskMinID > job.getId) {
+                taskMinID = job.getId
+                scanRuleList.put("jdbcUnfinishedKillScan", taskMinID)
+              }
+            case _ =>
+              logger.warn(
+                "Ignored wrong input data Type : " + jobHistory + ", " + jobHistory.getClass.getCanonicalName
+              )
+          }
+        }
+      } else {
+        logger.warn("Ignored null scanned data")
+      }
+    }
+    true
+  }
+
+  private def getDatasourceConf(job: JobHistory): util.Map[_, _] = {
+    // Extract the datasource name from the job params
+    val paramMap =
+      BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]])
+    val configurationMap =
+      MapUtils.getMap(paramMap, "configuration", new util.HashMap[String, String]())
+    val runtimeMap =
+      MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]())
+    val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "")
+    // Fetch the datasource configuration by name
+    if (StringUtils.isNotBlank(datasourceName)) {
+      HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName)
+    } else {
+      new util.HashMap[String, String]()
+    }
+  }
+
+}
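
Note on the fetcher arguments: after this patch, the fourth element passed through generateFetchers / generateFetchersfortime is a job-status selector ("finished_job", "unfinished_job", or empty) rather than a time-column name. The following standalone sketch mirrors the dispatch contract implemented in JobHistoryDataFetcher above; the object and helper names are illustrative only and not part of the patch:

    // Illustrative sketch of the args contract consumed by JobHistoryDataFetcher.
    object FetcherArgsDemo {
      // Returns a description of the query path a given args array would select.
      def queryPlan(args: Array[String]): String = args.length match {
        case 2 => "search(startTime, endTime)" // time window only
        case 4 =>
          args(3) match {
            case "finished_job"   => "searchByCacheAndUpdateTime(id, finished statuses)"
            case "unfinished_job" => "searchByCache(id, unfinished statuses)"
            case _                => "searchByCache(id, no status filter)"
          }
        case n => throw new IllegalArgumentException(s"expected 2 or 4 args, got $n")
      }

      def main(argv: Array[String]): Unit = {
        val end = System.currentTimeMillis()
        val start = end - 7200 * 1000L // two-hour window, as in the scheduled scans
        println(queryPlan(Array(start.toString, end.toString, "42", "unfinished_job")))
      }
    }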
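Note on the kill threshold: kill_task_time is read from the data source's connectParams and is expressed in minutes, while the elapsed time is measured in milliseconds. A self-contained worked example of the conversion used by StarrocksTimeKillRule (the values are made up):

    // Minutes-to-milliseconds threshold check, as in StarrocksTimeKillRule.
    object KillTimeoutDemo extends App {
      val killTime = "30" // minutes, e.g. connectParams.kill_task_time
      val timeoutInMillis = (killTime.toDouble * 60 * 1000).toLong // 1,800,000 ms
      val elapse = 45L * 60 * 1000 // a task that has been running for 45 minutes
      // 45 min elapsed > 30 min timeout, so the rule would call HttpsUntils.killJob(job)
      println(s"timeout=${timeoutInMillis}ms elapse=${elapse}ms shouldKill=${elapse > timeoutInMillis}")
    }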
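Note on the scan cursor: both Starrocks rules track the smallest job id seen in a batch and store it in CacheUtils.cacheBuilder under the scan's key ("jdbcUnfinishedAlertScan" / "jdbcUnfinishedKillScan"); combined with the job.id >= #{id} change in JobHistoryMapper.xml, the next scheduled run re-scans from the oldest job seen. A minimal standalone illustration of the min-id loop (the names are illustrative):

    // Min-id cursor tracking, mirroring the taskMinID loop in the rules above.
    object MinIdCursorDemo extends App {
      val batchJobIds = Seq(105L, 98L, 120L) // ids seen in one scanned batch
      var taskMinID = 0L
      for (id <- batchJobIds) {
        if (taskMinID == 0L || taskMinID > id) {
          taskMinID = id // the rule also does scanRuleList.put(key, taskMinID) here
        }
      }
      assert(taskMinID == 98L) // next run resumes from job.id >= 98
    }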