From b42efafd5f25cd69089f27c22effdf93079e110c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B8=A0=E7=A3=8A?= Date: Thu, 25 Apr 2024 18:07:51 +0800 Subject: [PATCH] test check --- handler/checker/check_handler.py | 7 ++++- handler/checker/check_task.py | 30 +++++++++++++++++--- handler/checker/step/data_size.py | 2 +- handler/checker/step/get_system_parameter.py | 2 +- handler/checker/step/sql.py | 4 +-- handler/checker/step/ssh.py | 2 +- handler/checker/step/stepbase.py | 11 +++---- 7 files changed, 43 insertions(+), 15 deletions(-) diff --git a/handler/checker/check_handler.py b/handler/checker/check_handler.py index c7477a52..7bd249f3 100644 --- a/handler/checker/check_handler.py +++ b/handler/checker/check_handler.py @@ -17,6 +17,8 @@ """ import os +import time + import yaml from common.ob_connector import OBConnector @@ -229,6 +231,7 @@ def execute_one(self, task_name): raise CheckException("execute_one Exception : {0}".format(e)) def execute(self): + start_time = time.time() try: self.stdio.verbose( "execute_all_tasks. the number of tasks is {0} ,tasks is {1}".format(len(self.tasks.keys()), @@ -244,4 +247,6 @@ def execute(self): except CheckrReportException as e: self.stdio.error("Report error :{0}".format(e)) except Exception as e: - self.stdio.error("Internal error :{0}".format(e)) \ No newline at end of file + self.stdio.error("Internal error :{0}".format(e)) + end_time = time.time() + print("Total cost time is {0} s".format((end_time - start_time))) \ No newline at end of file diff --git a/handler/checker/check_task.py b/handler/checker/check_task.py index 9605bfcf..b8db28e8 100644 --- a/handler/checker/check_task.py +++ b/handler/checker/check_task.py @@ -15,6 +15,9 @@ @file: check_task.py @desc: """ +import threading + +from common.ob_connector import OBConnector from handler.checker.check_exception import StepResultFailException, \ StepExecuteFailException, StepResultFalseException, TaskException from handler.checker.step.stepbase import StepBase @@ -39,15 +42,30 @@ def execute(self): self.stdio.verbose("task_base execute") steps_nu = filter_by_version(self.task, self.cluster, self.stdio) if steps_nu < 0: - self.stdio.warn("{0} Unadapted by version. SKIP".format(self.task['name'])) + self.stdio.warn("Unadapted by version. SKIP") self.report.add("Unadapted by version. SKIP", "warning") return "Unadapted by version.SKIP" self.stdio.verbose("filter_by_version is return {0}".format(steps_nu)) if len(self.nodes) == 0: raise Exception("node is not exist") - + # TODO: 这里的逻辑需要优化,如果一个节点执行失败了,那么后续的步骤就不会被执行了。 + work_threads = [] for node in self.nodes: + obConnector = OBConnector(ip=self.cluster.get("db_host"), + port=self.cluster.get("db_port"), + username=self.cluster.get("tenant_sys").get("user"), + password=self.cluster.get("tenant_sys").get("password"), + stdio=self.stdio, + timeout=10000) + t = threading.Thread(target=self.execute_one_node, args=(steps_nu,node,obConnector)) + work_threads.append(t) + t.start() + for t in work_threads: + t.join() + self.stdio.verbose("task execute end") + def execute_one_node(self,steps_nu,node,obConnector): + try: self.stdio.verbose("run task in node: {0}".format(StringUtils.node_cut_passwd_for_log(node))) steps = self.task[steps_nu] nu = 1 @@ -56,7 +74,7 @@ def execute(self): self.stdio.verbose("step nu: {0}".format(nu)) if len(self.cluster) == 0: raise Exception("cluster is not exist") - step_run = StepBase(self.context, step, node, self.cluster, self.task_variable_dict) + step_run = StepBase(self.context, step, node, self.cluster, self.task_variable_dict,obConnector) self.stdio.verbose("step nu: {0} initted, to execute".format(nu)) step_run.execute(self.report) self.task_variable_dict = step_run.update_task_variable_dict() @@ -78,4 +96,8 @@ def execute(self): self.stdio.verbose("step nu: {0} execute end ".format(nu)) nu = nu + 1 - self.stdio.verbose("task execute end") + except Exception as e: + self.stdio.error("TaskBase execute Exception: {0}".format(e)) + raise e + + diff --git a/handler/checker/step/data_size.py b/handler/checker/step/data_size.py index b8d0ff0c..66dc2ff9 100644 --- a/handler/checker/step/data_size.py +++ b/handler/checker/step/data_size.py @@ -23,7 +23,7 @@ class DataSizeHandler: - def __init__(self,context, step, node, task_variable_dict): + def __init__(self,context, step, node, task_variable_dict,obConnector): self.context = context self.stdio = context.stdio self.stdio.verbose("init DataSizeHandler") diff --git a/handler/checker/step/get_system_parameter.py b/handler/checker/step/get_system_parameter.py index af3341c1..29713ee3 100644 --- a/handler/checker/step/get_system_parameter.py +++ b/handler/checker/step/get_system_parameter.py @@ -23,7 +23,7 @@ class GetSystemParameterHandler: - def __init__(self,context, step, node, task_variable_dict): + def __init__(self,context, step, node, task_variable_dict,obConnector): self.context = context self.stdio = context.stdio self.stdio.verbose("init GetSystemParameterHandler") diff --git a/handler/checker/step/sql.py b/handler/checker/step/sql.py index c55c50a7..7d846626 100644 --- a/handler/checker/step/sql.py +++ b/handler/checker/step/sql.py @@ -23,7 +23,7 @@ class StepSQLHandler: - def __init__(self,context, step, task_variable_dict): + def __init__(self,context, step, task_variable_dict,obConnector): try: self.context = context self.stdio = context.stdio @@ -32,7 +32,7 @@ def __init__(self,context, step, task_variable_dict): self.tenant_mode = None self.sys_database = None self.database = None - self.ob_connector=self.context.get_variable('check_obConnector') + self.ob_connector=obConnector if self.ob_connector is None: raise Exception("self.ob_connector is None.") except Exception as e: diff --git a/handler/checker/step/ssh.py b/handler/checker/step/ssh.py index 282477e2..4e4c6368 100644 --- a/handler/checker/step/ssh.py +++ b/handler/checker/step/ssh.py @@ -23,7 +23,7 @@ class SshHandler: - def __init__(self,context, step, node, task_variable_dict): + def __init__(self,context, step, node, task_variable_dict,obConnector): self.context = context self.stdio = context.stdio self.ssh_report_value = None diff --git a/handler/checker/step/stepbase.py b/handler/checker/step/stepbase.py index 3ec68d8e..aa479251 100644 --- a/handler/checker/step/stepbase.py +++ b/handler/checker/step/stepbase.py @@ -27,7 +27,7 @@ class StepBase(object): - def __init__(self, context, step, node, cluster, task_variable_dict): + def __init__(self, context, step, node, cluster, task_variable_dict,obConnector): self.context = context self.stdio = context.stdio self.step = step @@ -35,6 +35,7 @@ def __init__(self, context, step, node, cluster, task_variable_dict): self.cluster = cluster self.task_variable_dict = {} self.task_variable_dict = task_variable_dict + self.obConnector=obConnector def execute(self, report): no_cluster_name_msg = "(Please set ob_cluster_name or obproxy_cluster_name)" @@ -54,13 +55,13 @@ def execute(self, report): if "type" not in self.step: raise StepExecuteFailException("Missing field :type") if self.step["type"] == "get_system_parameter": - handler = GetSystemParameterHandler(self.context, self.step, self.node, self.task_variable_dict) + handler = GetSystemParameterHandler(self.context, self.step, self.node, self.task_variable_dict,self.obConnector) elif self.step["type"] == "ssh": - handler = SshHandler(self.context, self.step, self.node, self.task_variable_dict) + handler = SshHandler(self.context, self.step, self.node, self.task_variable_dict,self.obConnector) elif self.step["type"] == "sql": - handler = StepSQLHandler(self.context, self.step, task_variable_dict=self.task_variable_dict) + handler = StepSQLHandler(self.context, self.step, task_variable_dict=self.task_variable_dict,obConnector=self.obConnector) elif self.step["type"] == "data_size": - handler = DataSizeHandler(self.context, self.step, self.cluster, self.task_variable_dict) + handler = DataSizeHandler(self.context, self.step, self.cluster, self.task_variable_dict,self.obConnector) else: raise StepExecuteFailException("the type not support: {0}".format(self.step["type"])) self.stdio.verbose("task execute and result")