Skip to content

Commit

Permalink
feat(backend): 单据状态细化 #6755
Browse files Browse the repository at this point in the history
  • Loading branch information
iSecloud committed Oct 12, 2024
1 parent a3b62cc commit f299073
Show file tree
Hide file tree
Showing 21 changed files with 566 additions and 310 deletions.
4 changes: 2 additions & 2 deletions dbm-ui/backend/db_services/bigdata/resources/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from backend.db_proxy.models import ClusterExtension
from backend.db_services.dbbase.resources import query
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.ticket.constants import TicketFlowStatus
from backend.ticket.constants import TICKET_RUNNING_STATUS
from backend.ticket.models import InstanceOperateRecord
from backend.utils.time import datetime2str

Expand Down Expand Up @@ -65,7 +65,7 @@ def _filter_instance_hook(cls, bk_biz_id, query_params, instances, **kwargs):

# 获取实例的操作与实例记录
records = InstanceOperateRecord.objects.filter(
instance_id__in=instance_ids, ticket__status=TicketFlowStatus.RUNNING
instance_id__in=instance_ids, ticket__status__in=TICKET_RUNNING_STATUS
)
instance_operate_records_map: Dict[int, List] = defaultdict(list)
for record in records:
Expand Down
4 changes: 2 additions & 2 deletions dbm-ui/backend/db_services/mysql/dumper/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from backend.db_meta.enums import InstanceInnerRole
from backend.db_meta.models import Cluster
from backend.db_services.mysql.dumper.models import DumperSubscribeConfig
from backend.ticket.constants import FlowType, TicketFlowStatus, TicketStatus, TicketType
from backend.ticket.constants import TICKET_RUNNING_STATUS, FlowType, TicketFlowStatus, TicketStatus, TicketType
from backend.ticket.models import Flow, Ticket


Expand Down Expand Up @@ -66,7 +66,7 @@ def patch_dumper_list_info(cls, dumper_results: List[Dict], bk_biz_id: int = 0,
dumper_ticket_types.remove(TicketType.TBINLOGDUMPER_INSTALL)
dumper_ticket_types.extend([TicketType.MYSQL_MASTER_SLAVE_SWITCH, TicketType.MYSQL_MASTER_FAIL_OVER])
active_tickets = Ticket.objects.filter(
bk_biz_id=bk_biz_id, status=TicketStatus.RUNNING, ticket_type__in=dumper_ticket_types
bk_biz_id=bk_biz_id, status__in=TICKET_RUNNING_STATUS, ticket_type__in=dumper_ticket_types
)
# 获取每个dumper单据状态与id的映射
dumper_inst_id__ticket: Dict[int, str] = {}
Expand Down
6 changes: 3 additions & 3 deletions dbm-ui/backend/tests/ticket/test_doris_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
SCALEUP_POOL_TICKET_DATA,
)
from backend.tests.ticket.server_base import TestFlowBase
from backend.ticket.constants import TicketFlowStatus, TicketStatus
from backend.ticket.constants import TicketFlowStatus

logger = logging.getLogger("test")
pytestmark = pytest.mark.django_db
client = APIClient()

INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED]
CHANGED_MOCK_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED, TicketFlowStatus.RUNNING]
INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketFlowStatus.SUCCEEDED]
CHANGED_MOCK_STATUS = [TicketFlowStatus.SKIPPED, TicketFlowStatus.SUCCEEDED, TicketFlowStatus.RUNNING]


@pytest.fixture(autouse=True) # autouse=True 会自动应用这个fixture到所有的测试中
Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/tests/ticket/test_mongodb_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
pytestmark = pytest.mark.django_db
client = APIClient()

INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED]
INITIAL_FLOW_FINISHED_STATUS = [TicketFlowStatus.SKIPPED, TicketFlowStatus.SUCCEEDED]
CHANGED_MOCK_STATUS = [TicketFlowStatus.SKIPPED, TicketStatus.SUCCEEDED, TicketFlowStatus.RUNNING]


Expand Down
218 changes: 153 additions & 65 deletions dbm-ui/backend/ticket/constants.py

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions dbm-ui/backend/ticket/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
from django_filters import rest_framework as filters

from backend.db_meta.models import Cluster
from backend.ticket.constants import TODO_RUNNING_STATUS
from backend.ticket.models import ClusterOperateRecord, Ticket


class TicketListFilter(filters.FilterSet):
remark = filters.CharFilter(field_name="remark", lookup_expr="icontains", label=_("备注"))
cluster = filters.CharFilter(field_name="cluster", method="filter_cluster", label=_("集群域名"))
todo = filters.CharFilter(field_name="todo", method="filter_todo", label=_("代办状态"))

class Meta:
model = Ticket
Expand All @@ -35,3 +37,12 @@ def filter_cluster(self, queryset, name, value):
clusters = Cluster.objects.filter(immute_domain__icontains=value).values_list("id", flat=True)
records = ClusterOperateRecord.objects.filter(cluster_id__in=clusters).values_list("id", flat=True)
return queryset.filter(clusteroperaterecord__in=records)

def filter_todo(self, queryset, name, value):
user = self.request.user.username
if value == "running":
return queryset.filter(
todo_of_ticket__operators__contains=user, todo_of_ticket__status__in=TODO_RUNNING_STATUS
)
else:
return queryset.filter(todo_of_ticket__done_by=user)
20 changes: 15 additions & 5 deletions dbm-ui/backend/ticket/flow_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
FLOW_FINISHED_STATUS,
FLOW_NOT_EXECUTE_STATUS,
FLOW_TYPE__EXPIRE_TYPE_CONFIG,
TICKET_EXPIRE_DEFAULT_CONFIG,
FlowContext,
FlowErrCode,
FlowTypeConfig,
TicketFlowStatus,
)
from backend.ticket.models import ClusterOperateRecord, Flow, InstanceOperateRecord, TicketFlowsConfig
from backend.ticket.models import ClusterOperateRecord, Flow, InstanceOperateRecord, TicketFlowsConfig, Todo

logger = logging.getLogger("root")

Expand Down Expand Up @@ -70,7 +71,7 @@ def status(self) -> str:

if not self.flow_obj.flow_obj_id:
# 任务流程未创建时未PENDING状态
return constants.TicketStatus.PENDING
return constants.TicketFlowStatus.PENDING

# 其他情况暂时认为在PENDING状态
return TicketFlowStatus.PENDING
Expand Down Expand Up @@ -144,10 +145,11 @@ def flush_error_status_handler(self):
def flush_revoke_status_handler(self, operator):
"""终止节点,更新相关状态和错误信息"""
self.flow_obj.status = TicketFlowStatus.TERMINATED
self.flow_obj.err_code = FlowErrCode.GENERAL_ERROR
if operator == DEFAULT_SYSTEM_USER:
self.flow_obj.err_code = FlowErrCode.SYSTEM_TERMINATED_ERROR
self.flow_obj.context = {FlowContext.EXPIRE_TIME: self.get_current_config_expire_time()}
else:
self.flow_obj.err_code = FlowErrCode.GENERAL_ERROR
self.flow_obj.save(update_fields=["status", "err_code", "context", "update_at"])
# 更新操作者
self.ticket.updater = operator
Expand All @@ -157,8 +159,9 @@ def get_current_config_expire_time(self):
"""获取当前配置的flow过期时间"""
if self.flow_obj.flow_type not in FLOW_TYPE__EXPIRE_TYPE_CONFIG:
return -1
config = TicketFlowsConfig.get_config(ticket_type=self.ticket.ticket_type)
expire_time = config[FlowTypeConfig.EXPIRE_CONFIG][FLOW_TYPE__EXPIRE_TYPE_CONFIG[self.flow_obj.flow_type]]
config = TicketFlowsConfig.get_config(ticket_type=self.ticket.ticket_type).configs
expire_config = config.get(FlowTypeConfig.EXPIRE_CONFIG, TICKET_EXPIRE_DEFAULT_CONFIG)
expire_time = expire_config[FLOW_TYPE__EXPIRE_TYPE_CONFIG[self.flow_obj.flow_type]]
return expire_time

def create_operate_records(self, object_key, record_model, object_ids):
Expand Down Expand Up @@ -259,4 +262,11 @@ def _retry(self) -> Any:
self.run()

def _revoke(self, operator) -> Any:
# 停止相关联的todo
from backend.ticket.todos import ActionType, TodoActorFactory

todos = Todo.objects.filter(ticket=self.ticket, flow=self.flow_obj)
for todo in todos:
TodoActorFactory.actor(todo).process(operator, ActionType.TERMINATE, params={})
# 刷新flow和单据状态 --> 终止
self.flush_revoke_status_handler(operator)
61 changes: 46 additions & 15 deletions dbm-ui/backend/ticket/flow_manager/inner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@
from backend.flow.models import FlowTree
from backend.ticket import constants
from backend.ticket.builders.common.base import fetch_cluster_ids
from backend.ticket.constants import BAMBOO_STATE__TICKET_STATE_MAP, FlowCallbackType, TicketType
from backend.ticket.constants import (
BAMBOO_STATE__TICKET_STATE_MAP,
FlowCallbackType,
TicketFlowStatus,
TicketType,
TodoStatus,
TodoType,
)
from backend.ticket.flow_manager.base import BaseTicketFlow
from backend.ticket.models import Flow
from backend.ticket.models import Flow, Todo
from backend.ticket.todos import BaseTodoContext
from backend.utils.basic import generate_root_id
from backend.utils.time import datetime2str

Expand Down Expand Up @@ -82,22 +90,39 @@ def _end_time(self) -> Union[str, datetime]:
@property
def _summary(self) -> str:
# TODO 可以给出具体失败的节点和原因
return _("任务{status_display}").format(status_display=constants.TicketStatus.get_choice_label(self.status))
return _("任务{status_display}").format(status_display=constants.TicketFlowStatus.get_choice_label(self.status))

@property
def _status(self) -> str:
# 如果未找到流程树,则直接取flow_obj的status
# 查询流程树状态,如果未找到则直接取flow_obj的status
if not self.flow_tree:
return self.flow_obj.status
status = BAMBOO_STATE__TICKET_STATE_MAP.get(self.flow_tree.status, constants.TicketFlowStatus.RUNNING)

status = BAMBOO_STATE__TICKET_STATE_MAP.get(self.flow_tree.status, constants.TicketStatus.RUNNING)
self.flow_obj.update_status(status)
return status
# 如果任务失败,则变更todo状态
todo_status = TodoStatus.TODO if status == TicketFlowStatus.FAILED else TodoStatus.DONE_SUCCESS
fail_todo = self.flow_obj.todo_of_flow.filter(type=TodoType.INNER_FAILED)
if fail_todo.exists() and fail_todo.first().status != todo_status:
fail_todo.update(status=todo_status)

return self.flow_obj.update_status(status)

@property
def _url(self) -> str:
return f"{env.BK_SAAS_HOST}/{self.ticket.bk_biz_id}/task-history/detail/{self.root_id}"

def create_inner_todo(self):
# 创建一条todo记录,在失败时变更为TODO状态
Todo.objects.create(
name=_("【{}】单据任务执行失败,待处理").format(self.ticket.get_ticket_type_display()),
flow=self.flow_obj,
ticket=self.ticket,
type=TodoType.INNER_FAILED,
operators=[self.ticket.creator],
context=BaseTodoContext(self.flow_obj.id, self.ticket.id).to_dict(),
status=TodoStatus.DONE_SUCCESS,
)

def check_exclusive_operations(self):
"""判断执行互斥"""
# TODO: 目前来说,执行互斥对于同时提单或者同时重试的操作是防不住的。
Expand All @@ -122,10 +147,6 @@ def check_exclusive_operations(self):
cluster_ids=cluster_ids, ticket_type=ticket_type, exclude_ticket_ids=[self.ticket.id]
)

def handle_exclusive_error(self):
"""处理执行互斥后重试的逻辑"""
pass

def callback(self, callback_type: FlowCallbackType) -> None:
"""
inner节点独有的钩子函数,执行前置/后继流程节点动作
Expand All @@ -141,6 +162,7 @@ def run(self) -> None:
# 获取or生成inner flow的root id
root_id = self.flow_obj.flow_obj_id or generate_root_id()
try:
self.create_inner_todo()
# 由于 _run 执行后可能会触发信号,导致 current_flow 的误判,因此需提前写入 flow_obj_id
self.run_status_handler(root_id)
# 判断执行互斥
Expand Down Expand Up @@ -184,6 +206,15 @@ def _retry(self) -> Any:
)
super()._retry()

def _revoke(self, operator) -> Any:
# 终止运行的pipeline
from backend.db_services.taskflow.handlers import TaskFlowHandler

if FlowTree.objects.filter(root_id=self.flow_obj.flow_obj_id).exists():
TaskFlowHandler(self.flow_obj.flow_obj_id).revoke_pipeline()
# 流转flow的终止状态
super()._revoke(operator)


class QuickInnerFlow(InnerFlow):
"""
Expand All @@ -193,7 +224,7 @@ class QuickInnerFlow(InnerFlow):

@property
def _status(self) -> str:
return constants.TicketStatus.SUCCEEDED
return constants.TicketFlowStatus.SUCCEEDED

@property
def _summary(self) -> str:
Expand All @@ -218,7 +249,7 @@ class IgnoreResultInnerFlow(InnerFlow):
@property
def _summary(self) -> str:
return _("(执行结果可忽略)任务状态: {status_display}").format(
status_display=constants.TicketStatus.get_choice_label(self._raw_status)
status_display=constants.TicketFlowStatus.get_choice_label(self._raw_status)
)

@property
Expand All @@ -228,7 +259,7 @@ def _raw_status(self) -> str:
@property
def _status(self) -> str:
status = self._raw_status
if status in [constants.TicketStatus.SUCCEEDED, constants.TicketStatus.REVOKED, constants.TicketStatus.FAILED]:
return constants.TicketStatus.SUCCEEDED
if status in [constants.TicketFlowStatus.SUCCEEDED, *constants.TICKET_FAILED_STATUS]:
return constants.TicketFlowStatus.SUCCEEDED

return status
47 changes: 34 additions & 13 deletions dbm-ui/backend/ticket/flow_manager/itsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
from backend.components import ItsmApi
from backend.components.itsm.constants import ItsmTicketStatus
from backend.exceptions import ApiResultError
from backend.ticket.constants import FlowMsgStatus, FlowMsgType, TicketFlowStatus, TicketStatus
from backend.ticket.constants import FlowMsgStatus, FlowMsgType, TicketFlowStatus, TicketStatus, TodoStatus, TodoType
from backend.ticket.flow_manager.base import BaseTicketFlow
from backend.ticket.models import Flow
from backend.ticket.models import Flow, Todo
from backend.ticket.tasks.ticket_tasks import send_msg_for_flow
from backend.ticket.todos.itsm_todo import ItsmTodoContext
from backend.utils.time import datetime2str, standardized_time_str


Expand Down Expand Up @@ -56,24 +57,25 @@ def _end_time(self) -> Union[datetime, Any]:
return self.flow_obj.update_at

@property
def _summary(self) -> str:
def _summary(self) -> dict:
try:
logs = ItsmApi.get_ticket_logs({"sn": [self.flow_obj.flow_obj_id]})
except ApiResultError:
return _("未知单据")

# 获取单据审批状态
current_status = self.ticket_approval_result["current_status"]
approve_result = self.ticket_approval_result["approve_result"]
summary = {"status": current_status, "approve_result": approve_result}

# 目前审批流程是固定的,取流程中第三个节点的日志作为概览即可
try:
return logs["logs"][2]["message"]
summary.update(operator=logs["logs"][2]["operator"], message=logs["logs"][2]["message"])
except (IndexError, KeyError):
# 异常时根据状态取默认的概览
status_summary_map = {
TicketStatus.RUNNING.value: _("审批中"),
TicketStatus.SUCCEEDED.value: _("已通过"),
TicketStatus.REVOKED.value: _("已撤销"),
TicketStatus.FAILED.value: _("被拒绝"),
TicketStatus.TERMINATED.value: _("已终止"),
}
return status_summary_map.get(self.status, "")
msg = TicketStatus.get_choice_label(self.status)
summary.update(operator=logs["logs"][-1]["operator"], status=self.status, message=msg)
return summary

@property
def _status(self) -> str:
Expand All @@ -85,15 +87,19 @@ def _status(self) -> str:
return self.flow_obj.update_status(TicketFlowStatus.RUNNING)
# 撤单
elif current_status == ItsmTicketStatus.REVOKED:
self.flow_obj.todo_of_flow.update(status=TodoStatus.DONE_FAILED)
return self.flow_obj.update_status(TicketFlowStatus.TERMINATED)
# 审批通过
elif current_status == ItsmTicketStatus.FINISHED and approve_result:
self.flow_obj.todo_of_flow.update(status=TodoStatus.DONE_SUCCESS)
return self.flow_obj.update_status(TicketFlowStatus.SUCCEEDED)
# 审批拒绝
elif current_status == ItsmTicketStatus.FINISHED and not approve_result:
self.flow_obj.todo_of_flow.update(status=TodoStatus.DONE_FAILED)
return self.flow_obj.update_status(TicketFlowStatus.TERMINATED)
# 终止
elif current_status == ItsmTicketStatus.TERMINATED:
self.flow_obj.todo_of_flow.update(status=TodoStatus.DONE_FAILED)
return self.flow_obj.update_status(TicketFlowStatus.TERMINATED)

@property
Expand All @@ -104,9 +110,20 @@ def _url(self) -> str:
return ""

def _run(self) -> str:
itsm_fields = {f["key"]: f["value"] for f in self.flow_obj.details["fields"]}
# 创建审批todo
operators = itsm_fields["approver"].split(",")
Todo.objects.create(
name=_("【{}】单据等待审批").format(self.ticket.get_ticket_type_display()),
flow=self.flow_obj,
ticket=self.ticket,
type=TodoType.ITSM,
operators=operators,
context=ItsmTodoContext(self.flow_obj.id, self.ticket.id).to_dict(),
)
# 创建单据
data = ItsmApi.create_ticket(self.flow_obj.details)
# 异步发送待审批消息
itsm_fields = {f["key"]: f["value"] for f in self.flow_obj.details["fields"]}
send_msg_for_flow.apply_async(
kwargs={
"flow_id": self.flow_obj.id,
Expand All @@ -117,3 +134,7 @@ def _run(self) -> str:
}
)
return data["sn"]

def _revoke(self, operator) -> Any:
# 父类通过触发todo的终止可以终止itsm单据
super()._revoke(operator)
Loading

0 comments on commit f299073

Please sign in to comment.