From 4288a6eeb565979fdc9b11aa0fcf408e26eefbb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Thu, 28 Nov 2024 14:26:52 +0800 Subject: [PATCH] starrock monitor update --- .../monitor/scheduled/JobHistoryMonitor.java | 28 +++++- .../linkis/monitor/until/JobMonitorUtils.java | 8 +- .../mapper/common/JobHistoryMapper.xml | 2 +- .../jobhistory/JobHistoryDataFetcher.scala | 94 ++++++++----------- .../jobtime/JobTimeExceedRule.scala | 16 +--- .../jobtime/StarrocksTimeExceedRule.scala | 8 +- 6 files changed, 77 insertions(+), 79 deletions(-) diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index d38c497279..85c289b5ea 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -100,7 +100,7 @@ public void jobHistoryFinishedScan() { logger.info("Get JobHistoryId from cache ID:" + id); } List fetchers = - JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "updated_time"); + JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "finished_job"); if (fetchers.isEmpty()) { logger.warn("generated 0 dataFetchers, plz check input"); return; @@ -178,7 +178,7 @@ public void jobHistoryFinishedScan() { JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender()); scannerIndex.addScanRule(jobIndexRule); List createFetcher = - JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "department"); + JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, ""); JobMonitorUtils.run(scannerIndex, createFetcher, true); } @@ -195,7 +195,7 @@ public void jobHistoryUnfinishedScan() { AnomalyScanner scanner = new DefaultScanner(); boolean shouldStart = false; List fetchers = - JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "created_time"); + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "unfinished_job"); if (fetchers.isEmpty()) { logger.warn("generated 0 dataFetchers, plz check input"); return; @@ -215,9 +215,29 @@ public void jobHistoryUnfinishedScan() { jobTimeAlerts.keySet(), new JobTimeExceedAlertSender(jobTimeAlerts)); scanner.addScanRule(jobTimeExceedRule); } + JobMonitorUtils.run(scanner, fetchers, shouldStart); + } + + /** * 扫描两个小时之内的任务,满足要求触发,或者kill kill要求:数据源配置kill参数 告警要求:管理台配置告警相关参数 */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedScan() { + long id = + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + long intervalMs = 7200 * 1000; + long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; + long endTime = System.currentTimeMillis(); + long startTime = endTime - intervalMs; + AnomalyScanner scanner = new DefaultScanner(); + List fetchers = + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + if (fetchers.isEmpty()) { + logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input"); + return; + } StarrocksTimeExceedRule starrocksTimeExceedRule = new StarrocksTimeExceedRule(new StarrocksTimeExceedAlterSender()); scanner.addScanRule(starrocksTimeExceedRule); - JobMonitorUtils.run(scanner, fetchers, shouldStart); + JobMonitorUtils.run(scanner, fetchers, true); } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java index 3366014c89..66dd4a3b68 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java @@ -44,14 +44,14 @@ public static void run(AnomalyScanner scanner, List fetchers, Boole } public static List generateFetchers( - long startTime, long endTime, long maxIntervalMs, long id, String timeType) { + long startTime, long endTime, long maxIntervalMs, long id, String jobStatus) { List ret = new ArrayList<>(); long pe = endTime; long ps; while (pe > startTime) { ps = Math.max(pe - maxIntervalMs, startTime); String[] fetcherArgs = - new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), timeType}; + new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), jobStatus}; ret.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper())); logger.info( "Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime: " + new Date(pe)); @@ -61,11 +61,11 @@ public static List generateFetchers( } public static List generateFetchersfortime( - long startTime, long endTime, long id, String timeType) { + long startTime, long endTime, long id, String jobStatus) { List fetchers = new ArrayList<>(); String[] fetcherArgs = new String[] { - String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), timeType + String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), jobStatus }; fetchers.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper())); logger.info( diff --git a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml index f6807b8857..c711af0f5d 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml +++ b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml @@ -127,7 +127,7 @@ FROM linkis_ps_job_history_group_history job JOIN linkis_org_user org ON job.submit_user = org.user_name - job.id > #{id} + job.id >= #{id} and job.submit_user = #{umUser} and job.engine_type = #{engineType} and job.created_time >= #{startDate} AND job.created_time #{endDate} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala index 4f43d86d40..4f553847f6 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala @@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper: JobHistoryMapper) "Wrong input for JobHistoryDataFetcher. DataType: " + args.getClass.getCanonicalName ) } - if (args != null && args.length == 2) { - val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - mapper - .search(null, null, null, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] - } else if (args != null && args.length == 4) { - val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - if ( - StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3) - .asInstanceOf[String] - .equals("updated_time") - ) { - val list = new util.ArrayList[String]() - Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add) - mapper - .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] - } else { - var list = new util.ArrayList[String]() - Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add) - if (args(3).asInstanceOf[String].equals("department")) { - list = null; - } - mapper - .searchByCache(id, null, list, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] + if (args != null) { + val start = args(0).asInstanceOf[String].toLong + val end = args(1).asInstanceOf[String].toLong + // 根据参数数量进行不同的处理 + args.length match { + // 参数数量为2,则数据库查询仅筛选开始和结束时间 + case 2 => + mapper + .search(null, null, null, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 参数数量为4,根据第四个参数进行不同的查询 + case 4 => + val id = args(2).asInstanceOf[String].toLong + val parm = args(3).asInstanceOf[String] + parm match { + // 筛选任务包含id,时间,已完成状态任务 + case "finished_job" => + val list = new util.ArrayList[String]() + Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add) + mapper + .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 筛选任务包含id,时间,未完成状态任务 + case "unfinished_job" => + var list = new util.ArrayList[String]() + Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add) + mapper + .searchByCache(id, null, list, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 筛选任务包含id,时间 + case _ => + mapper + .searchByCache(id, null, null, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + } + case _ => + throw new AnomalyScannerException( + 21304, + "Wrong input for JobHistoryDataFetcher. Data: " + args + ) } } else { throw new AnomalyScannerException( diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala index 9e633496a7..0367bfc05e 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala @@ -73,7 +73,6 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer) val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]() for (sd <- data.asScala) { if (sd != null && sd.getData() != null) { - var idLong = 0L for (d <- sd.getData().asScala) { if (d.isInstanceOf[JobHistory]) { val jobHistory = d.asInstanceOf[JobHistory] @@ -84,24 +83,11 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer) alertData.add(d.asInstanceOf[JobHistory]) } } - if (idLong == 0L || jobHistory.getId < idLong) { - idLong = jobHistory.getId - } + scanRuleList.put("jobhistoryScan", jobHistory.getId) } else { logger.warn("Ignored wrong input data Type : " + d + ", " + d.getClass.getCanonicalName) } } - if (idLong > 0L) { - val id = Optional - .ofNullable(CacheUtils.cacheBuilder.getIfPresent("jobhistoryScan")) - .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue) - if (id == 0) { - scanRuleList.put("jobhistoryScan", idLong) - } - if (id > idLong) { - scanRuleList.put("jobhistoryScan", idLong) - } - } } else { logger.warn("Ignored null scanned data") } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala index 8521af6b57..ad4f82406b 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala @@ -37,6 +37,8 @@ class StarrocksTimeExceedRule(hitObserver: Observer) extends AbstractScanRule(event = new StarrocksTimeExceedHitEvent, observer = hitObserver) with Logging { + private val scanRuleList = CacheUtils.cacheBuilder + /** * if data match the pattern, return true and trigger observer should call isMatched() * @@ -52,6 +54,7 @@ class StarrocksTimeExceedRule(hitObserver: Observer) val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]() for (scannedData <- data.asScala) { if (scannedData != null && scannedData.getData() != null) { + var taskMinID = 0L; for (jobHistory <- scannedData.getData().asScala) { jobHistory match { case job: JobHistory => @@ -100,7 +103,10 @@ class StarrocksTimeExceedRule(hitObserver: Observer) } } } -// } + } + if (taskMinID == 0L || taskMinID > job.getId) { + taskMinID = job.getId + scanRuleList.put("jdbcUnfinishedScan", taskMinID) } case _ => logger.warn(