Skip to content

Commit

Permalink
[feature](cloud) Support rename compute group sql
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng committed Dec 31, 2024
1 parent b959c66 commit 904a306
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 9 deletions.
9 changes: 7 additions & 2 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2203,6 +2203,11 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
}
} break;
case AlterClusterRequest::RENAME_CLUSTER: {
// SQL mode, cluster cluster name eq empty cluster name, need drop empty cluster first.
// but in http api, cloud control will drop empty cluster
bool drop_empty_cluster =
request->has_drop_empty_cluster() ? request->drop_empty_cluster() : false;

msg = resource_mgr_->update_cluster(
instance_id, cluster,
[&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
Expand All @@ -2212,7 +2217,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
LOG(INFO) << "cluster.cluster.cluster_name(): "
<< cluster.cluster.cluster_name();
for (auto itt : cluster_names) {
LOG(INFO) << "itt : " << itt;
LOG(INFO) << "instance's cluster name : " << itt;
}
if (it != cluster_names.end()) {
code = MetaServiceCode::INVALID_ARGUMENT;
Expand All @@ -2232,7 +2237,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
}
c.set_cluster_name(cluster.cluster.cluster_name());
return msg;
});
}, drop_empty_cluster);
} break;
case AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT: {
msg = resource_mgr_->update_cluster(
Expand Down
28 changes: 27 additions & 1 deletion cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ std::pair<MetaServiceCode, std::string> ResourceManager::drop_cluster(
std::string ResourceManager::update_cluster(
const std::string& instance_id, const ClusterInfo& cluster,
std::function<bool(const ClusterPB&)> filter,
std::function<std::string(ClusterPB&, std::set<std::string>& cluster_names)> action) {
std::function<std::string(ClusterPB&, std::set<std::string>& cluster_names)> action,
bool drop_empty_cluster) {
std::stringstream ss;
std::string msg;

Expand Down Expand Up @@ -521,6 +522,31 @@ std::string ResourceManager::update_cluster(

auto& clusters = const_cast<std::decay_t<decltype(instance.clusters())>&>(instance.clusters());

// check cluster_name is empty cluster, if empty and drop_empty_cluster == true, drop it
if (drop_empty_cluster) {
auto it = cluster_names.find(cluster_name);
if (it != cluster_names.end()) {
// found it, if it's an empty cluster, drop it from instance
int idx = -1;
for (auto& cluster : instance.clusters()) {
idx++;
if (cluster.cluster_name() == cluster_name) {
// Check if cluster is empty (has no nodes)
if (cluster.nodes_size() == 0) {
// Remove empty cluster from instance
auto& clusters = const_cast<std::decay_t<decltype(instance.clusters())>&>(
instance.clusters());
clusters.DeleteSubrange(idx, 1);
// Remove cluster name from set
cluster_names.erase(cluster_name);
LOG(INFO) << "Removed empty cluster, cluster_name=" << cluster_name;
}
break;
}
}
}
}

// do update
ClusterPB original = clusters[idx];
msg = action(clusters[idx], cluster_names);
Expand Down
4 changes: 3 additions & 1 deletion cloud/src/resource-manager/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ class ResourceManager {
*
* @param cluster cluster to update, only cluster name and cluster id are concered
* @param action update operation code snippet
* @param drop_empty_cluster, find cluster.cluster_name is a empty cluster(no node), drop it
* @filter filter condition
* @return empty string for success, otherwise failure reason returned
*/
virtual std::string update_cluster(
const std::string& instance_id, const ClusterInfo& cluster,
std::function<bool(const ClusterPB&)> filter,
std::function<std::string(ClusterPB&, std::set<std::string>& cluster_names)> action);
std::function<std::string(ClusterPB&, std::set<std::string>& cluster_names)> action,
bool drop_empty_cluster = false);

/**
* Get instance from underlying storage with given transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ supportedAlterStatement
dropRollupClause (COMMA dropRollupClause)* #alterTableDropRollup
| ALTER TABLE name=multipartIdentifier
SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterTableProperties
| ALTER SYSTEM RENAME COMPUTE GROUP name=identifier newName=identifier #alterSystemRenameComputeGroup
;

supportedDropStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class CloudClusterChecker extends MasterDaemon {

private CloudSystemInfoService cloudSystemInfoService;

private final Object checkLock = new Object();

boolean isUpdateCloudUniqueId = false;

public CloudClusterChecker(CloudSystemInfoService cloudSystemInfoService) {
Expand Down Expand Up @@ -321,9 +323,11 @@ private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,

@Override
protected void runAfterCatalogReady() {
checkCloudBackends();
updateCloudMetrics();
checkCloudFes();
synchronized (checkLock) {
checkCloudBackends();
updateCloudMetrics();
checkCloudFes();
}
}

private void checkFeNodesMapValid() {
Expand Down Expand Up @@ -545,4 +549,12 @@ private void updateCloudMetrics() {
MetricRepo.updateClusterBackendAliveTotal(entry.getKey(), entry.getValue(), aliveNum);
}
}

public void checkNow() {
if (Env.getCurrentEnv().isMaster()) {
synchronized (checkLock) {
runAfterCatalogReady();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public CloudUpgradeMgr getCloudUpgradeMgr() {
return this.upgradeMgr;
}

public CloudClusterChecker getCloudClusterChecker() {
return this.cloudClusterCheck;
}

public String getCloudInstanceId() {
return cloudInstanceId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ public void dropFrontend(Frontend frontend) throws DdlException {

private String tryCreateComputeGroup(String clusterName, String computeGroupId) throws UserException {
if (Strings.isNullOrEmpty(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())) {
throw new DdlException("unable to create compute group due to empty cluster_id");
throw new DdlException("unable to create compute group due to empty cloud_instance_id");
}

Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
Expand Down Expand Up @@ -1158,4 +1158,44 @@ public String getInstanceId(String cloudUniqueId) throws IOException {
throw new IOException("Failed to get instance info");
}
}

public void renameComputeGroup(String originalName, String newGroupName) throws UserException {
String cloudInstanceId = ((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId();
if (Strings.isNullOrEmpty(cloudInstanceId)) {
throw new DdlException("unable to rename compute group due to empty cloud_instance_id");
}
String originalComputeGroupId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterIdByName(originalName);
if (Strings.isNullOrEmpty(originalComputeGroupId)) {
throw new DdlException("unable to rename compute group, "
+ "cant get compute group id by compute name " + originalName);
}

Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder()
.setClusterId(originalComputeGroupId)
.setClusterName(newGroupName)
.setType(Cloud.ClusterPB.Type.COMPUTE)
.build();

Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder()
.setInstanceId(((CloudEnv) Env.getCurrentEnv()).getCloudInstanceId())
.setOp(Cloud.AlterClusterRequest.Operation.RENAME_CLUSTER)
.setDropEmptyCluster(true)
.setCluster(clusterPB)
.build();


Cloud.AlterClusterResponse response;
try {
response = MetaServiceProxy.getInstance().alterCluster(request);
LOG.info("alter rename compute group, request: {}, response: {}", request, response);
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("alter rename compute group not ok, response: {}", response);
throw new UserException("failed to rename compute group errorCode: " + response.getStatus().getCode()
+ " msg: " + response.getStatus().getMsg());
}
} catch (RpcException e) {
throw new UserException("failed to alter rename compute group", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.doris.nereids.DorisParser.AlterRoleContext;
import org.apache.doris.nereids.DorisParser.AlterSqlBlockRuleContext;
import org.apache.doris.nereids.DorisParser.AlterStorageVaultContext;
import org.apache.doris.nereids.DorisParser.AlterSystemRenameComputeGroupContext;
import org.apache.doris.nereids.DorisParser.AlterTableAddRollupContext;
import org.apache.doris.nereids.DorisParser.AlterTableClauseContext;
import org.apache.doris.nereids.DorisParser.AlterTableContext;
Expand Down Expand Up @@ -487,6 +488,7 @@
import org.apache.doris.nereids.trees.plans.commands.AlterRoleCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterStorageVaultCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterSystemRenameComputeGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterViewCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterWorkloadGroupCommand;
Expand Down Expand Up @@ -1284,6 +1286,11 @@ public LogicalPlan visitAlterStorageVault(AlterStorageVaultContext ctx) {
return new AlterStorageVaultCommand(vaultName, properties);
}

@Override
public LogicalPlan visitAlterSystemRenameComputeGroup(AlterSystemRenameComputeGroupContext ctx) {
return new AlterSystemRenameComputeGroupCommand(ctx.name.getText(), ctx.newName.getText());
}

@Override
public LogicalPlan visitShowConstraint(ShowConstraintContext ctx) {
List<String> parts = visitMultipartIdentifier(ctx.table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,6 @@ public enum PlanType {
SHOW_QUERY_PROFILE_COMMAND,
SWITCH_COMMAND,
HELP_COMMAND,
USE_COMMAND
USE_COMMAND,
ALTER_SYSTEM_RENAME_COMPUTE_GROUP
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Strings;

/**
* Alter System Rename Compute Group
*/
public class AlterSystemRenameComputeGroupCommand extends Command implements ForwardWithSync {
private final String originalName;
private final String newName;

public AlterSystemRenameComputeGroupCommand(String originalName, String newName) {
super(PlanType.ALTER_SYSTEM_RENAME_COMPUTE_GROUP);
this.originalName = originalName;
this.newName = newName;
}

private void validate() throws AnalysisException {
if (Strings.isNullOrEmpty(originalName) || Strings.isNullOrEmpty(newName)) {
throw new AnalysisException("rename group requires non-empty or non-empty name");
}
if (originalName.equals(newName)) {
throw new AnalysisException("rename compute group original name eq new name");
}
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate();
doRun(ctx);
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCommand(this, context);
}

private void doRun(ConnectContext ctx) throws Exception {
try {
// 1. send rename rpc to ms
((CloudSystemInfoService) Env.getCurrentSystemInfo()).renameComputeGroup(this.originalName, this.newName);
// 2. if 1 not throw exception, refresh cloud cluster
// if not do 2, will wait 10s to get new name
((CloudEnv) Env.getCurrentEnv()).getCloudClusterChecker().checkNow();
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
}
}
2 changes: 2 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ message AlterClusterRequest {
optional string cloud_unique_id = 2; // For auth
optional ClusterPB cluster = 3;
optional Operation op = 4;
// for SQL mode rename cluster, rename to cluster name eq instance empty cluster name, need drop empty cluster
optional bool drop_empty_cluster = 5 [default = false];
}

message AlterClusterResponse {
Expand Down

0 comments on commit 904a306

Please sign in to comment.