Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): process_todo、batch_process_todo接口优化 #8303 #8304

Open
wants to merge 1 commit into
base: v1.5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dbm-ui/backend/ticket/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,13 @@ def query_ticket_flows_describe(cls, bk_biz_id, db_type, ticket_types=None):
flow_desc_list.append(flow_config_info)

return flow_desc_list

@classmethod
def process_single_todo(cls, operation, act, username):
"""
处理单个待办的辅助函数
"""
todo_id = operation["todo_id"]
params = operation["params"]
todo = Todo.objects.get(id=todo_id)
TodoActorFactory.actor(todo).process(username, act, params)
13 changes: 8 additions & 5 deletions dbm-ui/backend/ticket/models/ticket.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,25 @@ def current_flow(self) -> Flow:
1. 取 TicketFlow 中最后一个 flow_obj_id 非空的流程
2. 若 TicketFlow 中都流程都为空,则代表整个单据未开始,取第一个流程
"""
if Flow.objects.filter(ticket=self).exclude(status=TicketFlowStatus.PENDING).exists():
return Flow.objects.filter(ticket=self).exclude(status=TicketFlowStatus.PENDING).last()
non_pending_flows = [flow for flow in self.flows.all() if flow.status != TicketFlowStatus.PENDING]
if non_pending_flows:
# 返回最后一个符合条件的 Flow 对象
return non_pending_flows[-1]
# 初始化时,当前节点和下一个节点为同一个
return self.next_flow()

def next_flow(self) -> Flow:
"""
下一个流程,即 TicketFlow 中第一个为PENDING的流程
"""
next_flows = Flow.objects.filter(ticket=self, status=TicketFlowStatus.PENDING)
next_flows = [flow for flow in self.flows.all() if flow.status == TicketFlowStatus.PENDING]

# 支持跳过人工审批和确认环节
if env.ITSM_FLOW_SKIP:
next_flows = next_flows.exclude(flow_type__in=[FlowType.BK_ITSM, FlowType.PAUSE])
next_flows = [flow for flow in next_flows if flow.flow_type not in [FlowType.BK_ITSM, FlowType.PAUSE]]

return next_flows.first()
# 返回第一个符合条件的 Flow 对象
return next_flows[0] if next_flows else None

@classmethod
def create_ticket(
Expand Down
22 changes: 12 additions & 10 deletions dbm-ui/backend/ticket/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import asyncio
import operator
from functools import reduce

Expand Down Expand Up @@ -77,7 +78,7 @@
UpdateTicketFlowConfigSerializer,
)
from backend.ticket.todos import TodoActorFactory
from backend.utils.batch_request import request_multi_thread
from backend.utils.batch_request import process_operations_async, request_multi_thread

TICKET_TAG = "ticket"

Expand Down Expand Up @@ -221,6 +222,7 @@ def perform_create(self, serializer):
builder.patch_ticket_detail()
builder.init_ticket_flows()

ticket = Ticket.objects.prefetch_related("flows").get(pk=ticket.pk)
TicketFlowManager(ticket=ticket).run_next_flow()

@swagger_auto_schema(
Expand Down Expand Up @@ -437,7 +439,9 @@ def process_todo(self, request, *args, **kwargs):

validated_data = self.params_validate(self.get_serializer_class())

todo = ticket.todo_of_ticket.get(id=validated_data["todo_id"])
todo = (
Todo.objects.select_related("ticket").prefetch_related("ticket__flows").get(id=validated_data["todo_id"])
)
TodoActorFactory.actor(todo).process(request.user.username, validated_data["action"], validated_data["params"])

return Response(TodoSerializer(ticket.todo_of_ticket.all(), many=True).data)
Expand Down Expand Up @@ -644,15 +648,13 @@ def batch_process_todo(self, request, *args, **kwargs):
"""
validated_data = self.params_validate(self.get_serializer_class())
act = validated_data["action"]
operations = validated_data["operations"]

# 批量处理待办操作
results = []
for operation in validated_data["operations"]:
todo_id = operation["todo_id"]
params = operation["params"]
todo = Todo.objects.get(id=todo_id)
TodoActorFactory.actor(todo).process(request.user.username, act, params)
results.append(todo)
# 执行异步处理
asyncio.run(process_operations_async(operations, act, request.user.username))

# 获取处理后的待办事项
results = [Todo.objects.get(id=operation["todo_id"]) for operation in operations]

# 使用 TodoSerializer 序列化响应数据
return Response(TodoSerializer(results, many=True).data)
24 changes: 24 additions & 0 deletions dbm-ui/backend/utils/batch_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import asyncio
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
from copy import deepcopy
from multiprocessing.pool import ThreadPool
Expand Down Expand Up @@ -226,3 +228,25 @@ def wrapper(wrapped, instance, args, kwargs):
return {"data": data, "total": len(data)}

return wrapper


async def process_operations_async(operations, act, username):
"""
异步处理待办操作
@param operations: 待办操作列表 [{"todo_id":"todo_id","params":{}}}]
@param act: 统一待办操作类型
@param username: 处理人
"""
from backend.ticket.handler import TicketHandler

loop = asyncio.get_running_loop()

# 自定义线程池
with concurrent.futures.ThreadPoolExecutor() as executor:
# 将所有的任务提交到线程池
tasks = [
loop.run_in_executor(executor, TicketHandler.process_single_todo, operation, act, username)
for operation in operations
]
# 等待所有任务完成
await asyncio.gather(*tasks)