[fix][service] Create the whitelist file from the dfs.hosts value configured on the web UI (datavane#622)
chenghongchen committed Oct 22, 2024
1 parent df915d9 commit 42894cf
Showing 1 changed file with 22 additions and 19 deletions.
@@ -143,9 +143,7 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceWebuisService.class);
ClusterServiceInstanceRoleGroupService roleGroupService =
SpringTool.getApplicationContext().getBean(ClusterServiceInstanceRoleGroupService.class);

ClusterInfoEntity clusterInfo = clusterInfoService.getById(serviceRoleInfo.getClusterId());

ClusterServiceInstanceEntity clusterServiceInstance =
serviceInstanceService.getServiceInstanceByClusterIdAndServiceName(serviceRoleInfo.getClusterId(),
serviceRoleInfo.getParentName());
@@ -177,7 +175,6 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
}
Integer roleGroupId = (Integer) CacheUtils.get("UseRoleGroup_" + clusterServiceInstance.getId());
ClusterServiceInstanceRoleGroup roleGroup = roleGroupService.getById(roleGroupId);

// save role instance
ClusterServiceRoleInstanceEntity roleInstanceEntity = serviceRoleInstanceService
.getOneServiceRole(serviceRoleInfo.getName(), serviceRoleInfo.getHostname(), clusterInfo.getId());
@@ -222,20 +219,15 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
webuisService.save(webuis);
globalVariables.remove("${host}");
}

}
}

}

public static void saveHostInstallInfo(StartWorkerMessage message, String clusterCode,
ClusterHostService clusterHostService) {
ClusterInfoService clusterInfoService = SpringTool.getApplicationContext().getBean(ClusterInfoService.class);
ClusterHostDO clusterHostDO = new ClusterHostDO();
BeanUtil.copyProperties(message, clusterHostDO);

ClusterInfoEntity cluster = clusterInfoService.getClusterByClusterCode(clusterCode);

clusterHostDO.setClusterId(cluster.getId());
clusterHostDO.setCheckTime(new Date());
clusterHostDO.setRack("/default-rack");
@@ -246,7 +238,6 @@ public static void saveHostInstallInfo(StartWorkerMessage message, String cluste
clusterHostDO.setManaged(MANAGED.YES);
clusterHostService.save(clusterHostDO);
}

public static void updateCommandStateToFailed(List<String> commandIds) {
for (String commandId : commandIds) {
logger.info("command id is {}", commandId);
@@ -283,12 +274,10 @@ public static void updateCommandStateToFailed(List<String> commandIds) {
}
}
}

public static void tellCommandActorResult(String serviceName, ExecuteServiceRoleCommand executeServiceRoleCommand,
ServiceExecuteState state) {
ActorRef serviceExecuteResultActor = ActorUtils.getLocalActor(ServiceExecuteResultActor.class,
ActorUtils.getActorRefName(ServiceExecuteResultActor.class));

ServiceExecuteResultMessage serviceExecuteResultMessage = new ServiceExecuteResultMessage();
serviceExecuteResultMessage.setServiceExecuteState(state);
serviceExecuteResultMessage.setDag(executeServiceRoleCommand.getDag());
@@ -302,15 +291,12 @@ public static void tellCommandActorResult(String serviceName, ExecuteServiceRole
serviceExecuteResultMessage.setErrorTaskList(executeServiceRoleCommand.getErrorTaskList());
serviceExecuteResultMessage.setReadyToSubmitTaskList(executeServiceRoleCommand.getReadyToSubmitTaskList());
serviceExecuteResultMessage.setCompleteTaskList(executeServiceRoleCommand.getCompleteTaskList());

serviceExecuteResultActor.tell(serviceExecuteResultMessage, ActorRef.noSender());
}

public static ClusterServiceCommandHostCommandEntity handleCommandResult(String hostCommandId, Boolean execResult,
String execOut) {
ClusterServiceCommandHostCommandService service =
SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class);

ClusterServiceCommandHostCommandEntity hostCommand = service.getByHostCommandId(hostCommandId);
hostCommand.setCommandProgress(100);
if (execResult) {
@@ -335,14 +321,12 @@ public static ClusterServiceCommandHostCommandEntity handleCommandResult(String
} else {
message.setServiceRoleType(ServiceRoleType.WORKER);
}

ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor");
ActorUtils.actorSystem.scheduler().scheduleOnce(FiniteDuration.apply(
1L, TimeUnit.SECONDS),
commandActor, message,
ActorUtils.actorSystem.dispatcher(),
ActorRef.noSender());

return hostCommand;
}

@@ -469,17 +453,36 @@ public static void hdfsEcMethond(Integer serviceInstanceId, ClusterServiceRoleIn
.eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstanceId)
.eq(ClusterServiceRoleInstanceEntity::getServiceRoleName, roleName)
.list();

// Update the whitelist on the NameNode nodes
ClusterVariableService variableService =
SpringTool.getApplicationContext().getBean(ClusterVariableService.class);
// Update the whitelist or blacklist on the NameNode nodes, i.e. the configuration files referenced by dfs.hosts and dfs.hosts.exclude respectively
for (ClusterServiceRoleInstanceEntity namenode : namenodes) {
ActorSelection actorSelection = ActorUtils.actorSystem.actorSelection(
"akka.tcp://datasophon@" + namenode.getHostname() + ":2552/user/worker/fileOperateActor");
ActorSelection execCmdActor = ActorUtils.actorSystem.actorSelection(
"akka.tcp://datasophon@" + namenode.getHostname() + ":2552/user/worker/executeCmdActor");
Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
// The DataNode hostnames make up the file content
FileOperateCommand fileOperateCommand = new FileOperateCommand();
fileOperateCommand.setLines(list);
fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
// fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
if ("whitelist".equals(type)) {
// Prefer the whitelist file path configured through the web UI variable rather than the hard-coded path
ClusterVariable clusterVariable = variableService.getVariableByVariableName("${dfs.hosts}", namenode.getClusterId());
if (!Objects.isNull(clusterVariable) && !StringUtils.isBlank(clusterVariable.getVariableValue())) {
logger.info("dfs.hosts value is {}", clusterVariable.getVariableValue());
fileOperateCommand.setPath(clusterVariable.getVariableValue());
} else {
fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
}
} else {
ClusterVariable clusterVariable = variableService.getVariableByVariableName("${dfs.hosts.exclude}", namenode.getClusterId());
if (!Objects.isNull(clusterVariable) && !StringUtils.isBlank(clusterVariable.getVariableValue())) {
fileOperateCommand.setPath(clusterVariable.getVariableValue());
} else {
fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
}
}
Future<Object> future = Patterns.ask(actorSelection, fileOperateCommand, timeout);
ExecResult fileOperateResult = (ExecResult) Await.result(future, timeout.duration());
if (Objects.nonNull(fileOperateResult) && fileOperateResult.getExecResult()) {
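For context, the core of this change is a fallback lookup: the host-file path pushed to each NameNode now comes from the cluster variable (${dfs.hosts} for the whitelist, ${dfs.hosts.exclude} for the blacklist) when one is configured and non-blank, and only falls back to the hard-coded path under the install directory otherwise. Below is a minimal, self-contained sketch of that fallback logic; the HostFilePathResolver class, the plain Map of variables, and the /opt/datasophon default are illustrative assumptions standing in for the project's ClusterVariableService and Constants.INSTALL_PATH.

import java.util.Map;

public class HostFilePathResolver {

    // Assumed default install path for illustration; the project uses Constants.INSTALL_PATH.
    private static final String INSTALL_PATH = "/opt/datasophon";

    /**
     * Resolve the host-file path for a NameNode.
     *
     * @param type      "whitelist" or "blacklist"
     * @param variables configured cluster variables, e.g. {"${dfs.hosts}": "/data/hadoop/etc/dfs.hosts"}
     * @return the configured path if present and non-blank, otherwise the default path under the install dir
     */
    static String resolvePath(String type, Map<String, String> variables) {
        String variableName = "whitelist".equals(type) ? "${dfs.hosts}" : "${dfs.hosts.exclude}";
        String configured = variables.get(variableName);
        if (configured != null && !configured.trim().isEmpty()) {
            return configured;
        }
        return INSTALL_PATH + "/hadoop/etc/hadoop/" + type;
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of("${dfs.hosts}", "/data/hadoop/etc/dfs.hosts");
        System.out.println(resolvePath("whitelist", vars)); // configured -> /data/hadoop/etc/dfs.hosts
        System.out.println(resolvePath("blacklist", vars)); // not configured -> /opt/datasophon/hadoop/etc/hadoop/blacklist
    }
}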