diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SHAUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SHAUtils.java new file mode 100644 index 0000000000..8122de8558 --- /dev/null +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SHAUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.common.utils; + +import org.apache.linkis.common.conf.CommonVars; + +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public class SHAUtils { + + public static String DOCTOR_NONCE = "12345"; + public static final CommonVars DOCTOR_TOKEN = + CommonVars.apply("linkis.doctor.signature.token", ""); + + /** + * 对字符串加密,默认使用SHA-256 + * + * @param strSrc 要加密的字符串 + * @param encName 加密类型 + * @return + * @throws UnsupportedEncodingException + */ + public static String Encrypt(String strSrc, String encName) throws UnsupportedEncodingException { + MessageDigest md = null; + String strDes = null; + byte[] bt = strSrc.getBytes("utf-8"); + try { + if (encName == null || encName.equals("")) { + encName = "SHA-256"; + } + md = MessageDigest.getInstance(encName); + md.update(bt); + strDes = bytes2Hex(md.digest()); // to HexString + } catch (NoSuchAlgorithmException e) { + return null; + } + return strDes; + } + + public static String bytes2Hex(byte[] bts) { + String des = ""; + String tmp = null; + for (int i = 0; i < bts.length; i++) { + tmp = (Integer.toHexString(bts[i] & 0xFF)); + if (tmp.length() == 1) { + des += "0"; + } + des += tmp; + } + return des; + } +} diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala index fbcda761ee..d7484f92be 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala @@ -31,6 +31,8 @@ object Configuration extends Logging { val IS_TEST_MODE = CommonVars("wds.linkis.test.mode", false) + val LINKIS_SYS_NAME = CommonVars("linkis.system.name", "") + val IS_PROMETHEUS_ENABLE = CommonVars("wds.linkis.prometheus.enable", false) val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false).getValue diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index ad7b97ced6..ef02f182f2 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -23,6 +23,8 @@ import org.apache.linkis.monitor.core.scanner.AnomalyScanner; import org.apache.linkis.monitor.core.scanner.DefaultScanner; import org.apache.linkis.monitor.factory.MapperFactory; +import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeAlertSender; +import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeRule; import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrCodeRule; import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender; import org.apache.linkis.monitor.jobhistory.index.JobIndexRule; @@ -168,6 +170,14 @@ public void jobHistoryFinishedScan() { } catch (Exception e) { logger.warn("CommonJobRunTimeRule Scan Error msg: " + e.getMessage()); } + // 新增失败任务分析扫描 + try { + JobHistoryAnalyzeRule jobHistoryAnalyzeRule = + new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender()); + scanner.addScanRule(jobHistoryAnalyzeRule); + } catch (Exception e) { + logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage()); + } // 执行任务扫描 JobMonitorUtils.run(scanner, fetchers, true); diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java index 7480b3e15f..0476765594 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java @@ -27,6 +27,7 @@ import org.apache.linkis.monitor.entity.IndexEntity; import org.apache.linkis.monitor.jobhistory.entity.JobHistory; import org.apache.linkis.monitor.request.*; +import org.apache.linkis.monitor.response.AnalyzeJobResultAction; import org.apache.linkis.monitor.response.EntranceTaskResult; import org.apache.linkis.monitor.response.KeyvalueResult; import org.apache.linkis.monitor.response.KillJobResultAction; @@ -157,4 +158,15 @@ public static void killJob(JobHistory jobHistory) { KillJobResultAction killJobResultAction = client.killJob(killJobAction); Map data = MapUtils.getMap(killJobResultAction.getResultMap(), "data", new HashMap<>()); } + + public static void analyzeJob(JobHistory jobHistory) { + MonitorHTTPClient client = ClientSingleton.getInstance(); + + AnalyzeJobAction analyzeJobAction = + AnalyzeJobAction.newBuilder() + .setTaskID(String.valueOf(jobHistory.getId())) + .setUser(Constants.ADMIN_USER()) + .build(); + AnalyzeJobResultAction analyzeJobResultAction = client.analyzeJob(analyzeJobAction); + } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala index 6b76fd3c73..ab22404d59 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala @@ -27,6 +27,7 @@ import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrat import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder} import org.apache.linkis.httpclient.response.Result import org.apache.linkis.monitor.request.{ + AnalyzeJobAction, DataSourceParamsAction, EmsListAction, EntranceTaskAction, @@ -34,7 +35,12 @@ import org.apache.linkis.monitor.request.{ KillJobAction, MonitorAction } -import org.apache.linkis.monitor.response.{EntranceTaskResult, KeyvalueResult, KillJobResultAction} +import org.apache.linkis.monitor.response.{ + AnalyzeJobResultAction, + EntranceTaskResult, + KeyvalueResult, + KillJobResultAction +} import org.apache.linkis.ujes.client.response.EmsListResult import java.io.Closeable @@ -66,6 +72,10 @@ abstract class MonitorHTTPClient extends Closeable { executeJob(killJobAction).asInstanceOf[KillJobResultAction] } + def analyzeJob(analyzeJobAction: AnalyzeJobAction): AnalyzeJobResultAction = { + executeJob(analyzeJobAction).asInstanceOf[AnalyzeJobResultAction] + } + } object MonitorHTTPClient { diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala new file mode 100644 index 0000000000..517e854dec --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.analyze + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.{Event, Observer} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException +import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils, PooledImsAlertUtils} + +import java.util + +import scala.collection.JavaConverters._ + + +class JobHistoryAnalyzeAlertSender() extends Observer with Logging { + override def update(e: Event, jobHistroyList: scala.Any): Unit = {} + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala new file mode 100644 index 0000000000..675aacfc98 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.analyze + +import org.apache.linkis.monitor.core.ob.SingleObserverEvent + +class JobHistoryAnalyzeHitEvent extends SingleObserverEvent diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala new file mode 100644 index 0000000000..2a95de8571 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.analyze + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.config.MonitorConfig +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.Observer +import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils, ThreadUtils} +import org.apache.linkis.monitor.utils.job.JohistoryUtils + +import java.util + +class JobHistoryAnalyzeRule(hitObserver: Observer) + extends AbstractScanRule(event = new JobHistoryAnalyzeHitEvent, observer = hitObserver) + with Logging { + private val scanRuleList = CacheUtils.cacheBuilder + + /** + * if data match the pattern, return true and trigger observer should call isMatched() + * + * @param data + * @return + */ + override def triggerIfMatched(data: util.List[ScannedData]): Boolean = { + if (!getHitEvent.isRegistered) { + logger.error("ScanRule is not bind with an observer. Will not be triggered") + return false + } + for (scanedData <- JohistoryUtils.getJobhistorySanData(data)) { + scanedData match { + case jobHistory: JobHistory => + val jobStatus = jobHistory.getStatus.toUpperCase() + if (Constants.FINISHED_JOB_STATUS.contains(jobStatus) && jobStatus.equals("FAILED")) { + // 执行任务分析 + HttpsUntils.analyzeJob(jobHistory) + } + case _ => + } + } + true + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala new file mode 100644 index 0000000000..333c7ff1c2 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.request + +import org.apache.linkis.httpclient.request.GetAction + +import org.apache.commons.lang3.StringUtils + +class AnalyzeJobAction extends GetAction with MonitorAction { + + override def suffixURLs: Array[String] = Array("jobhistory", "diagnosis-query") + +} + +object AnalyzeJobAction { + + def newBuilder(): Builder = new Builder + + class Builder private[AnalyzeJobAction] () { + + private var taskID: String = _ + private var user: String = _ + + def setTaskID(taskID: String): Builder = { + this.taskID = taskID + this + } + + def setUser(user: String): Builder = { + this.user = user + this + } + + def build(): AnalyzeJobAction = { + val analyzeJobAction = new AnalyzeJobAction + if (StringUtils.isNotBlank(taskID)) analyzeJobAction.setParameter("taskID", taskID) + if (StringUtils.isNotBlank(user)) analyzeJobAction.setUser(user) + analyzeJobAction + } + + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala new file mode 100644 index 0000000000..3c7b0c03c3 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.response + +import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult +import org.apache.linkis.httpclient.dws.response.DWSResult + +import java.util + +import scala.beans.BeanProperty + +@DWSHttpMessageResult("/api/rest_j/v\\d+/jobhistory/diagnosis-query") +class AnalyzeJobResultAction extends DWSResult { + + @BeanProperty + var messages: util.ArrayList[util.Map[String, AnyRef]] = _ + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala new file mode 100644 index 0000000000..deedf5847b --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.utils.job + +import org.apache.linkis.monitor.core.pac.ScannedData + +import java.util + +import scala.collection.JavaConverters._ + +object JohistoryUtils { + + def getJobhistorySanData(data: util.List[ScannedData]): List[Any] = { + if (data == null) { + return List.empty[ScannedData] + } + val scalaData = data.asScala + val result = scalaData.flatMap { dataList => + if (dataList != null && dataList.getData() != null) { + dataList.getData().asScala + } else { + List.empty[ScannedData] + } + }.toList + result + } + +} diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobDiagnosisMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobDiagnosisMapper.java new file mode 100644 index 0000000000..24efb1a183 --- /dev/null +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobDiagnosisMapper.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.jobhistory.dao; + +import org.apache.linkis.jobhistory.entity.JobDiagnosis; + +public interface JobDiagnosisMapper { + void insert(JobDiagnosis jobDiagnosis); + + void deleteById(Long id); + + void update(JobDiagnosis jobDiagnosis); + + JobDiagnosis selectById(Long jobHistoryId); +} diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/JobDiagnosis.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/JobDiagnosis.java new file mode 100644 index 0000000000..9a232d1c6b --- /dev/null +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/JobDiagnosis.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.jobhistory.entity; + +import java.util.Date; + +public class JobDiagnosis { + + private Long id; + + private Long jobHistoryId; + + private String diagnosisContent; + + private Date createdTime; + + private Date updatedTime; + private String onlyRead; + + // Getters and Setters + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getJobHistoryId() { + return jobHistoryId; + } + + public void setJobHistoryId(Long jobHistoryId) { + this.jobHistoryId = jobHistoryId; + } + + public String getDiagnosisContent() { + return diagnosisContent; + } + + public void setDiagnosisContent(String diagnosisContent) { + this.diagnosisContent = diagnosisContent; + } + + public Date getCreatedTime() { + return createdTime; + } + + public void setCreatedTime(Date createdTime) { + this.createdTime = createdTime; + } + + public Date getUpdatedTime() { + return updatedTime; + } + + public void setUpdatedDate(Date updatedTime) { + this.updatedTime = updatedTime; + } + + public String getOnlyRead() { + return onlyRead; + } + + public void setOnlyRead(String onlyRead) { + this.onlyRead = onlyRead; + } + + @Override + public String toString() { + return "JobDiagnosis{" + + "id=" + + id + + ", jobHistoryId=" + + jobHistoryId + + ", diagnosisContent='" + + diagnosisContent + + '\'' + + ", createdTime=" + + createdTime + + ", updatedTime=" + + updatedTime + + '}'; + } +} diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index e9860af298..af87acb575 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -19,14 +19,17 @@ import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.common.exception.LinkisCommonErrorException; +import org.apache.linkis.common.utils.SHAUtils; import org.apache.linkis.governance.common.constant.job.JobRequestConstants; import org.apache.linkis.governance.common.entity.job.QueryException; import org.apache.linkis.jobhistory.cache.impl.DefaultQueryCacheManager; import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration; import org.apache.linkis.jobhistory.conversions.TaskConversions; import org.apache.linkis.jobhistory.entity.*; +import org.apache.linkis.jobhistory.service.JobHistoryDiagnosisService; import org.apache.linkis.jobhistory.service.JobHistoryQueryService; import org.apache.linkis.jobhistory.transitional.TaskStatus; +import org.apache.linkis.jobhistory.util.Constants; import org.apache.linkis.jobhistory.util.JobhistoryUtils; import org.apache.linkis.jobhistory.util.QueryUtils; import org.apache.linkis.protocol.constants.TaskConstant; @@ -50,6 +53,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -77,6 +81,7 @@ public class QueryRestfulApi { @Autowired private JobHistoryQueryService jobHistoryQueryService; @Autowired private DefaultQueryCacheManager queryCacheManager; + @Autowired private JobHistoryDiagnosisService jobHistoryDiagnosisService; @ApiOperation( value = "governanceStationAdmin", @@ -748,4 +753,111 @@ private List getJobhistoryList( } return queryTasks; } + + @ApiOperation(value = "get-doctor-signature", notes = "get signature", response = Message.class) + @ApiImplicitParams({ + @ApiImplicitParam(name = "applicationId", required = false, dataType = "String", value = "user") + }) + @RequestMapping(path = "/get-doctor-signature", method = RequestMethod.GET) + public Message getUserKeyValue( + HttpServletRequest req, + @RequestParam(value = "applicationId", required = false) String applicationId) + throws UnsupportedEncodingException { + if (StringUtils.isBlank(applicationId)) { + return Message.error("Invalid applicationId cannot be empty"); + } + Map parms = new HashMap<>(); + String timestampStr = String.valueOf(System.currentTimeMillis()); + parms.put("applicationId", applicationId); + parms.put("app_id", Configuration.LINKIS_SYS_NAME().getValue()); + parms.put("timestamp", timestampStr); + parms.put("nonce", SHAUtils.DOCTOR_NONCE); + // doctor提供的token + String token = SHAUtils.DOCTOR_TOKEN.getValue(); + if (StringUtils.isNotBlank(token)){ + String signature = + SHAUtils.Encrypt( + SHAUtils.Encrypt( + parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(), null) + + token, + null); + parms.put("signature", signature); + return Message.ok().data("doctor", parms); + } else { + return Message.error("Doctor token cannot be empty"); + } + } + + @ApiOperation( + value = "diagnosis-query", + notes = "query failed task diagnosis msg", + response = Message.class) + @RequestMapping(path = "/diagnosis-query", method = RequestMethod.GET) + public Message queryFailedTaskDiagnosis( + HttpServletRequest req, @RequestParam(value = "taskID", required = false) String taskID) { + String username = ModuleUserUtils.getOperationUser(req, "diagnosis-query"); + if (StringUtils.isBlank(taskID)) { + return Message.error("Invalid jobId cannot be empty"); + } + JobHistory jobHistory = null; + boolean isAdmin = Configuration.isJobHistoryAdmin(username) || Configuration.isAdmin(username); + boolean isDepartmentAdmin = Configuration.isDepartmentAdmin(username); + if (isAdmin) { + jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(Long.valueOf(taskID), null); + } else if (isDepartmentAdmin) { + String departmentId = JobhistoryUtils.getDepartmentByuser(username); + if (StringUtils.isNotBlank(departmentId)) { + List list = + jobHistoryQueryService.search( + Long.valueOf(taskID), + null, + null, + null, + null, + null, + null, + null, + null, + departmentId, + null); + if (!CollectionUtils.isEmpty(list)) { + jobHistory = list.get(0); + } + } + } else { + jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(Long.valueOf(taskID), username); + } + String diagnosisMsg = ""; + if (jobHistory != null) { + String jobStatus = jobHistory.getStatus().toUpperCase(); + JobDiagnosis jobDiagnosis = jobHistoryDiagnosisService.selectByJobId(Long.valueOf(taskID)); + if (null == jobDiagnosis) { + diagnosisMsg = JobhistoryUtils.getDiagnosisMsg(taskID); + jobDiagnosis = new JobDiagnosis(); + jobDiagnosis.setJobHistoryId(Long.valueOf(taskID)); + jobDiagnosis.setDiagnosisContent(diagnosisMsg); + jobDiagnosis.setCreatedTime(new Date()); + jobDiagnosis.setUpdatedDate(new Date()); + if (Constants.FINISHED_JOB_STATUS.contains(jobStatus)) { + jobDiagnosis.setOnlyRead("1"); + } + jobHistoryDiagnosisService.insert(jobDiagnosis); + } else { + if (StringUtils.isNotBlank(jobDiagnosis.getOnlyRead()) + && "1".equals(jobDiagnosis.getOnlyRead())) { + diagnosisMsg = jobDiagnosis.getDiagnosisContent(); + } else { + diagnosisMsg = JobhistoryUtils.getDiagnosisMsg(taskID); + jobDiagnosis.setDiagnosisContent(diagnosisMsg); + jobDiagnosis.setUpdatedDate(new Date()); + jobDiagnosis.setDiagnosisContent(diagnosisMsg); + if (Constants.FINISHED_JOB_STATUS.contains(jobStatus)) { + jobDiagnosis.setOnlyRead("1"); + } + jobHistoryDiagnosisService.update(jobDiagnosis); + } + } + } + return Message.ok().data("diagnosisMsg", diagnosisMsg); + } } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/Constants.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/Constants.java index 11ba76c688..3986fef636 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/Constants.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/Constants.java @@ -19,4 +19,9 @@ public class Constants { public static final String APPLICATION_NAME = "linkis-ps-publicservice"; + + public static final String UNFINISHED_JOB_STATUS = + "Inited,WaitForRetry,Scheduled,Running".toUpperCase(); + + public static final String FINISHED_JOB_STATUS = "Succeed,Failed,Cancelled,Timeout".toUpperCase(); } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java index 25ec3c4676..2792105673 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java @@ -32,6 +32,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -49,6 +50,9 @@ public class JobhistoryUtils { Sender.getSender( Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());; + public static final String shellPath = Configuration.getLinkisHome() + "/admin/"; + public static final String analyzeFilePath = "tools/linkis-analyze.sh"; + public static byte[] downLoadJobToExcel( List jobHistoryList, String language, @@ -172,4 +176,13 @@ private static String formatExecuteApplicationName( String executeApplicationName, String requestApplicationName) { return requestApplicationName + "/" + executeApplicationName; } + + public static String getDiagnosisMsg(String taskID) { + // 执行shell 脚本获取诊断信息 + List cmdlist = new ArrayList<>(); + cmdlist.add("sh"); + cmdlist.add(shellPath + analyzeFilePath); + cmdlist.add(taskID); + return Utils.exec(cmdlist.toArray(new String[3]), 600 * 1000L); + } } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml new file mode 100644 index 0000000000..a14328a69d --- /dev/null +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobDiagnosisMapper.xml @@ -0,0 +1,45 @@ + + + + + + + + INSERT INTO linkis_ps_job_history_diagnosis (job_history_id, diagnosis_content, created_time, updated_time, only_read) + VALUES (#{jobHistoryId}, #{diagnosisContent}, #{createdTime}, #{updatedTime}, #{onlyRead}) + + + + DELETE FROM linkis_ps_job_history_diagnosis WHERE job_history_id = #{jobHistoryId} + + + + UPDATE linkis_ps_job_history_diagnosis + + diagnosis_content = #{diagnosisContent}, + created_time = #{createdTime}, + updated_time = #{updatedTime}, + only_read = #{onlyRead} + + WHERE job_history_id = #{jobHistoryId} + + + + diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryDiagnosisService.java b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryDiagnosisService.java new file mode 100644 index 0000000000..3a2ba2326d --- /dev/null +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryDiagnosisService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.jobhistory.service; + + +import org.apache.linkis.jobhistory.entity.JobDiagnosis; + +import java.util.List; + +public interface JobHistoryDiagnosisService { + + void insert(JobDiagnosis jobDiagnosis); + + void deleteById(Long id); + + void update(JobDiagnosis jobDiagnosis); + + JobDiagnosis selectByJobId(Long id); + + +} diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDiagnosisServicelmpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDiagnosisServicelmpl.scala new file mode 100644 index 0000000000..d3eeff2588 --- /dev/null +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDiagnosisServicelmpl.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.jobhistory.service.impl + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.jobhistory.dao.JobDiagnosisMapper +import org.apache.linkis.jobhistory.entity.JobDiagnosis +import org.apache.linkis.jobhistory.service.JobHistoryDiagnosisService + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service + +import java.{lang, util} + +@Service +class JobHistoryDiagnosisServicelmpl extends JobHistoryDiagnosisService with Logging { + + @Autowired + private var jobDiagnosisMapper: JobDiagnosisMapper = _ + + override def insert(jobDiagnosis: JobDiagnosis): Unit = { + jobDiagnosisMapper.insert(jobDiagnosis) + } + + override def deleteById(id: lang.Long): Unit = { + jobDiagnosisMapper.deleteById(id) + } + + override def update(jobDiagnosis: JobDiagnosis): Unit = { + jobDiagnosisMapper.update(jobDiagnosis) + } + + override def selectByJobId(id: lang.Long): JobDiagnosis = { + jobDiagnosisMapper.selectById(id) + } + +}