Skip to content

Commit

Permalink
Dev 1.10.0 analyze job (#666)
Browse files Browse the repository at this point in the history
* push analyze job code

* push analyze job code

* Code optimization

---------

Co-authored-by: “v_kkhuang” <“[email protected]”>
  • Loading branch information
v-kkhuang and “v_kkhuang” authored Dec 12, 2024
1 parent 6a3d2c0 commit cc19a5f
Show file tree
Hide file tree
Showing 19 changed files with 750 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.common.utils;

import org.apache.linkis.common.conf.CommonVars;

import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class SHAUtils {

public static String DOCTOR_NONCE = "12345";
public static final CommonVars<String> DOCTOR_TOKEN =
CommonVars.apply("linkis.doctor.signature.token", "");

/**
* 对字符串加密,默认使用SHA-256
*
* @param strSrc 要加密的字符串
* @param encName 加密类型
* @return
* @throws UnsupportedEncodingException
*/
public static String Encrypt(String strSrc, String encName) throws UnsupportedEncodingException {
MessageDigest md = null;
String strDes = null;
byte[] bt = strSrc.getBytes("utf-8");
try {
if (encName == null || encName.equals("")) {
encName = "SHA-256";
}
md = MessageDigest.getInstance(encName);
md.update(bt);
strDes = bytes2Hex(md.digest()); // to HexString
} catch (NoSuchAlgorithmException e) {
return null;
}
return strDes;
}

public static String bytes2Hex(byte[] bts) {
String des = "";
String tmp = null;
for (int i = 0; i < bts.length; i++) {
tmp = (Integer.toHexString(bts[i] & 0xFF));
if (tmp.length() == 1) {
des += "0";
}
des += tmp;
}
return des;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ object Configuration extends Logging {

val IS_TEST_MODE = CommonVars("wds.linkis.test.mode", false)

val LINKIS_SYS_NAME = CommonVars("linkis.system.name", "")

val IS_PROMETHEUS_ENABLE = CommonVars("wds.linkis.prometheus.enable", false)

val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false).getValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.linkis.monitor.core.scanner.AnomalyScanner;
import org.apache.linkis.monitor.core.scanner.DefaultScanner;
import org.apache.linkis.monitor.factory.MapperFactory;
import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeAlertSender;
import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeRule;
import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrCodeRule;
import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender;
import org.apache.linkis.monitor.jobhistory.index.JobIndexRule;
Expand Down Expand Up @@ -171,6 +173,14 @@ public void jobHistoryFinishedScan() {
} catch (Exception e) {
logger.warn("CommonJobRunTimeRule Scan Error msg: " + e.getMessage());
}
// 新增失败任务分析扫描
try {
JobHistoryAnalyzeRule jobHistoryAnalyzeRule =
new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender());
scanner.addScanRule(jobHistoryAnalyzeRule);
} catch (Exception e) {
logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage());
}
// 执行任务扫描
JobMonitorUtils.run(scanner, fetchers, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.linkis.monitor.entity.IndexEntity;
import org.apache.linkis.monitor.jobhistory.entity.JobHistory;
import org.apache.linkis.monitor.request.*;
import org.apache.linkis.monitor.response.AnalyzeJobResultAction;
import org.apache.linkis.monitor.response.EntranceTaskResult;
import org.apache.linkis.monitor.response.KeyvalueResult;
import org.apache.linkis.monitor.response.KillJobResultAction;
Expand Down Expand Up @@ -157,4 +158,15 @@ public static void killJob(JobHistory jobHistory) {
KillJobResultAction killJobResultAction = client.killJob(killJobAction);
Map data = MapUtils.getMap(killJobResultAction.getResultMap(), "data", new HashMap<>());
}

public static void analyzeJob(JobHistory jobHistory) {
MonitorHTTPClient client = ClientSingleton.getInstance();

AnalyzeJobAction analyzeJobAction =
AnalyzeJobAction.newBuilder()
.setTaskID(String.valueOf(jobHistory.getId()))
.setUser(Constants.ADMIN_USER())
.build();
AnalyzeJobResultAction analyzeJobResultAction = client.analyzeJob(analyzeJobAction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrat
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
import org.apache.linkis.httpclient.response.Result
import org.apache.linkis.monitor.request.{
AnalyzeJobAction,
DataSourceParamsAction,
EmsListAction,
EntranceTaskAction,
KeyvalueAction,
KillJobAction,
MonitorAction
}
import org.apache.linkis.monitor.response.{EntranceTaskResult, KeyvalueResult, KillJobResultAction}
import org.apache.linkis.monitor.response.{
AnalyzeJobResultAction,
EntranceTaskResult,
KeyvalueResult,
KillJobResultAction
}
import org.apache.linkis.ujes.client.response.EmsListResult

import java.io.Closeable
Expand Down Expand Up @@ -66,6 +72,10 @@ abstract class MonitorHTTPClient extends Closeable {
executeJob(killJobAction).asInstanceOf[KillJobResultAction]
}

def analyzeJob(analyzeJobAction: AnalyzeJobAction): AnalyzeJobResultAction = {
executeJob(analyzeJobAction).asInstanceOf[AnalyzeJobResultAction]
}

}

object MonitorHTTPClient {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.monitor.jobhistory.analyze

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.monitor.constants.Constants
import org.apache.linkis.monitor.core.ob.{Event, Observer}
import org.apache.linkis.monitor.jobhistory.entity.JobHistory
import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils, PooledImsAlertUtils}

import java.util

import scala.collection.JavaConverters._


class JobHistoryAnalyzeAlertSender() extends Observer with Logging {
override def update(e: Event, jobHistroyList: scala.Any): Unit = {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.monitor.jobhistory.analyze

import org.apache.linkis.monitor.core.ob.SingleObserverEvent

class JobHistoryAnalyzeHitEvent extends SingleObserverEvent
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.monitor.jobhistory.analyze

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.monitor.config.MonitorConfig
import org.apache.linkis.monitor.constants.Constants
import org.apache.linkis.monitor.core.ob.Observer
import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
import org.apache.linkis.monitor.jobhistory.entity.JobHistory
import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils, ThreadUtils}
import org.apache.linkis.monitor.utils.job.JohistoryUtils

import java.util

class JobHistoryAnalyzeRule(hitObserver: Observer)
extends AbstractScanRule(event = new JobHistoryAnalyzeHitEvent, observer = hitObserver)
with Logging {
private val scanRuleList = CacheUtils.cacheBuilder

/**
* if data match the pattern, return true and trigger observer should call isMatched()
*
* @param data
* @return
*/
override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
if (!getHitEvent.isRegistered) {
logger.error("ScanRule is not bind with an observer. Will not be triggered")
return false
}
for (scanedData <- JohistoryUtils.getJobhistorySanData(data)) {
scanedData match {
case jobHistory: JobHistory =>
val jobStatus = jobHistory.getStatus.toUpperCase()
if (Constants.FINISHED_JOB_STATUS.contains(jobStatus) && jobStatus.equals("FAILED")) {
// 执行任务分析
HttpsUntils.analyzeJob(jobHistory)
}
case _ =>
}
}
true
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.monitor.request

import org.apache.linkis.httpclient.request.GetAction

import org.apache.commons.lang3.StringUtils

class AnalyzeJobAction extends GetAction with MonitorAction {

override def suffixURLs: Array[String] = Array("jobhistory", "diagnosis-query")

}

object AnalyzeJobAction {

def newBuilder(): Builder = new Builder

class Builder private[AnalyzeJobAction] () {

private var taskID: String = _
private var user: String = _

def setTaskID(taskID: String): Builder = {
this.taskID = taskID
this
}

def setUser(user: String): Builder = {
this.user = user
this
}

def build(): AnalyzeJobAction = {
val analyzeJobAction = new AnalyzeJobAction
if (StringUtils.isNotBlank(taskID)) analyzeJobAction.setParameter("taskID", taskID)
if (StringUtils.isNotBlank(user)) analyzeJobAction.setUser(user)
analyzeJobAction
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.monitor.response

import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
import org.apache.linkis.httpclient.dws.response.DWSResult

import java.util

import scala.beans.BeanProperty

@DWSHttpMessageResult("/api/rest_j/v\\d+/jobhistory/diagnosis-query")
class AnalyzeJobResultAction extends DWSResult {

@BeanProperty
var messages: util.ArrayList[util.Map[String, AnyRef]] = _

}
Loading

0 comments on commit cc19a5f

Please sign in to comment.