diff --git a/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/impl/DefaultDataSourceService.java b/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/impl/DefaultDataSourceService.java index af44fdf07..b65749f76 100644 --- a/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/impl/DefaultDataSourceService.java +++ b/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/impl/DefaultDataSourceService.java @@ -859,7 +859,8 @@ public Map getDataSourceConnectParamsById(String operator, Long ""); Map connectParams = result.getConnectParams(); - if (Objects.nonNull(connectParams) && StringUtils.equals(String.valueOf(connectParams.get("isEncrypt")), "1")) { + if (Objects.nonNull(connectParams) && connectParams.containsKey("password") + && StringUtils.equals(String.valueOf(connectParams.get("isEncrypt")), "1")) { String decrypt = AESUtils.decrypt(connectParams.get("password").toString(), LINKIS_DATASOURCE_AES_KEY.getValue()); connectParams.replace("password", decrypt); diff --git a/exchangis-job/exchangis-job-launcher/pom.xml b/exchangis-job/exchangis-job-launcher/pom.xml index b06b04516..3c8c8b7d7 100644 --- a/exchangis-job/exchangis-job-launcher/pom.xml +++ b/exchangis-job/exchangis-job-launcher/pom.xml @@ -26,7 +26,7 @@ org.apache.linkis linkis-computation-client - ${linkis.version} + ${linkis-computation-client.version} diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/scheduler/tasks/SubmitSchedulerTask.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/scheduler/tasks/SubmitSchedulerTask.java index dfeca8b61..fb94da616 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/scheduler/tasks/SubmitSchedulerTask.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/scheduler/tasks/SubmitSchedulerTask.java @@ -83,7 +83,7 @@ public SubmitSchedulerTask(RateLimitService rateLimitService, LaunchableExchangi try { submitAble.set(submitCondition.call()); } catch (Exception e) { - // Ignore + LOG.warn("Error to set submit condition from callback, cause by: ", e); } } // Set max retry diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/subscriber/NewInTaskObserver.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/subscriber/NewInTaskObserver.java index 8454d85e5..6d84e3b7b 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/subscriber/NewInTaskObserver.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/subscriber/NewInTaskObserver.java @@ -86,10 +86,14 @@ public int subscribe(List publishedTasks) throws Exchan } } return success; + } else { + LOG.warn("Unable to get parallel, maybe reach the maxParallel"); } noParallel.set(true); return false; }, (submitTask, e) -> { + LOG.info("Unable to launch the task with job_execution_id {} by user {}, decrease the parallel ,cause by {}" + , launchableExchangisTask.getExecuteUser(), launchableExchangisTask.getJobExecutionId(), e); TaskParallelRule parallelRule = parallelManager.getOrCreateRule(launchableExchangisTask.getExecuteUser(), TaskParallelManager.Operation.SUBMIT); // Decrease the parallel @@ -103,6 +107,7 @@ public int subscribe(List publishedTasks) throws Exchan if (submitSchedulerTask.isSubmitAble()) { submitSchedulerTask.setTenancy(launchableExchangisTask.getExecuteUser()); try { + LOG.info("Do submit task with id {} and job_execution_id {}", launchableExchangisTask.getId(), launchableExchangisTask.getJobExecutionId()); taskExecution.submit(submitSchedulerTask); } catch (Exception e) { // If the consumer queue is full? @@ -144,8 +149,8 @@ public void discard(List unsubscribeTasks) { //Ignore } } - Queue queue = getCacheQueue(); - unsubscribeTasks.forEach(queue::offer); +// Queue queue = getCacheQueue(); +// unsubscribeTasks.forEach(queue::offer); } } } \ No newline at end of file diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/utils/SQLCommandUtils.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/utils/SQLCommandUtils.java index 2a6d24b32..fcdbc9053 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/utils/SQLCommandUtils.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/utils/SQLCommandUtils.java @@ -37,7 +37,9 @@ public static String contactSql(List tables, List alias, private static String columnListSql(List columns){ StringBuilder builder = new StringBuilder(); for(int i = 0; i < columns.size(); i++){ + builder.append("`"); builder.append(columns.get(i)); + builder.append("`"); if(i < columns.size() - 1){ builder.append(DEFAULT_COLUMN_SEPARATOR); } diff --git a/pom.xml b/pom.xml index c3f288ae4..10d9edcc0 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ 1.1.7-webank 1.8.0-SNAPSHOT 1.5.0-wds-SNAPSHOT + 1.6.0-wds 1.5.0-wds-SNAPSHOT 0.1.0-SNAPSHOT 3.8.1