Skip to content

Commit

Permalink
[Fix] Fixed the issue that the address of the jobmanager was not chan…
Browse files Browse the repository at this point in the history
…ged in modify cluster instance (#3070)

Co-authored-by: gaoyan1998 <[email protected]>
  • Loading branch information
gaoyan1998 and gaoyan1998 authored Jan 26, 2024
1 parent 717214f commit cc60470
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

Expand All @@ -40,6 +41,7 @@
@NoArgsConstructor
@AllArgsConstructor
@ApiModel(value = "ClusterInstanceDTO", description = "API Cluster Instance Data Transfer Object")
@Builder
public class ClusterInstanceDTO {

@ApiModelProperty(value = "id", required = true, dataType = "Integer", example = "id")
Expand Down Expand Up @@ -67,14 +69,6 @@ public class ClusterInstanceDTO {
@ApiModelProperty(value = "hosts", required = true, dataType = "String", example = "test", notes = "cluster hosts")
private String hosts;

@ApiModelProperty(
value = "jobManagerHost",
required = true,
dataType = "String",
example = "test",
notes = "job manager host")
private String jobManagerHost;

@ApiModelProperty(
value = "autoRegisters",
required = true,
Expand Down Expand Up @@ -105,18 +99,4 @@ public ClusterInstance toBean() {
BeanUtil.copyProperties(this, clusterInstance);
return clusterInstance;
}

public static ClusterInstanceDTO autoRegistersClusterDTO(
String hosts, String name, String alias, String type, Integer clusterConfigurationId, Integer taskId) {
ClusterInstanceDTO clusterInstanceDTO = new ClusterInstanceDTO();
clusterInstanceDTO.setName(name);
clusterInstanceDTO.setAlias(alias);
clusterInstanceDTO.setHosts(hosts);
clusterInstanceDTO.setType(type);
clusterInstanceDTO.setClusterConfigurationId(clusterConfigurationId);
clusterInstanceDTO.setTaskId(taskId);
clusterInstanceDTO.setAutoRegisters(true);
clusterInstanceDTO.setEnabled(true);
return clusterInstanceDTO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,4 @@ public class ClusterInstance extends SuperEntity<ClusterInstance> {

@ApiModelProperty(value = "taskId", required = true, dataType = "Integer", example = "test", notes = "task id")
private Integer taskId;

public static ClusterInstance autoRegistersCluster(
String hosts, String name, String alias, String type, Integer clusterConfigurationId, Integer taskId) {
ClusterInstance clusterInstance = new ClusterInstance();
clusterInstance.setName(name);
clusterInstance.setAlias(alias);
clusterInstance.setHosts(hosts);
clusterInstance.setType(type);
clusterInstance.setClusterConfigurationId(clusterConfigurationId);
clusterInstance.setTaskId(taskId);
clusterInstance.setAutoRegisters(true);
clusterInstance.setEnabled(true);
return clusterInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,32 @@ public boolean success() {
ClusterInstance clusterInstance;
final Integer clusterConfigurationId = job.getJobConfig().getClusterConfigurationId();
if (job.isUseGateway()) {
clusterInstance = clusterInstanceService.registersCluster(ClusterInstanceDTO.autoRegistersClusterDTO(
job.getJobManagerAddress(),
job.getJobId(),
job.getJobConfig().getJobName() + "_" + LocalDateTime.now(),
job.getType().getLongValue(),
clusterConfigurationId,
taskId));
clusterInstance = clusterInstanceService.registersCluster(ClusterInstanceDTO.builder()
.hosts(job.getJobManagerAddress())
.name(job.getJobId())
.alias(job.getJobConfig().getJobName() + "_" + LocalDateTime.now())
.type(job.getType().getLongValue())
.clusterConfigurationId(clusterConfigurationId)
.taskId(taskId)
.autoRegisters(true)
.enabled(true)
.build());

if (Asserts.isNotNull(clusterInstance)) {
clusterId = clusterInstance.getId();
}
} else if (GatewayType.LOCAL.equalsValue(job.getJobConfig().getType())
&& Asserts.isNotNullString(job.getJobManagerAddress())
&& Asserts.isNotNullString(job.getJobId())) {
clusterInstance = clusterInstanceService.registersCluster(ClusterInstanceDTO.autoRegistersClusterDTO(
job.getJobManagerAddress(),
job.getJobId(),
job.getJobConfig().getJobName() + "_" + LocalDateTime.now(),
job.getType().getLongValue(),
null,
taskId));
clusterInstance = clusterInstanceService.registersCluster(ClusterInstanceDTO.builder()
.hosts(job.getJobManagerAddress())
.name(job.getJobId())
.alias(job.getJobConfig().getJobName() + "_" + LocalDateTime.now())
.type(job.getType().getLongValue())
.taskId(taskId)
.autoRegisters(true)
.enabled(true)
.build());
if (Asserts.isNotNull(clusterInstance)) {
clusterId = clusterInstance.getId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,15 @@ public ClusterInstance deploySessionCluster(Integer id) {
GatewayResult gatewayResult = JobManager.deploySessionCluster(gatewayConfig);
if (gatewayResult.isSuccess()) {
Asserts.checkNullString(gatewayResult.getWebURL(), "Unable to obtain Web URL.");
return registersCluster(ClusterInstanceDTO.autoRegistersClusterDTO(
gatewayResult.getWebURL().replace("http://", ""),
gatewayResult.getId(),
clusterCfg.getName() + "_" + LocalDateTime.now(),
gatewayConfig.getType().getLongValue(),
id,
null));
return registersCluster(ClusterInstanceDTO.builder()
.hosts(gatewayResult.getWebURL().replace("http://", ""))
.name(gatewayResult.getId())
.alias(clusterCfg.getName() + "_" + LocalDateTime.now())
.type(gatewayConfig.getType().getLongValue())
.clusterConfigurationId(id)
.autoRegisters(true)
.enabled(true)
.build());
}
throw new DinkyException("Deploy session cluster error: " + gatewayResult.getError());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export type OperatorType = {
};
const JobOperator = (props: OperatorType) => {
const { jobDetail, refesh } = props;
const webUri = `/api/flink/${jobDetail?.history?.jobManagerAddress}/#/job/running/${jobDetail?.instance?.jid}/overview`;
const webUri = `/api/flink/${jobDetail?.clusterInstance?.jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`;

const handleJobOperator = (key: string) => {
Modal.confirm({
Expand Down

0 comments on commit cc60470

Please sign in to comment.