Commit

starrock monitor update
v_kkhuang committed Nov 28, 2024
1 parent f8368d6 commit 4288a6e
Showing 6 changed files with 77 additions and 79 deletions.
@@ -100,7 +100,7 @@ public void jobHistoryFinishedScan() {
logger.info("Get JobHistoryId from cache ID:" + id);
}
List<DataFetcher> fetchers =
JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "updated_time");
JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "finished_job");
if (fetchers.isEmpty()) {
logger.warn("generated 0 dataFetchers, plz check input");
return;
@@ -178,7 +178,7 @@ public void jobHistoryFinishedScan() {
JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender());
scannerIndex.addScanRule(jobIndexRule);
List<DataFetcher> createFetcher =
JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "department");
JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "");
JobMonitorUtils.run(scannerIndex, createFetcher, true);
}

@@ -195,7 +195,7 @@ public void jobHistoryUnfinishedScan() {
AnomalyScanner scanner = new DefaultScanner();
boolean shouldStart = false;
List<DataFetcher> fetchers =
JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "created_time");
JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "unfinished_job");
if (fetchers.isEmpty()) {
logger.warn("generated 0 dataFetchers, plz check input");
return;
@@ -215,9 +215,29 @@ public void jobHistoryUnfinishedScan() {
jobTimeAlerts.keySet(), new JobTimeExceedAlertSender(jobTimeAlerts));
scanner.addScanRule(jobTimeExceedRule);
}
JobMonitorUtils.run(scanner, fetchers, shouldStart);
}

  /** Scan jobs from the last two hours; jobs that meet the criteria trigger an alert or are killed. Kill requirement: the kill parameter is configured on the data source. Alert requirement: alert parameters are configured in the admin console. */
@Scheduled(cron = "${linkis.monitor.jdbc.timeout.cron:0 0/10 0 * * ?}")
public void jdbcUnfinishedScan() {
long id =
Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedScan"))
.orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
long intervalMs = 7200 * 1000;
long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
long endTime = System.currentTimeMillis();
long startTime = endTime - intervalMs;
AnomalyScanner scanner = new DefaultScanner();
List<DataFetcher> fetchers =
JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "");
if (fetchers.isEmpty()) {
logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input");
return;
}
StarrocksTimeExceedRule starrocksTimeExceedRule =
new StarrocksTimeExceedRule(new StarrocksTimeExceedAlterSender());
scanner.addScanRule(starrocksTimeExceedRule);
JobMonitorUtils.run(scanner, fetchers, shouldStart);
JobMonitorUtils.run(scanner, fetchers, true);
}
}
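
Note on the new jdbcUnfinishedScan above: it follows the same watermark-plus-window pattern as the other scans, reading the last recorded job id from a local cache (falling back to a configured default) and scanning a fixed two-hour window ending at the current time. The sketch below restates that pattern in isolation; WatermarkCache and DEFAULT_ID are illustrative stand-ins for CacheUtils.cacheBuilder and MonitorConfig.JOB_HISTORY_TIME_EXCEED, not the project's actual API.

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for CacheUtils.cacheBuilder.
class WatermarkCache {
  private static final ConcurrentHashMap<String, Long> CACHE = new ConcurrentHashMap<>();

  static Long getIfPresent(String key) {
    return CACHE.get(key); // null until a scan has recorded an id
  }

  static void put(String key, long value) {
    CACHE.put(key, value);
  }
}

class JdbcUnfinishedScanSketch {
  private static final long DEFAULT_ID = 0L; // stand-in for the configured default

  void scanOnce() {
    // Resume from the cached watermark id, or fall back to the configured default.
    long id = Optional.ofNullable(WatermarkCache.getIfPresent("jdbcUnfinishedScan")).orElse(DEFAULT_ID);

    // Fixed two-hour window ending now, matching intervalMs = 7200 * 1000 above.
    long endTime = System.currentTimeMillis();
    long startTime = endTime - 7200L * 1000L;

    System.out.printf("scanning [%d, %d] from job id %d%n", startTime, endTime, id);
    // The committed method then builds DataFetchers for this window and runs
    // the StarrocksTimeExceedRule over the fetched jobs.
  }
}
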
@@ -44,14 +44,14 @@ public static void run(AnomalyScanner scanner, List<DataFetcher> fetchers, Boole
}

public static List<DataFetcher> generateFetchers(
long startTime, long endTime, long maxIntervalMs, long id, String timeType) {
long startTime, long endTime, long maxIntervalMs, long id, String jobStatus) {
List<DataFetcher> ret = new ArrayList<>();
long pe = endTime;
long ps;
while (pe > startTime) {
ps = Math.max(pe - maxIntervalMs, startTime);
String[] fetcherArgs =
new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), timeType};
new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), jobStatus};
ret.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper()));
logger.info(
"Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime: " + new Date(pe));
@@ -61,11 +61,11 @@ public static List<DataFetcher> generateFetchers(
}

public static List<DataFetcher> generateFetchersfortime(
long startTime, long endTime, long id, String timeType) {
long startTime, long endTime, long id, String jobStatus) {
List<DataFetcher> fetchers = new ArrayList<>();
String[] fetcherArgs =
new String[] {
String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), timeType
String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), jobStatus
};
fetchers.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper()));
logger.info(
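
For reference, the chunking that generateFetchers performs can be restated as a small standalone sketch: the window [startTime, endTime] is walked backwards from endTime in steps of at most maxIntervalMs, one fetcher per (ps, pe) pair. The loop's decrement is not visible in the hunk above, so the pe = ps step below is an assumption inferred from the guard and the Math.max call.

import java.util.ArrayList;
import java.util.List;

class WindowChunkingSketch {
  // Splits [startTime, endTime] into windows of at most maxIntervalMs, newest first.
  static List<long[]> chunk(long startTime, long endTime, long maxIntervalMs) {
    List<long[]> windows = new ArrayList<>();
    long pe = endTime;
    while (pe > startTime) {
      long ps = Math.max(pe - maxIntervalMs, startTime);
      windows.add(new long[] {ps, pe});
      pe = ps; // assumed step; the hunk above is truncated before this point
    }
    return windows;
  }

  public static void main(String[] args) {
    // A 5-hour span with 2-hour chunks yields [3h,5h], [1h,3h], [0h,1h].
    for (long[] w : chunk(0L, 5L * 3_600_000L, 2L * 3_600_000L)) {
      System.out.println(w[0] + " .. " + w[1]);
    }
  }
}
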
@@ -127,7 +127,7 @@
<include refid="jobhistory_query"/>
FROM linkis_ps_job_history_group_history job JOIN linkis_org_user org ON job.submit_user = org.user_name
<where>
<if test="id != null">job.id > #{id}</if>
<if test="id != null">job.id >= #{id}</if>
<if test="umUser != null">and job.submit_user = #{umUser}</if>
<if test="engineType != null">and job.engine_type = #{engineType}</if>
<if test="startDate != null">and job.created_time >= #{startDate} AND job.created_time <![CDATA[<=]]>#{endDate}
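
The only change in this mapper hunk is the lower bound on job.id, from an exclusive to an inclusive comparison against the passed id. Taken in isolation, the effect is that the row whose id equals the watermark is now returned as well; a trivial illustration of the two predicates:

import java.util.List;
import java.util.stream.Collectors;

class IdBoundSketch {
  public static void main(String[] args) {
    List<Long> ids = List.of(10L, 11L, 12L);
    long watermark = 10L;

    // Old predicate, job.id > #{id}: excludes the watermark row -> [11, 12]
    System.out.println(ids.stream().filter(i -> i > watermark).collect(Collectors.toList()));

    // New predicate, job.id >= #{id}: includes the watermark row -> [10, 11, 12]
    System.out.println(ids.stream().filter(i -> i >= watermark).collect(Collectors.toList()));
  }
}
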
@@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper: JobHistoryMapper)
"Wrong input for JobHistoryDataFetcher. DataType: " + args.getClass.getCanonicalName
)
}
if (args != null && args.length == 2) {
val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
{
logger.error("Failed to get data from DB: Illegal arguments.", t)
throw t
}
}
val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
{
logger.error("Failed to get data from DB: Illegal arguments.", t)
throw t
}
}
mapper
.search(null, null, null, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
} else if (args != null && args.length == 4) {
val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
{
logger.error("Failed to get data from DB: Illegal arguments.", t)
throw t
}
}
val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
{
logger.error("Failed to get data from DB: Illegal arguments.", t)
throw t
}
}
val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t =>
{
logger.error("Failed to get data from DB: Illegal arguments.", t)
throw t
}
}
if (
StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3)
.asInstanceOf[String]
.equals("updated_time")
) {
val list = new util.ArrayList[String]()
Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
mapper
.searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
} else {
var list = new util.ArrayList[String]()
Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
if (args(3).asInstanceOf[String].equals("department")) {
list = null;
}
mapper
.searchByCache(id, null, list, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
if (args != null) {
val start = args(0).asInstanceOf[String].toLong
val end = args(1).asInstanceOf[String].toLong
      // Handle the query differently depending on how many arguments were passed
args.length match {
        // With 2 arguments, the DB query filters only by start and end time
case 2 =>
mapper
.search(null, null, null, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
        // With 4 arguments, the fourth argument decides which query to run
case 4 =>
val id = args(2).asInstanceOf[String].toLong
val parm = args(3).asInstanceOf[String]
parm match {
            // Query finished jobs, filtered by id and time range
case "finished_job" =>
val list = new util.ArrayList[String]()
Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
mapper
.searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
            // Query unfinished jobs, filtered by id and time range
case "unfinished_job" =>
var list = new util.ArrayList[String]()
Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
mapper
.searchByCache(id, null, list, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
            // Query jobs filtered by id and time range only (no status filter)
case _ =>
mapper
.searchByCache(id, null, null, new Date(start), new Date(end), null)
.asInstanceOf[util.List[scala.Any]]
}
case _ =>
throw new AnomalyScannerException(
21304,
"Wrong input for JobHistoryDataFetcher. Data: " + args
)
}
} else {
throw new AnomalyScannerException(
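
To summarize the refactored fetcher above: the fourth argument now names the job set to query ("finished_job", "unfinished_job", or empty) instead of a time-column keyword, and the Scala match dispatches to the mapper accordingly. A hedged Java analogue of that dispatch is sketched below; the status values are illustrative placeholders for Constants.DATA_FINISHED_JOB_STATUS_ARRAY and DATA_UNFINISHED_JOB_STATUS_ARRAY, and the mapper calls are shown only as comments.

import java.util.Arrays;
import java.util.Date;
import java.util.List;

class FetcherDispatchSketch {
  // Illustrative status sets; the real ones come from Constants.
  static final List<String> FINISHED = Arrays.asList("Succeed", "Failed", "Cancelled");
  static final List<String> UNFINISHED = Arrays.asList("Inited", "Scheduled", "Running");

  static void dispatch(long id, Date start, Date end, String jobStatus) {
    switch (jobStatus) {
      case "finished_job":
        // mapper.searchByCacheAndUpdateTime(id, null, FINISHED, start, end, null)
        System.out.println("finished jobs, window [" + start + ", " + end + "], watermark id " + id);
        break;
      case "unfinished_job":
        // mapper.searchByCache(id, null, UNFINISHED, start, end, null)
        System.out.println("unfinished jobs, window [" + start + ", " + end + "], watermark id " + id);
        break;
      default:
        // mapper.searchByCache(id, null, null, start, end, null) -- no status filter
        System.out.println("all jobs, window [" + start + ", " + end + "], watermark id " + id);
    }
  }
}
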
@@ -73,7 +73,6 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer)
val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]()
for (sd <- data.asScala) {
if (sd != null && sd.getData() != null) {
var idLong = 0L
for (d <- sd.getData().asScala) {
if (d.isInstanceOf[JobHistory]) {
val jobHistory = d.asInstanceOf[JobHistory]
@@ -84,24 +83,11 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer)
alertData.add(d.asInstanceOf[JobHistory])
}
}
if (idLong == 0L || jobHistory.getId < idLong) {
idLong = jobHistory.getId
}
scanRuleList.put("jobhistoryScan", jobHistory.getId)
} else {
logger.warn("Ignored wrong input data Type : " + d + ", " + d.getClass.getCanonicalName)
}
}
if (idLong > 0L) {
val id = Optional
.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jobhistoryScan"))
.orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue)
if (id == 0) {
scanRuleList.put("jobhistoryScan", idLong)
}
if (id > idLong) {
scanRuleList.put("jobhistoryScan", idLong)
}
}
} else {
logger.warn("Ignored null scanned data")
}
@@ -37,6 +37,8 @@ class StarrocksTimeExceedRule(hitObserver: Observer)
extends AbstractScanRule(event = new StarrocksTimeExceedHitEvent, observer = hitObserver)
with Logging {

private val scanRuleList = CacheUtils.cacheBuilder

/**
* if data match the pattern, return true and trigger observer should call isMatched()
*
@@ -52,6 +54,7 @@ class StarrocksTimeExceedRule(hitObserver: Observer)
val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]()
for (scannedData <- data.asScala) {
if (scannedData != null && scannedData.getData() != null) {
var taskMinID = 0L;
for (jobHistory <- scannedData.getData().asScala) {
jobHistory match {
case job: JobHistory =>
@@ -100,7 +103,10 @@
}
}
}
// }
}
if (taskMinID == 0L || taskMinID > job.getId) {
taskMinID = job.getId
scanRuleList.put("jdbcUnfinishedScan", taskMinID)
}
case _ =>
logger.warn(
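
The new bookkeeping in StarrocksTimeExceedRule tracks the smallest job id seen in each scanned batch and writes it to the cache under the "jdbcUnfinishedScan" key, presumably so the next jdbcUnfinishedScan run resumes from the oldest job still of interest. A minimal sketch of that min-tracking, using a plain map in place of the project's CacheUtils cache:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MinIdTrackingSketch {
  // Illustrative stand-in for CacheUtils.cacheBuilder.
  static final Map<String, Long> CACHE = new HashMap<>();

  // Mirrors the taskMinID bookkeeping added in the rule above.
  static void recordMinId(List<Long> batchIds) {
    long taskMinId = 0L;
    for (long id : batchIds) {
      if (taskMinId == 0L || taskMinId > id) {
        taskMinId = id;
        CACHE.put("jdbcUnfinishedScan", taskMinId);
      }
    }
  }

  public static void main(String[] args) {
    recordMinId(List.of(42L, 17L, 99L));
    System.out.println(CACHE.get("jdbcUnfinishedScan")); // prints 17
  }
}
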
