Skip to content

Commit

Permalink
fix(redis): 高版本redis链式slave可能存在数据丢失问题 TencentBlueKing#8177
Browse files Browse the repository at this point in the history
  • Loading branch information
OMG-By committed Nov 29, 2024
1 parent f731f2e commit e9e27ae
Showing 1 changed file with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import logging.config
from collections import defaultdict
from copy import deepcopy
from dataclasses import asdict
from typing import Dict, Optional

Expand Down Expand Up @@ -98,6 +99,9 @@ def __pre_check(
else:
if len(master_ips) != group_num:
raise Exception("new machine num != group_num.")
# shard_num 必须大于 group_num
if new_shard_num < group_num:
raise Exception("shard_num:{} < group_num:{}".format(new_shard_num, group_num))
if old_shard_num != new_shard_num:
raise Exception("old_shard_num {} != new_shard_num {}.".format(old_shard_num, new_shard_num))
# 2024-05 要求可以不用整除,允许机器上的实例有多有少
Expand Down Expand Up @@ -401,6 +405,42 @@ def get_redis_install_sub_pipelines(
redis_install_sub_pipelines.append(RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params))
return redis_install_sub_pipelines

# > 4.0 版本需要重做一下slave,否则可能会丢数据
# rediscluster架构需要特殊考虑
def redis_local_redo_dr(self, act_kwargs, sync_relations) -> list:
sub_pipelines, resync_args = [], deepcopy(act_kwargs)
resync_args.cluster = {
"bk_biz_id": int(act_kwargs.cluster["bk_biz_id"]),
"cluster_id": int(act_kwargs.cluster["cluster_id"]),
"cluster_type": act_kwargs.cluster["cluster_type"],
"immute_domain": act_kwargs.cluster["immute_domain"],
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"instances": [],
}
for sync_link in sync_relations:
one_resync_args = deepcopy(resync_args)
new_master, new_slave = sync_link["sync_dst1"], sync_link["sync_dst2"]
for instances in sync_link["ins_link"]:
master_port, slave_port = instances["sync_dst1"], instances["sync_dst2"]
one_resync_args.cluster["instances"].append(
{
"master_ip": new_master,
"master_port": int(master_port),
"slave_ip": new_slave,
"slave_port": int(slave_port),
}
)
one_resync_args.exec_ip = new_slave
one_resync_args.get_redis_payload_func = RedisActPayload.redis_local_redo_dr.__name__
sub_pipelines.append(
{
"act_name": _("{}-本地重建Slave").format(new_slave),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(one_resync_args),
}
)
return sub_pipelines

def redis_backend_scale_flow(self):
"""
redis 扩缩容流程:
Expand Down Expand Up @@ -549,6 +589,18 @@ def redis_backend_scale_flow(self):
kwargs={**asdict(act_kwargs), **asdict(dns_kwargs)},
)

# > 4.0 版本需要重做一下slave,否则可能会丢数据
big_version = int(str.split(act_kwargs.cluster["db_version"], "-")[1])
if (
act_kwargs.cluster["cluster_type"]
in [
ClusterType.TendisRedisInstance.value,
ClusterType.TendisTwemproxyRedisInstance.value,
]
and big_version >= 4
):
sub_pipeline.add_parallel_acts(acts_list=self.redis_local_redo_dr(act_kwargs, sync_relations))

sub_pipeline.add_act(act_name=_("Redis-人工确认"), act_component_code=PauseComponent.code, kwargs={})

# 删除老实例的nodes域名
Expand Down

0 comments on commit e9e27ae

Please sign in to comment.