Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into fix-3636
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Sep 25, 2024
2 parents de8fb39 + cf80e45 commit 4ea3f15
Show file tree
Hide file tree
Showing 32 changed files with 252 additions and 106 deletions.
3 changes: 3 additions & 0 deletions dinky-admin/src/main/java/org/dinky/aop/LogAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ protected void handleCommonLogic(final JoinPoint joinPoint, final Exception e, O
// *========数据库日志=========*//
OperateLog operLog = new OperateLog();
Result<Void> result = JsonUtils.toBean(jsonResult, new TypeReference<Result<Void>>() {});
if (result == null) {
result = Result.failed();
}
operLog.setStatus(result.isSuccess() ? BusinessStatus.SUCCESS.ordinal() : BusinessStatus.FAIL.ordinal());

// 请求的地址
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -66,7 +67,6 @@
@Api(tags = "UDF & App Jar Controller")
@RequestMapping("/download")
public class DownloadController {
// todo: Controller has injection risk
@GetMapping("downloadDepJar/{taskId}")
@ApiOperation("Download UDF Jar")
public void downloadJavaUDF(@PathVariable Integer taskId, HttpServletResponse resp) {
Expand Down Expand Up @@ -131,12 +131,15 @@ public void downloadFromRs(String path, HttpServletResponse resp) {
ServletUtil.write(resp, inputStream);
}

// todo: There is a risk of injection in this interface
@PostMapping("uploadFromRsByLocal")
@ApiOperation("Upload From Resource By Local")
@SaIgnore
public Result<Void> uploadFromRs(String path, @RequestParam("file") MultipartFile file) {
public Result<Void> uploadFromRs(
String path, @RequestParam("file") MultipartFile file, @RequestHeader("token") String token) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
if (!systemConfiguration.getDinkyToken().getValue().equals(token)) {
return Result.failed("token is not correct");
}
if (!systemConfiguration.getResourcesEnable().getValue()
|| !systemConfiguration.getResourcesModel().getValue().equals(ResourcesModelEnum.LOCAL)) {
return Result.failed("resources model is not local or resources is not enable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ private void executeAlertAction(Facts facts, AlertRuleDTO alertRuleDTO) throws E

if (!Asserts.isNull(task.getAlertGroup())) {
// 获取任务的责任人和维护人对应的用户信息|Get the responsible person and maintainer of the task
User ownerInfo = userCache.get(task.getFirstLevelOwner());
Integer owner = task.getFirstLevelOwner();
User ownerInfo = owner == null ? null : userCache.get(owner);
List<User> maintainerInfo = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(task.getSecondLevelOwners())) {
for (Integer secondLevelOwner : task.getSecondLevelOwners()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ public void flush() throws IOException {
}

private void sendFile() {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
try (HttpResponse httpResponse = HttpUtil.createPost(
SystemConfiguration.getInstances().getDinkyAddr().getValue() + "/download/uploadFromRsByLocal")
systemConfiguration.getDinkyAddr().getValue() + "/download/uploadFromRsByLocal")
.header("token", systemConfiguration.getDinkyToken().getValue())
.form("file", file)
.form("path", path.toString())
.execute()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@
package org.dinky.context;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @since 0.7.0
*/
public class FlinkUdfPathContextHolder {
private static final List<String> PYTHON_FILE_SUFFIX =
Arrays.asList(".zip", ".py", ".pyc", ".pyo", ".pyd", ".pyw", ".pyz", ".pyzw");

private final Set<File> UDF_PATH_CONTEXT = new HashSet<>();
private final Set<File> OTHER_PLUGINS_PATH_CONTEXT = new HashSet<>();
private final Set<File> PYTHON_UDF_FILE = new HashSet<>();
private final Set<File> FILES = new HashSet<>();

public void addUdfPath(File file) {
Expand All @@ -54,7 +58,10 @@ public Set<File> getUdfFile() {
}

public Set<File> getPyUdfFile() {
return PYTHON_UDF_FILE;
return getAllFileSet().stream()
.filter(file -> PYTHON_FILE_SUFFIX.stream()
.anyMatch(suffix -> file.getName().endsWith(suffix)))
.collect(Collectors.toSet());
}

public Set<File> getOtherPluginsFiles() {
Expand Down
3 changes: 3 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ public enum Status {
SYS_ENV_SETTINGS_DINKYADDR(116, "sys.env.settings.dinkyAddr"),
SYS_ENV_SETTINGS_DINKYADDR_NOTE(117, "sys.env.settings.dinkyAddr.note"),

SYS_ENV_SETTINGS_DINKYTOKEN(116, "sys.env.settings.dinkyToken"),
SYS_ENV_SETTINGS_DINKYTOKEN_NOTE(117, "sys.env.settings.dinkyToken.note"),

SYS_ENV_SETTINGS_JOB_RESEND_DIFF_SECOND(118, "sys.env.settings.jobResendDiffSecond"),
SYS_ENV_SETTINGS_JOB_RESEND_DIFF_SECOND_NOTE(119, "sys.env.settings.jobResendDiffSecond.note"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,14 @@ public static Configuration.OptionBuilder key(Status status) {
.stringType()
.defaultValue("python3")
.note(Status.SYS_ENV_SETTINGS_PYTHONHOME_NOTE);

private final Configuration<String> dinkyAddr = key(Status.SYS_ENV_SETTINGS_DINKYADDR)
.stringType()
.defaultValue(System.getProperty("dinkyAddr"))
.note(Status.SYS_ENV_SETTINGS_DINKYADDR_NOTE);
private final Configuration<String> dinkyToken = key(Status.SYS_ENV_SETTINGS_DINKYTOKEN_NOTE)
.stringType()
.defaultValue("efda1551-7958-4e0f-80a8-dfd107df3e38")
.note(Status.SYS_ENV_SETTINGS_DINKYTOKEN);

private final Configuration<Integer> jobReSendDiffSecond = key(Status.SYS_ENV_SETTINGS_JOB_RESEND_DIFF_SECOND)
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ sys.env.settings.pythonHome=Python Env variables
sys.env.settings.pythonHome.note=Python environment variables, used to submit Python tasks and build Python Udf
sys.env.settings.dinkyAddr=Dinky Address
sys.env.settings.dinkyAddr.note=This address is an accessible Dinky address, such as http://127.0.0.1:8888
sys.env.settings.dinkyToken=Dinky Token
sys.env.settings.dinkyToken.note=This token is used to access some interfaces within Dinky, such as uploading resources
sys.env.settings.jobResendDiffSecond=Alert anti-resend interval
sys.env.settings.jobResendDiffSecond.note=During this interval, when the Alert information sent reaches the configured value of [Maximum number of alarm resend prevention], after reaching the threshold, the Alert information will no longer be sent; unit: seconds
sys.env.settings.diffMinuteMaxSendCount=Maximum number of alarms to prevent resending
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ sys.env.settings.pythonHome=Python 环境变量
sys.env.settings.pythonHome.note=Python 环境变量,用于提交 Python 任务以及构建 Python Udf
sys.env.settings.dinkyAddr=Dinky 地址
sys.env.settings.dinkyAddr.note=该地址为可访问的 Dinky 地址,如 http://127.0.0.1:8888
sys.env.settings.dinkyToken=Dinky Token
sys.env.settings.dinkyToken.note = 此 token 用于访问 Dinky 内的一些接口,如上传资源等
sys.env.settings.jobResendDiffSecond=告警防重发间隔
sys.env.settings.jobResendDiffSecond.note=在此间隔内,发送告警信息达到 [告警防重发最大条数] 配置的值时,达到阈值后,不再发送告警信息; 单位:秒
sys.env.settings.diffMinuteMaxSendCount=告警防重发最大条数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,6 +43,7 @@
@Slf4j
@Setter
@Getter
@NoArgsConstructor
public class SelectResult extends AbstractResult implements IResult {

private String jobID;
Expand All @@ -65,7 +67,6 @@ public SelectResult(
this.columns = columns;
this.jobID = jobID;
this.success = success;
// this.endTime = LocalDateTime.now();
this.isDestroyed = false;
}

Expand Down
10 changes: 5 additions & 5 deletions dinky-flink/dinky-flink-1.20/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>flink-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.20</artifactId>
<version>24.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
Expand Down Expand Up @@ -138,11 +143,6 @@
<artifactId>commons-cli</artifactId>
<version>${commons.version}</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.19</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
Expand Down
22 changes: 12 additions & 10 deletions dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public YarnGateway(GatewayConfig config) {
super(config);
}

@Override
public void init() {
initConfig();
initYarnClient();
Expand Down Expand Up @@ -192,7 +193,7 @@ private void initYarnClient() {
hadoopUserName = "hdfs";
}

// 设置 yarn 提交的用户名
// Set the username for the yarn submission
String yarnUser = configuration.get(CustomerConfigureOptions.YARN_APPLICATION_USER);
if (StrUtil.isNotBlank(yarnUser)) {
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(yarnUser));
Expand All @@ -211,6 +212,7 @@ private Path getYanConfigFilePath(String path) {
return new Path(URI.create(config.getClusterConfig().getHadoopConfigPath() + "/" + path));
}

@Override
public SavePointResult savepointCluster(String savePoint) {
if (Asserts.isNull(yarnClient)) {
init();
Expand All @@ -221,6 +223,7 @@ public SavePointResult savepointCluster(String savePoint) {
return runClusterSavePointResult(savePoint, applicationId, clusterDescriptor);
}

@Override
public SavePointResult savepointJob(String savePoint) {
if (Asserts.isNull(yarnClient)) {
init();
Expand Down Expand Up @@ -253,13 +256,14 @@ private void autoCancelCluster(ClusterClient<ApplicationId> clusterClient) {
Thread.sleep(3000);
clusterClient.shutDownCluster();
} catch (InterruptedException e) {
e.printStackTrace();
logger.error(e.getMessage());
} finally {
clusterClient.close();
}
});
}

@Override
public TestResult test() {
try {
initConfig();
Expand Down Expand Up @@ -365,13 +369,12 @@ protected YarnClusterDescriptor createYarnClusterDescriptorWithJar(FlinkUdfPathC
}

protected YarnClusterDescriptor createInitYarnClusterDescriptor() {
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
true);
return yarnClusterDescriptor;
}

protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResult result)
Expand Down Expand Up @@ -464,7 +467,7 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

if (HighAvailabilityMode.ZOOKEEPER == highAvailabilityMode) {
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, appId);
configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, appId);
String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);

if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
Expand Down Expand Up @@ -495,13 +498,13 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
}
}
} catch (Exception e) {
e.printStackTrace();
logger.error("", e);
} finally {
if (Asserts.isNotNull(zooKeeper)) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("", e);
}
}
}
Expand All @@ -515,12 +518,11 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
* Creates a ZooKeeper path of the form "/a/b/.../z".
*/
private static String generateZookeeperPath(String... paths) {
final String result = Arrays.stream(paths)

return Arrays.stream(paths)
.map(YarnGateway::trimSlashes)
.filter(s -> !s.isEmpty())
.collect(Collectors.joining("/", "/", ""));

return result;
}

private static String trimSlashes(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public OracleTypeConvert() {
this.convertMap.clear();
register("char", ColumnType.STRING);
register("date", ColumnType.LOCAL_DATETIME);
register("time", ColumnType.TIME);
register("timestamp", ColumnType.TIMESTAMP);
register("time", ColumnType.TIME);
register("number", OracleTypeConvert::convertNumber);
register("float", ColumnType.JAVA_LANG_FLOAT);
register("clob", ColumnType.STRING);
Expand Down
7 changes: 4 additions & 3 deletions dinky-web/src/components/Flink/FlinkDag/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ function getMaxWidthAndDepth(edges: CusEdge[]): { maxWidth: number; maxDepth: nu
graph[sourceCell].push(targetCell);
});

const maxSource = Object.keys(sourceCount).reduce((a, b) =>
sourceCount[a] > sourceCount[b] ? a : b
);
const maxSource =
Object.keys(sourceCount).length < 1
? '1'
: Object.keys(sourceCount).reduce((a, b) => (sourceCount[a] > sourceCount[b] ? a : b));
const maxWidth = sourceCount[maxSource];

const visited: Record<string, boolean> = {};
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/components/Flink/OptionsSelect/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const FlinkOptionsSelect = (props: FlinkOptionsProps) => {
return (
<ProFormSelect
{...props}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item) }}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item), virtual: false }}
/>
);
};
Expand Down
Loading

0 comments on commit 4ea3f15

Please sign in to comment.