Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Jan 25, 2024
2 parents fc2cace + 6b8d51b commit 890d0a7
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 36 deletions.
7 changes: 4 additions & 3 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.dinky.data.enums.Status;
import org.dinky.data.exception.BusException;
import org.dinky.data.result.Result;
import org.dinky.utils.I18n;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -47,7 +46,6 @@

import cn.dev33.satoken.exception.NotLoginException;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -64,9 +62,6 @@ public class WebExceptionHandler {
@ResponseBody
public Result<Void> busException(BusException e) {
log.error("BusException:", e);
if (StrUtil.isEmpty(e.getMsg())) {
return Result.failed(I18n.getMessage(e.getCode(), e.getMessage()));
}
return Result.failed(e.getMsg());
}

Expand Down
9 changes: 8 additions & 1 deletion dinky-admin/src/test/java/org/dinky/utils/SqlUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ public void testRemoveNote() {

String removedNoteSql = SqlUtil.removeNote(testSql);
Assertions.assertThat(removedNoteSql).isNotNull();
Assertions.assertThat(removedNoteSql).isNotEqualTo(testSql);
Assertions.assertThat(removedNoteSql)
.isEqualTo("//test2\n" + "\n"
+ "\n"
+ "select 1 \n"
+ " from test # test9\n"
+ " where '1' <> '-- ::.' //test6\n"
+ " and 1=1 \n"
+ " and 'zz' <> null;");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.dinky.data.enums.Status;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -43,47 +42,45 @@ public class BusException extends RuntimeException {
private static final long serialVersionUID = -2955156471454043812L;

/** 异常信息的code码 */
private String code;

/** 如果有值得话,将不会取i18n里面的错误信息 */
private String msg;
private Status code;

/** 异常信息的参数 */
private Object[] errorArgs;

/** 如果有值得话,将不会取i18n里面的错误信息 */
private String msg;

public BusException(String message) {
super(message);
setMsg(message);
}

public BusException(Status status) {
super(status.getMessage());
setCode(String.valueOf(status.getCode()));
setCode(status);
setMsg(status.getMessage());
}

public BusException(Status status, Object... errorArgs) {
super(status.getMessage());
setCode(String.valueOf(status.getCode()));
setCode(status);
setMsg(StrUtil.format(status.getMessage(), errorArgs));
}

public BusException(String message, Object... args) {
super();
setCode(message);
setErrorArgs(args);
}

/**
* An exception that gets the error message through i 18n
*
* @param code code
* @param message code
* @param e e
* @return {@link BusException}
*/
public static BusException valueOf(String code, Throwable e) {
String errMsg = ExceptionUtil.stacktraceToString(e);
log.error(errMsg);
public static BusException valueOf(String message, Throwable e) {
log.error(message, e);
return new BusException(message + e.getMessage());
}

public static BusException valueOf(Status code, Throwable e) {
log.error(code.getMessage(), e);
return new BusException(code, e.getMessage());
}

Expand All @@ -94,7 +91,7 @@ public static BusException valueOf(String code, Throwable e) {
* @param errorArgs errorArgs
* @return {@link BusException}
*/
public static BusException valueOf(String code, Object... errorArgs) {
public static BusException valueOf(Status code, Object... errorArgs) {
return new BusException(code, errorArgs);
}

Expand All @@ -104,7 +101,7 @@ public static BusException valueOf(String code, Object... errorArgs) {
* @param msg msg
* @return {@link BusException}
*/
public static BusException valueOfMsg(String msg) {
public static BusException valueOf(String msg) {
return new BusException(msg);
}
}
2 changes: 1 addition & 1 deletion dinky-common/src/main/java/org/dinky/utils/SqlUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static String removeNote(String sql) {
// Remove the special-space characters
sql = sql.replaceAll("\u00A0", " ").replaceAll("[\r\n]+", "\n");
// Remove annotations Support '--aa' , '/**aaa*/' , '//aa' , '#aaa'
Pattern p = Pattern.compile("(?ms)('(?:''|[^'])*')|--.*?$|//.*?$|/\\*[^+].*?\\*/|#.*?$|");
Pattern p = Pattern.compile("(?ms)('(?:''|[^'])*')|--.*?$|/\\*[^+].*?\\*/|");
String presult = p.matcher(sql).replaceAll("$1");
return presult.trim();
}
Expand Down
9 changes: 5 additions & 4 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
Expand Down Expand Up @@ -256,12 +257,12 @@ public JobResult executeJarSql(String statement) throws Exception {
ready();
JobJarStreamGraphBuilder jobJarStreamGraphBuilder = JobJarStreamGraphBuilder.build(this);
StreamGraph streamGraph = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
Configuration configuration =
executor.getCustomTableEnvironment().getConfig().getConfiguration();
if (Asserts.isNotNullString(config.getSavePointPath())) {
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
config.getSavePointPath(),
executor.getStreamExecutionEnvironment()
.getConfiguration()
.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
}
try {
if (!useGateway) {
Expand All @@ -282,7 +283,7 @@ public JobResult executeJarSql(String statement) throws Exception {
}
} else {
GatewayResult gatewayResult;
config.addGatewayConfig(executor.getSetConfig());
config.addGatewayConfig(configuration);
if (runMode.isApplicationMode()) {
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(getUdfPathContextHolder());
} else {
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/user_guide/studio/flink_sql_task_devlop.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ EXECUTE JAR WITH (
**样例代码**

```sql
set 'execution.checkpointing.interval'='21 s';

EXECUTE JAR WITH (
'uri'='rs:/jar/flink/demo/SocketWindowWordCount.jar',
'main-class'='org.apache.flink.streaming.examples.socket',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,27 @@ HDFS, OSS 三种文件系统.

### HDFS 配置

![global_setting_resource_hdfs](http://pic.dinky.org.cn/dinky/docs/test/global_setting_resource_hdfs.png)
![global_setting_resource_hdfs](http://pic.dinky.org.cn/dinky/docs/zh-CN/user_guide/system_setting/global_settings/resource_setting/global_setting_resource_hdfs.png)

| 参数名称 | 参数说明 | 默认值 |
|----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| 是否启用Resource | 启用资源管理功能,如果切换存储模式时,需关闭此开关,相关配置完成后,再开启。 | true |
| 存储模式 | 支持HDFS、S3(Minio、阿里云OSS、腾讯云COS等..),切换选项后即可生效。 | Local |
| 上传目录的根路径 | 资源存储在HDFS/OSS (S3)路径上,资源文件将存储到此基本路径,自行配置,请确保该目录存在于相关存储系统上并具有读写权限。<br/>如果是本地存储,则写本地文件存储路径,如 /User/xxx/data<br/>如果是 HDFS 存储则写 HDFS 文件访问路径,如 hdfs://localhost:9000/user/xxx<br/>如果是 OSS 存储则写 OSS 文件访问路径,如 oss://dinky/xxx ||
| HDFS操作用户名 | hdfs用户名 | hdfs |
| HDFS defaultFS | fs.defaultFS 配置项,例如: 远程 HDFS:hdfs://localhost:9000,本地:file:/// | file:/// |
| HDFS defaultFS | fs.defaultFS 配置项,例如: 远程 HDFS:hdfs://localhost:9000,本地:file:/// 高可用: hdfs://namespace | file:/// |
| core-site.xml | core-site.xml 配置文件内容,高可用必填 , 请手动将配置文件内容填入到此处(全选复制粘贴即可) | file:/// | | file:/// |
| hdfs-site.xml | hdfs-site.xml 配置文件内容,高可用必填 , 请手动将配置文件内容填入到此处(全选复制粘贴即可) | file:/// |

:::warning 注意
HDFS defaultFS 配置项,目前暂时不支持高可用 ,请填写可用节点(active)的 HDFS 地址

1. HDFS defaultFS 配置项,可以支持高可用, 但是需要配置 core-site.xml 和 hdfs-site.xml 文件内容,
请手动将配置文件内容写到此处(全选复制粘贴即可)
2. core-site.xml 和 hdfs-site.xml 仅在你的 HDFS 是高可用模式时才需要填写, 如果是其他模式,则不需要填写
3. core-site.xml 和 hdfs-site.xml 不是填写文件路径,而是文件内容,这么做的好处在于 在 Dinky 中提交 Application 任务时可以直接使用
HDFS 文件系统(高可用模式), 达到全模式通用目的
4. 请自行确保上述配置文件内容正确, 否则会导致 HDFS 文件系统无法正常使用

:::

### OSS 配置
Expand Down

0 comments on commit 890d0a7

Please sign in to comment.