diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_backend_scale.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_backend_scale.py index 41e8879ef1..cc08612380 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_backend_scale.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_backend_scale.py @@ -10,6 +10,7 @@ """ import logging.config from collections import defaultdict +from copy import deepcopy from dataclasses import asdict from typing import Dict, Optional @@ -98,6 +99,9 @@ def __pre_check( else: if len(master_ips) != group_num: raise Exception("new machine num != group_num.") + # shard_num 必须大于 group_num + if new_shard_num < group_num: + raise Exception("shard_num:{} < group_num:{}".format(new_shard_num, group_num)) if old_shard_num != new_shard_num: raise Exception("old_shard_num {} != new_shard_num {}.".format(old_shard_num, new_shard_num)) # 2024-05 要求可以不用整除,允许机器上的实例有多有少 @@ -401,6 +405,42 @@ def get_redis_install_sub_pipelines( redis_install_sub_pipelines.append(RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)) return redis_install_sub_pipelines + # > 4.0 版本需要重做一下slave,否则可能会丢数据 + # rediscluster架构需要特殊考虑 + def redis_local_redo_dr(self, act_kwargs, sync_relations) -> list: + sub_pipelines, resync_args = [], deepcopy(act_kwargs) + resync_args.cluster = { + "bk_biz_id": int(act_kwargs.cluster["bk_biz_id"]), + "cluster_id": int(act_kwargs.cluster["cluster_id"]), + "cluster_type": act_kwargs.cluster["cluster_type"], + "immute_domain": act_kwargs.cluster["immute_domain"], + "bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]), + "instances": [], + } + for sync_link in sync_relations: + one_resync_args = deepcopy(resync_args) + new_master, new_slave = sync_link["sync_dst1"], sync_link["sync_dst2"] + for instances in sync_link["ins_link"]: + master_port, slave_port = instances["sync_dst1"], instances["sync_dst2"] + one_resync_args.cluster["instances"].append( + { + "master_ip": new_master, + "master_port": int(master_port), + "slave_ip": new_slave, + "slave_port": int(slave_port), + } + ) + one_resync_args.exec_ip = new_slave + one_resync_args.get_redis_payload_func = RedisActPayload.redis_local_redo_dr.__name__ + sub_pipelines.append( + { + "act_name": _("{}-本地重建Slave").format(new_slave), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(one_resync_args), + } + ) + return sub_pipelines + def redis_backend_scale_flow(self): """ redis 扩缩容流程: @@ -549,6 +589,18 @@ def redis_backend_scale_flow(self): kwargs={**asdict(act_kwargs), **asdict(dns_kwargs)}, ) + # > 4.0 版本需要重做一下slave,否则可能会丢数据 + big_version = int(str.split(act_kwargs.cluster["db_version"], "-")[1]) + if ( + act_kwargs.cluster["cluster_type"] + in [ + ClusterType.TendisRedisInstance.value, + ClusterType.TendisTwemproxyRedisInstance.value, + ] + and big_version >= 4 + ): + sub_pipeline.add_parallel_acts(acts_list=self.redis_local_redo_dr(act_kwargs, sync_relations)) + sub_pipeline.add_act(act_name=_("Redis-人工确认"), act_component_code=PauseComponent.code, kwargs={}) # 删除老实例的nodes域名