From c422c5f34546e0ec677b61d5e0d6a82f7573b5af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Tue, 10 Dec 2024 16:53:36 +0800 Subject: [PATCH] Code optimization --- .../monitor/scheduled/JobHistoryMonitor.java | 38 ++++-- ...a => StarrocksTimeExceedAlertSender.scala} | 2 +- .../jobtime/StarrocksTimeExceedRule.scala | 22 +--- .../StarrocksTimeKillAlertSender.scala | 31 +++++ .../jobtime/StarrocksTimeKillHitEvent.scala | 22 ++++ .../jobtime/StarrocksTimeKillRule.scala | 123 ++++++++++++++++++ 6 files changed, 207 insertions(+), 31 deletions(-) rename linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/{StarrocksTimeExceedAlterSender.scala => StarrocksTimeExceedAlertSender.scala} (98%) create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index 85c289b5ea..b2479ff794 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -27,10 +27,7 @@ import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender; import org.apache.linkis.monitor.jobhistory.index.JobIndexRule; import org.apache.linkis.monitor.jobhistory.index.JobIndexSender; -import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedAlertSender; -import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedRule; -import org.apache.linkis.monitor.jobhistory.jobtime.StarrocksTimeExceedAlterSender; -import org.apache.linkis.monitor.jobhistory.jobtime.StarrocksTimeExceedRule; +import org.apache.linkis.monitor.jobhistory.jobtime.*; import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsAlertSender; import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsRule; import org.apache.linkis.monitor.jobhistory.runtime.CommonJobRunTimeRule; @@ -218,11 +215,11 @@ public void jobHistoryUnfinishedScan() { JobMonitorUtils.run(scanner, fetchers, shouldStart); } - /** * 扫描两个小时之内的任务,满足要求触发,或者kill kill要求:数据源配置kill参数 告警要求:管理台配置告警相关参数 */ - @Scheduled(cron = "${linkis.monitor.jdbc.timeout.cron:0 0/10 0 * * ?}") - public void jdbcUnfinishedScan() { + /** * 扫描两个小时之内的任务,告警要求:管理台配置告警相关参数 */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.alert.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedAlertScan() { long id = - Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedScan")) + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedAlertScan")) .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); long intervalMs = 7200 * 1000; long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; @@ -236,8 +233,31 @@ public void jdbcUnfinishedScan() { return; } StarrocksTimeExceedRule starrocksTimeExceedRule = - new StarrocksTimeExceedRule(new StarrocksTimeExceedAlterSender()); + new StarrocksTimeExceedRule(new StarrocksTimeExceedAlertSender()); scanner.addScanRule(starrocksTimeExceedRule); JobMonitorUtils.run(scanner, fetchers, true); } + + /** * 扫描两个小时之内的任务,满足要求触发kill kill要求:数据源配置kill参数 */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedKillScan() { + long id = + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + long intervalMs = 7200 * 1000; + long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; + long endTime = System.currentTimeMillis(); + long startTime = endTime - intervalMs; + AnomalyScanner scanner = new DefaultScanner(); + List fetchers = + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + if (fetchers.isEmpty()) { + logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input"); + return; + } + StarrocksTimeKillRule starrocksTimeKillRule = + new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); + scanner.addScanRule(starrocksTimeKillRule); + JobMonitorUtils.run(scanner, fetchers, true); + } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala similarity index 98% rename from linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala rename to linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala index 492d1f0a23..4e6e707335 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala @@ -32,7 +32,7 @@ import java.util import scala.collection.JavaConverters.asScalaBufferConverter -class StarrocksTimeExceedAlterSender extends Observer with Logging { +class StarrocksTimeExceedAlertSender extends Observer with Logging { /** * Observer Pattern diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala index ad4f82406b..b616c5c02c 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala @@ -83,30 +83,10 @@ class StarrocksTimeExceedRule(hitObserver: Observer) alertData.add(job) } } - // 获取超时kill配置信息 - if (StringUtils.isNotBlank(job.getParams)) { - val connectParamsMap = MapUtils.getMap( - datasourceConfMap, - "connectParams", - new util.HashMap[AnyRef, AnyRef] - ) - val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "") - logger.info("starock killTime: {}", killTime) - if (StringUtils.isNotBlank(killTime) && elapse > killTime.toLong * 60 * 1000) { - if (StringUtils.isNotBlank(killTime)) { - val timeoutInSeconds = timeValue.toDouble - val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong - if (elapse > timeoutInMillis) { - // 触发kill任务 - HttpsUntils.killJob(job) - } - } - } - } } if (taskMinID == 0L || taskMinID > job.getId) { taskMinID = job.getId - scanRuleList.put("jdbcUnfinishedScan", taskMinID) + scanRuleList.put("jdbcUnfinishedAlertScan", taskMinID) } case _ => logger.warn( diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala new file mode 100644 index 0000000000..67cbadd2e8 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.core.ob.{Event, Observer} + +class StarrocksTimeKillAlertSender extends Observer with Logging { + + /** + * Observer Pattern + */ + override def update(e: Event, jobHistroyList: scala.Any): Unit = { + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala new file mode 100644 index 0000000000..7cdabf33a8 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.monitor.core.ob.SingleObserverEvent + +class StarrocksTimeKillHitEvent extends SingleObserverEvent diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala new file mode 100644 index 0000000000..dfcd934908 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.commons.collections.MapUtils +import org.apache.commons.lang3.StringUtils +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.Observer +import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils} +import org.apache.linkis.server.BDPJettyServerHelper + +import java.util +import java.util.Locale +import scala.collection.JavaConverters._ + +class StarrocksTimeKillRule(hitObserver: Observer) + extends AbstractScanRule(event = new StarrocksTimeKillHitEvent, observer = hitObserver) + with Logging { + + private val scanRuleList = CacheUtils.cacheBuilder + + /** + * if data match the pattern, return true and trigger observer should call isMatched() + * + * @param data + * @return + */ + override def triggerIfMatched(data: util.List[ScannedData]): Boolean = { + + if (!getHitEvent().isRegistered || data == null) { + logger.error("ScanRule is not bind with an observer. Will not be triggered") + return false + } + for (scannedData <- data.asScala) { + if (scannedData != null && scannedData.getData() != null) { + var taskMinID = 0L; + for (jobHistory <- scannedData.getData().asScala) { + jobHistory match { + case job: JobHistory => + val status = job.getStatus.toUpperCase(Locale.getDefault) + val engineType = job.getEngineType.toUpperCase(Locale.getDefault) + if ( + Constants.UNFINISHED_JOB_STATUS + .contains(status) && engineType.equals( + Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault) + ) + ) { + // 计算任务执行时间 + val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime + // 获取超时kill配置信息 + if (StringUtils.isNotBlank(job.getParams)) { + val connectParamsMap = MapUtils.getMap( + getDatasourceConf(job), + "connectParams", + new util.HashMap[AnyRef, AnyRef] + ) + val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "") + logger.info("starock killTime: {}", killTime) + if (StringUtils.isNotBlank(killTime) && elapse > killTime.toLong * 60 * 1000) { + if (StringUtils.isNotBlank(killTime)) { + val timeoutInSeconds = killTime.toDouble + val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong + if (elapse > timeoutInMillis) { + // 触发kill任务 + HttpsUntils.killJob(job) + } + } + } + } + } + if (taskMinID == 0L || taskMinID > job.getId) { + taskMinID = job.getId + scanRuleList.put("jdbcUnfinishedKillScan", taskMinID) + } + case _ => + logger.warn( + "Ignored wrong input data Type : " + jobHistory + ", " + jobHistory.getClass.getCanonicalName + ) + } + } + } else { + logger.warn("Ignored null scanned data") + } + } + true + } + + private def getDatasourceConf(job: JobHistory): util.Map[_, _] = { + // 获取任务参数中datasourcename + val parmMap = + BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]]) + val configurationMap = + MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, String]()) + val runtimeMap = + MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]()) + val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "") + // 获取datasource信息 + if (StringUtils.isNotBlank(datasourceName)) { + HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName) + } else { + new util.HashMap[String, String]() + } + } + +}