Skip to content

Commit

Permalink
fix(backend): 单据标准化协议修改 #7747
Browse files Browse the repository at this point in the history
  • Loading branch information
iSecloud committed Nov 22, 2024
1 parent 2acca7e commit d910c3f
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 265 deletions.
182 changes: 7 additions & 175 deletions dbm-ui/backend/db_dirty/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,18 @@
"""
import itertools
import logging
from collections import defaultdict
from typing import Any, Dict, List
from typing import List

from django.utils.translation import ugettext as _

from backend import env
from backend.components import CCApi
from backend.configuration.constants import SystemSettingsEnum
from backend.configuration.models import SystemSettings
from backend.db_dirty.constants import MachineEventType, PoolType
from backend.db_dirty.exceptions import PoolTransferException
from backend.db_dirty.models import DirtyMachine, MachineEvent
from backend.db_meta.models import AppCache
from backend.db_services.ipchooser.constants import IDLE_HOST_MODULE
from backend.db_services.ipchooser.handlers.topo_handler import TopoHandler
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.flow.consts import FAILED_STATES
from backend.flow.utils.cc_manage import CcManage
from backend.ticket.builders import BuilderFactory
from backend.ticket.builders.common.base import fetch_apply_hosts
from backend.ticket.models import Flow, Ticket
from backend.utils.batch_request import request_multi_thread

logger = logging.getLogger("root")

Expand Down Expand Up @@ -67,172 +57,14 @@ def transfer_hosts_to_pool(cls, operator: str, bk_host_ids: List[int], source: P
raise PoolTransferException(_("{}--->{}转移不合法").format(source, target))

@classmethod
def query_dirty_machine_records(cls, bk_host_ids: List[int]):
"""
查询污点池主机信息 TODO: 污点池废弃,代码将被移除
@param bk_host_ids: 主机列表
"""

def get_module_data(data):
params, res = data
params = params["params"]
return [{"bk_biz_id": params["bk_biz_id"], **d} for d in res]

if not bk_host_ids:
return []

# 如果传入的列表已经是DirtyMachine,则直接用
if not isinstance(bk_host_ids[0], DirtyMachine):
dirty_machines = DirtyMachine.objects.filter(bk_host_id__in=bk_host_ids)
else:
dirty_machines = bk_host_ids
bk_host_ids = [dirty.bk_host_id for dirty in dirty_machines]

# 缓存云区域和业务信息
bk_biz_ids = [dirty_machine.bk_biz_id for dirty_machine in dirty_machines]
for_biz_infos = AppCache.batch_get_app_attr(bk_biz_ids=bk_biz_ids, attr_name="bk_biz_name")
cloud_info = ResourceQueryHelper.search_cc_cloud(get_cache=True)

# 查询污点主机当前所处的模块
host_topo_infos = CCApi.find_host_biz_relations(params={"bk_host_id": bk_host_ids})
host__topo_info_map: Dict[int, List] = defaultdict(list)
biz__modules_map: Dict[int, List] = defaultdict(list)
for topo in host_topo_infos:
host__topo_info_map[topo["bk_host_id"]].append(topo)
biz__modules_map[topo["bk_biz_id"]].append(topo["bk_module_id"])
# 批量获取业务下模块信息
module_infos = request_multi_thread(
func=CCApi.find_module_batch,
params_list=[
{
"params": {"bk_biz_id": biz, "bk_ids": modules, "fields": ["bk_module_id", "bk_module_name"]},
"use_admin": True,
}
for biz, modules in biz__modules_map.items()
],
get_data=get_module_data,
in_order=True,
)
module_infos = list(itertools.chain(*module_infos))
biz__module__module_name: Dict[int, Dict[int, str]] = defaultdict(dict)
for info in module_infos:
biz__module__module_name[info["bk_biz_id"]][info["bk_module_id"]] = info["bk_module_name"]

# 获取污点池模块
system_manage_topo = SystemSettings.get_setting_value(key=SystemSettingsEnum.MANAGE_TOPO.value)
dirty_module = system_manage_topo["dirty_module_id"]

# 获取污点池列表信息
dirty_machine_list: List[Dict] = []
for dirty in dirty_machines:
# 填充污点池主机基础信息
dirty_machine_info = {
"ip": dirty.ip,
"bk_host_id": dirty.bk_host_id,
"bk_cloud_name": cloud_info[str(dirty.bk_cloud_id)]["bk_cloud_name"],
"bk_cloud_id": dirty.bk_cloud_id,
"bk_biz_name": for_biz_infos[int(dirty.bk_biz_id)],
"bk_biz_id": dirty.bk_biz_id,
"ticket_type": dirty.ticket.ticket_type,
"ticket_id": dirty.ticket.id,
"ticket_type_display": dirty.ticket.get_ticket_type_display(),
"task_id": dirty.flow.flow_obj_id,
"operator": dirty.ticket.creator,
"is_dirty": True,
}

# 如果主机已经不存在于cc,则仅能删除记录
if dirty.bk_host_id not in host__topo_info_map:
dirty_machine_info.update(is_dirty=False)
dirty_machine_list.append(dirty_machine_info)
continue

# 补充主机所在的模块信息
host_in_module = [
{
"bk_module_id": h["bk_module_id"],
"bk_module_name": biz__module__module_name[h["bk_biz_id"]].get(h["bk_module_id"], ""),
}
for h in host__topo_info_map[dirty.bk_host_id]
]
dirty_machine_info.update(bk_module_infos=host_in_module)

# 如果主机 不处于/不仅仅处于【污点池】中,则不允许移入待回收
host = host__topo_info_map[dirty.bk_host_id][0]
if len(host__topo_info_map[dirty.bk_host_id]) > 1:
dirty_machine_info.update(is_dirty=False)
elif host["bk_biz_id"] != env.DBA_APP_BK_BIZ_ID or host["bk_module_id"] != dirty_module:
dirty_machine_info.update(is_dirty=False)

dirty_machine_list.append(dirty_machine_info)

dirty_machine_list.sort(key=lambda x: x["ticket_id"], reverse=True)
return dirty_machine_list

@classmethod
def insert_dirty_machines(cls, bk_biz_id: int, bk_host_ids: List[Dict[str, Any]], ticket: Ticket, flow: Flow):
def handle_dirty_machine(cls, ticket_id, root_id, origin_tree_status, target_tree_status):
"""
将机器导入到污点池中 TODO: 污点池废弃,代码将被移除
@param bk_biz_id: 业务ID
@param bk_host_ids: 主机列表
@param ticket: 关联的单据
@param flow: 关联的flow任务
处理执行失败/重试成功涉及的污点池机器
@param ticket_id: 单据ID
@param root_id: 流程ID
@param origin_tree_status: 流程源状态
@param target_tree_status: 流程目标状态
"""
# 查询污点机器信息
host_property_filter = {
"condition": "AND",
"rules": [{"field": "bk_host_id", "operator": "in", "value": bk_host_ids}],
}
dirty_host_infos = CCApi.list_hosts_without_biz(
{
# 默认一次性录入的机器不会超过500
"page": {"start": 0, "limit": 500, "sort": "bk_host_id"},
"host_property_filter": host_property_filter,
"fields": ["bk_host_id", "bk_cloud_id", "bk_host_innerip"],
},
use_admin=True,
)["info"]

# 获取业务空闲机模块,资源池模块和污点池模块
idle_module = CcManage(bk_biz_id, "").get_biz_internal_module(bk_biz_id)[IDLE_HOST_MODULE]["bk_module_id"]
system_manage_topo = SystemSettings.get_setting_value(key=SystemSettingsEnum.MANAGE_TOPO.value)
resource_module, dirty_module = system_manage_topo["resource_module_id"], system_manage_topo["dirty_module_id"]
# 获取主机的拓扑信息(注:这里不能带上业务信息,因为主机可能转移业务)
host_topo_infos = TopoHandler.query_host_set_module(bk_host_ids=bk_host_ids)["hosts_topo_info"]
# 将污点机器信息转移至DBA污点池模(如果污点机器不在空闲机/资源池,则放弃转移,认为已到正确拓扑)
transfer_host_ids = [
info["bk_host_id"]
for info in host_topo_infos
if not set(info["bk_module_ids"]) - {resource_module, idle_module}
]
if transfer_host_ids:
update_host_properties = {"dbm_meta": [], "need_monitor": False, "update_operator": False}
CcManage(bk_biz_id=env.DBA_APP_BK_BIZ_ID, cluster_type="").transfer_host_module(
transfer_host_ids, target_module_ids=[dirty_module], update_host_properties=update_host_properties
)

# 录入污点池表中
exist_dirty_machine_ids = list(
DirtyMachine.objects.filter(bk_host_id__in=bk_host_ids).values_list("bk_host_id", flat=True)
)
DirtyMachine.objects.bulk_create(
[
DirtyMachine(
ticket=ticket,
flow=flow,
ip=host["bk_host_innerip"],
bk_biz_id=bk_biz_id,
bk_host_id=host["bk_host_id"],
bk_cloud_id=host["bk_cloud_id"],
)
for host in dirty_host_infos
if host["bk_host_id"] not in exist_dirty_machine_ids
]
)

@classmethod
def handle_dirty_machine(cls, ticket_id, root_id, origin_tree_status, target_tree_status):
"""处理执行失败/重试成功涉及的污点池机器"""
if (origin_tree_status not in FAILED_STATES) and (target_tree_status not in FAILED_STATES):
return

Expand Down
21 changes: 12 additions & 9 deletions dbm-ui/backend/db_dirty/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ def hosts_pool_transfer(cls, bk_biz_id, hosts, pool, operator="", ticket=None):
host_ids = [host["bk_host_id"] for host in hosts]

# 主机转入污点/故障池,说明第一次被纳管到池
if pool in [PoolType.Fault, PoolType.Dirty]:
hosts_pool = [
cls(bk_biz_id=bk_biz_id, pool=pool, ticket=ticket, creator=operator, updater=operator, **host)
for host in hosts
]
cls.objects.bulk_create(hosts_pool)
# 待回收只会从故障池转移
elif pool == PoolType.Recycle:
cls.objects.filter(bk_host_id__in=host_ids).update(pool=pool, ticket=ticket)
# 待回收会从故障池、资源池转移
# 因此这里判断主机不存在就创建,否则更新
if pool in [PoolType.Fault, PoolType.Dirty, PoolType.Recycle]:
handle_hosts = cls.objects.filter(bk_host_id__in=host_ids)
if handle_hosts.count() == len(host_ids):
handle_hosts.update(pool=pool, ticket=ticket)
else:
handle_hosts = [
cls(bk_biz_id=bk_biz_id, pool=pool, ticket=ticket, creator=operator, updater=operator, **host)
for host in hosts
]
cls.objects.bulk_create(handle_hosts)
# 回收机器只能从待回收转移,删除池纳管记录
# 重导入回资源池,删除池纳管记录
elif pool in [PoolType.Recycled, PoolType.Resource]:
Expand Down
6 changes: 4 additions & 2 deletions dbm-ui/backend/db_meta/api/machine/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,16 @@ def clear_info_for_machine(machines: Optional[List]):

# 清理proxy相关信息
for p in proxys:
p.tendbclusterspiderext.delete()
if hasattr(p, "tendbclusterspiderext"):
p.tendbclusterspiderext.delete()
p.delete(keep_parents=True)

# 清理storage相关信息
for s in storages:
for info in StorageInstanceTuple.objects.filter(Q(ejector=s) | Q(receiver=s)):
# 先删除额外关联信息,否则会报ProtectedError 异常
info.tendbclusterstorageset.delete()
if hasattr(info, "tendbclusterstorageset"):
info.tendbclusterstorageset.delete()
info.delete()
s.delete(keep_parents=True)
machine.delete(keep_parents=True)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class Migration(migrations.Migration):

dependencies = [
("db_meta", "0043_auto_20241014_1042"),
("db_meta", "0044_deviceclass"),
("db_meta", "0043_auto_20241015_2128"),
]

Expand Down
10 changes: 6 additions & 4 deletions dbm-ui/backend/db_meta/models/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,16 +402,18 @@ def enable_dbha(self):
self.refresh_from_db()

@classmethod
def get_cluster_related_machines(cls, cluster_ids: List[int]) -> List:
def get_cluster_related_machines(cls, cluster_ids: List[int], role: str = None) -> List:
"""
通过集群id查询集群关联的所有主机信息,即实例所在的主机
"""
from backend.db_meta.models import Machine

clusters = Cluster.objects.filter(id__in=cluster_ids)
host_ids = set(clusters.values_list("storageinstance__machine__bk_host_id", flat=True)) | set(
clusters.values_list("proxyinstance__machine__bk_host_id", flat=True)
)
host_ids = set()
if not role or role == "backend":
host_ids |= set(clusters.values_list("storageinstance__machine__bk_host_id", flat=True))
if not role or role == "proxy":
host_ids |= set(clusters.values_list("proxyinstance__machine__bk_host_id", flat=True))
machines = Machine.objects.filter(bk_host_id__in=host_ids)
return machines

Expand Down
6 changes: 5 additions & 1 deletion dbm-ui/backend/db_package/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
SyncMediumSerializer,
UploadPackageSerializer,
)
from backend.exceptions import ApiRequestError
from backend.flow.consts import MediumEnum
from backend.iam_app.dataclass import ResourceEnum
from backend.iam_app.dataclass.actions import ActionEnum
Expand Down Expand Up @@ -168,7 +169,10 @@ def partial_update(self, request, *args, **kwargs):
)
def destroy(self, request, *args, **kwargs):
# 删除制品库文件
StorageHandler().delete_file(self.get_object().path)
try:
StorageHandler().delete_file(self.get_object().path)
except ApiRequestError as e:
logger.error(_("文件删除异常,错误信息: {}").format(e))
# 删除本地记录
super().destroy(request, *args, **kwargs)
return Response()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
logger.error("cluster {} backup info not exists".format(cluster_model.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_id)))
cluster["backupinfo"] = backup_info
cluster["new_master_ip"] = self.data["new_master_ ip"]
cluster["new_master_ip"] = self.data["new_master_ip"]
cluster["new_slave_ip"] = self.data["new_slave_ip"]
cluster["new_master_port"] = master_model.port
cluster["new_slave_port"] = master_model.port
Expand Down
13 changes: 10 additions & 3 deletions dbm-ui/backend/ticket/builders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ class RecycleParamBuilder(FlowParamBuilder):

controller_map = {
DBType.MySQL.value: "MySQLController.mysql_machine_clear_scene",
DBType.TenDBCluster.value: "MySQLController.mysql_machine_clear_scene",
DBType.TenDBCluster.value: "SpiderController.tendbcluster_machine_clear_scene",
# TODO redis清理流程暂时没有
DBType.Redis.value: "",
}

def __init__(self, ticket: Ticket):
Expand All @@ -264,10 +266,15 @@ def __init__(self, ticket: Ticket):
assert self.ip_dest is not None

def build_controller_info(self) -> dict:
# TODO: 暂时兼容没有清理流程的组件,默认用mysql
db_type = self.ticket_data["db_type"]
class_name, flow_name = self.controller_map[db_type].split(".")
module = importlib.import_module(f"backend.flow.engine.controller.{db_type}")
clear_db_type = db_type if self.controller_map.get(db_type) else DBType.MySQL.value

class_name, flow_name = self.controller_map[clear_db_type].split(".")
module_file_name = "spider" if clear_db_type == DBType.TenDBCluster else clear_db_type
module = importlib.import_module(f"backend.flow.engine.controller.{module_file_name}")
self.controller = getattr(getattr(module, class_name), flow_name)

return super().build_controller_info()

def format_ticket_data(self):
Expand Down
4 changes: 2 additions & 2 deletions dbm-ui/backend/ticket/builders/common/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,10 @@ def patch_recycle_host_details(self):
return
self.ticket.details["recycle_hosts"] = ResourceHandler.standardized_resource_host(recycle_hosts, bk_biz_id)

def patch_recycle_cluster_details(self):
def patch_recycle_cluster_details(self, role=None):
"""补充集群下架后回收主机信息,在下架类单据一定调用此方法"""
bk_biz_id = self.ticket.bk_biz_id
recycle_hosts = Cluster.get_cluster_related_machines(fetch_cluster_ids(self.ticket.details))
recycle_hosts = Cluster.get_cluster_related_machines(fetch_cluster_ids(self.ticket.details), role)
recycle_hosts = [{"bk_host_id": host.bk_host_id} for host in recycle_hosts]
self.ticket.details["recycle_hosts"] = ResourceHandler.standardized_resource_host(recycle_hosts, bk_biz_id)

Expand Down
15 changes: 9 additions & 6 deletions dbm-ui/backend/ticket/builders/mysql/mysql_fixpoint_rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class FixPointRollbackSerializer(serializers.Serializer):
@classmethod
def validate_rollback_info(cls, rollback_cluster_type, info, now):
# 校验回档集群类型参数
if rollback_cluster_type == RollbackBuildClusterType.BUILD_INTO_NEW_CLUSTER and not info.get("rollback_host"):
if rollback_cluster_type == RollbackBuildClusterType.BUILD_INTO_NEW_CLUSTER and not (
info.get("rollback_host") or info.get("resource_spec")
):
raise serializers.ValidationError(_("请提供部署新集群的机器信息"))

if rollback_cluster_type != RollbackBuildClusterType.BUILD_INTO_NEW_CLUSTER and not info.get(
Expand Down Expand Up @@ -102,8 +104,9 @@ def format_ticket_data(self):
info["rollback_type"] = f"{info['backup_source'].upper()}_AND_{op_type}"
# 格式化定点回档部署的信息
if rollback_cluster_type == RollbackBuildClusterType.BUILD_INTO_NEW_CLUSTER:
info["rollback_ip"] = info["rollback_host"]["ip"]
info["bk_rollback"] = info.pop("rollback_host")
if self.ticket_data["ip_source"] == IpSource.MANUAL_INPUT:
info["rollback_ip"] = info["rollback_host"]["ip"]
info["bk_rollback"] = info.pop("rollback_host")
else:
info["rollback_cluster_id"] = info.pop("target_cluster_id")

Expand All @@ -122,12 +125,12 @@ def format(self):
def post_callback(self):
next_flow = self.ticket.next_flow()
for info in next_flow.details["ticket_data"]["infos"]:
info["rollback_ip"] = info["rollback_host"]["ip"]
info["bk_rollback"] = info.pop("rollback_host")
info["rollback_ip"] = info["rollback_host"][0]["ip"]
info["bk_rollback"] = info.pop("rollback_host")[0]
next_flow.save(update_fields=["details"])


@builders.BuilderFactory.register(TicketType.MYSQL_ROLLBACK_CLUSTER)
@builders.BuilderFactory.register(TicketType.MYSQL_ROLLBACK_CLUSTER, is_apply=True)
class MysqlFixPointRollbackFlowBuilder(BaseMySQLTicketFlowBuilder):
serializer = MySQLFixPointRollbackDetailSerializer
inner_flow_builder = MySQLFixPointRollbackFlowParamBuilder
Expand Down
Loading

0 comments on commit d910c3f

Please sign in to comment.