diff --git a/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java index 23850a3f3e..ae0c2a213b 100644 --- a/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/ClusterInstanceService.java @@ -22,6 +22,7 @@ import org.dinky.cluster.FlinkClusterInfo; import org.dinky.data.dto.ClusterInstanceDTO; import org.dinky.data.model.ClusterInstance; +import org.dinky.job.JobConfig; import org.dinky.mybatis.service.ISuperService; import java.util.List; @@ -49,26 +50,10 @@ public interface ClusterInstanceService extends ISuperService { /** * build environment address * - * @param useRemote {@link Boolean} use remote or local - * @param id {@link Integer} cluster id - * @return {@link String} eg: host1:8081 - */ - String buildEnvironmentAddress(boolean useRemote, Integer id); - - /** - * build remote environment address by cluster id - * - * @param id {@link Integer} cluster id - * @return {@link String} eg: host1:8081 - */ - String buildRemoteEnvironmentAddress(Integer id); - - /** - * build local environment address - * + * @param config {@link JobConfig} the config of job * @return {@link String} eg: host1:8081 */ - String buildLocalEnvironmentAddress(); + String buildEnvironmentAddress(JobConfig config); /** * list enabled cluster instances diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java index f12a904a24..9c32d2ecec 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java @@ -23,7 +23,6 @@ import org.dinky.assertion.Asserts; import org.dinky.cluster.FlinkCluster; import org.dinky.cluster.FlinkClusterInfo; -import org.dinky.constant.FlinkConstant; import org.dinky.data.dto.ClusterInstanceDTO; import org.dinky.data.model.ClusterConfiguration; import org.dinky.data.model.ClusterInstance; @@ -31,16 +30,18 @@ import org.dinky.gateway.exception.GatewayException; import org.dinky.gateway.model.FlinkClusterConfig; import org.dinky.gateway.result.GatewayResult; +import org.dinky.job.JobConfig; import org.dinky.job.JobManager; import org.dinky.mapper.ClusterInstanceMapper; import org.dinky.mybatis.service.impl.SuperServiceImpl; import org.dinky.service.ClusterConfigurationService; import org.dinky.service.ClusterInstanceService; +import org.dinky.utils.IpUtils; +import org.dinky.utils.URLUtils; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.time.LocalDateTime; import java.util.List; +import java.util.Map; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -87,30 +88,30 @@ public String getJobManagerAddress(ClusterInstance clusterInstance) { } @Override - public String buildEnvironmentAddress(boolean useRemote, Integer id) { - if (useRemote && id != 0) { + public String buildEnvironmentAddress(JobConfig config) { + Integer id = config.getClusterId(); + Boolean useRemote = config.isUseRemote(); + if (useRemote && id != null && id != 0) { return buildRemoteEnvironmentAddress(id); } else { - return buildLocalEnvironmentAddress(); + Map flinkConfig = config.getConfigJson(); + int port; + if (Asserts.isNotNull(flinkConfig) && flinkConfig.containsKey("rest.port")) { + port = Integer.valueOf(flinkConfig.get("rest.port")); + } else { + port = URLUtils.getRandomPort(); + } + return buildLocalEnvironmentAddress(port); } } - @Override - public String buildRemoteEnvironmentAddress(Integer id) { + private String buildRemoteEnvironmentAddress(Integer id) { return getJobManagerAddress(getById(id)); } - @Override - public String buildLocalEnvironmentAddress() { - try { - InetAddress inetAddress = InetAddress.getLocalHost(); - if (inetAddress != null) { - return inetAddress.getHostAddress(); - } - } catch (UnknownHostException e) { - e.printStackTrace(); - } - return FlinkConstant.LOCAL_HOST; + private String buildLocalEnvironmentAddress(int port) { + String host = IpUtils.getHostIp(); + return host + ":" + port; } @Override diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 935def89f5..f738bcf2f8 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -224,14 +224,15 @@ public JobConfig buildJobConfig(TaskDTO task) { JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId()); config.setClusterId(jobInstance.getClusterId()); } + } else if (GatewayType.LOCAL.equalsValue(task.getType())) { + config.setClusterId(null); + config.setClusterConfigurationId(null); } else { Optional.ofNullable(task.getClusterId()).ifPresent(config::setClusterId); } log.info("Init remote cluster"); try { - Optional.ofNullable(config.getClusterId()).ifPresent(i -> { - config.setAddress(clusterInstanceService.buildEnvironmentAddress(config.isUseRemote(), i)); - }); + config.setAddress(clusterInstanceService.buildEnvironmentAddress(config)); } catch (Exception e) { log.error("Init remote cluster error", e); } diff --git a/dinky-common/src/main/java/org/dinky/utils/URLUtils.java b/dinky-common/src/main/java/org/dinky/utils/URLUtils.java index e231d425c9..076827eb9d 100644 --- a/dinky-common/src/main/java/org/dinky/utils/URLUtils.java +++ b/dinky-common/src/main/java/org/dinky/utils/URLUtils.java @@ -26,6 +26,7 @@ import java.net.URL; import java.net.URLConnection; import java.util.Arrays; +import java.util.Random; import java.util.Set; import java.util.stream.Collectors; @@ -103,4 +104,9 @@ public static String formatAddress(String webURL) { return ""; } } + + public static int getRandomPort() { + Random random = new Random(); + return random.nextInt(65536); + } } diff --git a/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java b/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java index 0c474c1032..3ee14381a6 100644 --- a/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java +++ b/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java @@ -25,17 +25,8 @@ * @since 2021/5/25 14:39 */ public interface FlinkConstant { - - /** flink端口 */ - Integer FLINK_REST_DEFAULT_PORT = 8081; - /** flink会话默认个数 */ - Integer DEFAULT_SESSION_COUNT = 256; - /** flink加载因子 */ - Double DEFAULT_FACTOR = 0.75; /** 本地模式host */ - String LOCAL_HOST = "localhost:8081"; + String LOCAL_HOST = "localhost"; /** changlog op */ String OP = "op"; - - String DEFAULT_FLINK_HOME = "/opt/flink"; } diff --git a/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java b/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java index 566c468f5b..d55a9ee37c 100644 --- a/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java +++ b/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java @@ -90,6 +90,13 @@ public class ExecutorConfig { notes = "Flag indicating whether to use batch model") private boolean useBatchModel; + @ApiModelProperty( + value = "Whether to only build plans", + dataType = "boolean", + example = "true", + notes = "Build plan only") + private boolean isPlan; + @ApiModelProperty( value = "Checkpoint interval", dataType = "Integer", @@ -263,6 +270,14 @@ public boolean isValidJarFiles() { return Asserts.isNotNull(this.getJarFiles()); } + public boolean isPlan() { + return isPlan; + } + + public void setPlan(boolean plan) { + isPlan = plan; + } + @Override public String toString() { return String.format( diff --git a/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java b/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java index c4a421e035..3977401770 100644 --- a/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java +++ b/dinky-core/src/main/java/org/dinky/executor/LocalBatchExecutor.java @@ -47,13 +47,12 @@ public LocalBatchExecutor(ExecutorConfig executorConfig) { .map(FileUtil::getAbsolutePath) .collect(Collectors.joining(","))); } - if (executorConfig.isValidConfig()) { + if (!executorConfig.isPlan()) { Configuration configuration = Configuration.fromMap(executorConfig.getConfig()); - if (configuration.contains(RestOptions.PORT)) { - this.environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); - } else { - this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration); + if (!configuration.contains(RestOptions.PORT)) { + configuration.set(RestOptions.PORT, executorConfig.getPort()); } + this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration); } else { this.environment = StreamExecutionEnvironment.createLocalEnvironment(); } diff --git a/dinky-core/src/main/java/org/dinky/executor/LocalStreamExecutor.java b/dinky-core/src/main/java/org/dinky/executor/LocalStreamExecutor.java index b7a8d60f2d..c362eba6ab 100644 --- a/dinky-core/src/main/java/org/dinky/executor/LocalStreamExecutor.java +++ b/dinky-core/src/main/java/org/dinky/executor/LocalStreamExecutor.java @@ -47,13 +47,12 @@ public LocalStreamExecutor(ExecutorConfig executorConfig) { .map(FileUtil::getAbsolutePath) .collect(Collectors.joining(","))); } - if (executorConfig.isValidConfig()) { + if (!executorConfig.isPlan()) { Configuration configuration = Configuration.fromMap(executorConfig.getConfig()); - if (configuration.contains(RestOptions.PORT)) { - this.environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); - } else { - this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration); + if (!configuration.contains(RestOptions.PORT)) { + configuration.set(RestOptions.PORT, executorConfig.getPort()); } + this.environment = StreamExecutionEnvironment.createLocalEnvironment(configuration); } else { this.environment = StreamExecutionEnvironment.createLocalEnvironment(); } diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 3ec80afbb3..adf5d8704b 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -177,6 +177,7 @@ public void init() { useRestAPI = SystemConfiguration.getInstances().isUseRestAPI(); sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator(); executorConfig = config.getExecutorSetting(); + executorConfig.setPlan(isPlanMode); executor = ExecutorFactory.buildExecutor(executorConfig); ExecutorContext.setExecutor(executor); } diff --git a/script/bin/auto.sh b/script/bin/auto.sh index 5d4cfc10ca..630bbd0bc9 100644 --- a/script/bin/auto.sh +++ b/script/bin/auto.sh @@ -5,7 +5,7 @@ FLINK_VERSION=${2:-1.14} JAR_NAME="dinky-admin" # Use FLINK_HOME: -CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/*" +CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/dinky/*:./plugins/flink${FLINK_VERSION}/*" PID_FILE="dinky.pid"