Skip to content

Commit

Permalink
Merge pull request #180 from wayyoungboy/master
Browse files Browse the repository at this point in the history
update check tool
  • Loading branch information
Teingi authored Apr 28, 2024
2 parents db0ed95 + 2ffbeb8 commit ae31a52
Show file tree
Hide file tree
Showing 36 changed files with 649 additions and 108 deletions.
2 changes: 2 additions & 0 deletions cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ def do_command(self):
return False
cmd = '%s %s' % (self.prev_cmd, base)
ROOT_IO.track_limit += 1
if "main.py" in cmd:
telemetry.work_tag=False
telemetry.push_cmd_info("cmd: {0}. args:{1}".format(cmd,args))
return self.commands[base].init(cmd, args).do_command()

Expand Down
14 changes: 13 additions & 1 deletion common/ob_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class OBConnector(object):
def __init__(self, ip, port, username, password=None, database=None, stdio=None, timeout=10,):
def __init__(self, ip, port, username, password=None, database=None, stdio=None, timeout=30,):
self.ip = str(ip)
self.port = int(port)
self.username = str(username)
Expand Down Expand Up @@ -50,6 +50,18 @@ def _connect_db(self):
self.stdio.verbose("connect databse ...")
except mysql.Error as e:
self.stdio.error("connect OB: {0}:{1} with user {2} failed, error:{3}".format(self.ip, self.port, self.username, e))
return
try:
ob_trx_timeout=self.timeout*1000000
self.execute_sql("SET SESSION ob_trx_timeout={0};".format(ob_trx_timeout))
except Exception as e:
self.stdio.warn("set ob_trx_timeout failed, error:{0}".format(e))
try:
ob_query_timeout=self.timeout*1000000
self.execute_sql("SET SESSION ob_query_timeout={0};".format(ob_query_timeout))
except Exception as e:
self.stdio.warn("set ob_query_timeout failed, error:{0}".format(e))


def execute_sql(self, sql):
if self.conn is None:
Expand Down
12 changes: 9 additions & 3 deletions common/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"""
from common.ssh import SshHelper
from common.tool import StringUtils
from common.command import get_observer_version, get_obproxy_version
from common.command import get_observer_version, get_obproxy_version, get_observer_version_by_sql


def filter_by_version(scene, cluster, stdio=None):
try:
Expand Down Expand Up @@ -59,14 +60,19 @@ def filter_by_version(scene, cluster, stdio=None):
stdio.exception("filter_by_version Exception : {0}".format(e))
raise Exception("filter_by_version Exception : {0}".format(e))

def get_version(nodes, type, stdio=None):
def get_version(nodes, type,cluster, stdio=None):
try:
if len(nodes) < 1:
raise Exception("input nodes is empty, please check your config")
node = nodes[0]
ssh = SshHelper(True, node.get("ip"), node.get("ssh_username"), node.get("ssh_password"), node.get("ssh_port"), node.get("ssh_key_file"), node)
version = ""
if type == "observer":
version = get_observer_version(True, ssh, nodes[0]["home_path"], stdio)
try:
version = get_observer_version_by_sql(cluster,stdio)
except Exception as e:
stdio.warn("get observer version by sql fail, use node ssher to get. Exception:{0}".format(e))
version = get_observer_version(True, ssh, nodes[0]["home_path"], stdio)
elif type == "obproxy":
version = get_obproxy_version(True, ssh, nodes[0]["home_path"], stdio)
return version
Expand Down
1 change: 0 additions & 1 deletion conf/inner_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ check:
report:
report_path: "./check_report/"
export_type: table
package_file: "~/.obdiag/check/check_package.yaml"
tasks_base_path: "~/.obdiag/check/tasks/"
gather:
scenes_base_path: "~/.obdiag/gather/tasks"
Expand Down
77 changes: 74 additions & 3 deletions handler/checker/check_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
"""

import os
import queue
import time

import yaml

from common.ob_connector import OBConnector
from common.ssh import SshHelper
from handler.checker.check_exception import CheckException
from handler.checker.check_report import TaskReport, CheckReport, CheckrReportException
from handler.checker.check_task import TaskBase
Expand All @@ -27,7 +33,6 @@
from common.tool import YamlUtils
from common.tool import StringUtils


class CheckHandler:

def __init__(self, context, check_target_type="observer"):
Expand Down Expand Up @@ -87,6 +92,35 @@ def __init__(self, context, check_target_type="observer"):
# input_param
self.options=self.context.options

# add ssher
new_node=[]
for node in self.nodes:
# add ssher
ssher = None
try:
ssher = SshHelper(True, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
except Exception as e:
self.stdio.warn("StepBase get SshHelper fail on{0} ,Exception: {1}".format(node.get("ip"), e))
node["ssher"] = ssher
new_node.append(node)
self.nodes=new_node
self.version=get_version(self.nodes, self.check_target_type,self.cluster, self.stdio)

# add OBConnectorPool
try:
obConnectorPool=checkOBConnectorPool(context,3,self.cluster)

except Exception as e:
self.stdio.warn("obConnector init error. Error info is {0}".format(e))
finally:
self.context.set_variable('check_obConnector_pool', obConnectorPool)


def handle(self):
try:
package_name = None
Expand Down Expand Up @@ -173,7 +207,7 @@ def execute_one(self, task_name):
# Verify if the version is within a reasonable range
report = TaskReport(self.context,task_name)
if not self.ignore_version:
version = get_version(self.nodes, self.check_target_type, self.stdio)
version = self.version
if version:
self.cluster["version"] = version
self.stdio.verbose("cluster.version is {0}".format(self.cluster["version"]))
Expand Down Expand Up @@ -206,4 +240,41 @@ def execute(self):
except CheckrReportException as e:
self.stdio.error("Report error :{0}".format(e))
except Exception as e:
self.stdio.error("Internal error :{0}".format(e))
self.stdio.error("Internal error :{0}".format(e))

class checkOBConnectorPool:
def __init__(self,context, max_size, cluster):
self.max_size = max_size
self.cluster=cluster
self.connections = queue.Queue(maxsize=max_size)
self.stdio=context.stdio
self.stdio.verbose("obConnectorPool init success!")
try:
for i in range(max_size):
conn = OBConnector(
ip=self.cluster.get("db_host"),
port=self.cluster.get("db_port"),
username=self.cluster.get("tenant_sys").get("user"),
password=self.cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000
)
self.connections.put(conn)
self.stdio.verbose("obConnectorPool init success!")
except Exception as e:
self.stdio.error("obConnectorPool init fail! err:".format(e))


def get_connection(self):
try:
return self.connections.get()
except Exception as e:
self.stdio.error("get connection fail! err:".format(e))
return None

def release_connection(self, conn):

if conn is not None:
self.connections.put(conn)
return

20 changes: 18 additions & 2 deletions handler/checker/check_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
@file: check_task.py
@desc:
"""
import threading

from common.ob_connector import OBConnector
from handler.checker.check_exception import StepResultFailException, \
StepExecuteFailException, StepResultFalseException, TaskException
from handler.checker.step.stepbase import StepBase
Expand Down Expand Up @@ -46,7 +48,18 @@ def execute(self):
self.stdio.verbose("filter_by_version is return {0}".format(steps_nu))
if len(self.nodes) == 0:
raise Exception("node is not exist")
# TODO: 这里的逻辑需要优化,如果一个节点执行失败了,那么后续的步骤就不会被执行了。
work_threads = []
for node in self.nodes:
t = threading.Thread(target=self.execute_one_node, args=(steps_nu,node))
work_threads.append(t)
t.start()
for t in work_threads:
t.join()

self.stdio.verbose("task execute end")
def execute_one_node(self,steps_nu,node):
try:
self.stdio.verbose("run task in node: {0}".format(StringUtils.node_cut_passwd_for_log(node)))
steps = self.task[steps_nu]
nu = 1
Expand All @@ -58,7 +71,6 @@ def execute(self):
step_run = StepBase(self.context, step, node, self.cluster, self.task_variable_dict)
self.stdio.verbose("step nu: {0} initted, to execute".format(nu))
step_run.execute(self.report)
self.task_variable_dict = step_run.update_task_variable_dict()
if "report_type" in step["result"] and step["result"]["report_type"] == "execution":
self.stdio.verbose("report_type stop this step")
return
Expand All @@ -77,4 +89,8 @@ def execute(self):

self.stdio.verbose("step nu: {0} execute end ".format(nu))
nu = nu + 1
self.stdio.verbose("task execute end")
except Exception as e:
self.stdio.error("TaskBase execute Exception: {0}".format(e))
raise e


14 changes: 5 additions & 9 deletions handler/checker/step/data_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,14 @@ def __init__(self,context, step, node, task_variable_dict):
self.task_variable_dict = task_variable_dict

try:
is_ssh = True
self.ssh_helper = SshHelper(is_ssh, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
self.ssh_helper=self.node["ssher"]
if self.ssh_helper is None:
raise Exception("self.ssh_helper is None.")
except Exception as e:
self.stdio.error(
"GetSystemParameterHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
"DataSizeHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
raise Exception(
"GetSystemParameterHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
"DataSizeHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))

# step report
self.parameter = []
Expand Down
10 changes: 3 additions & 7 deletions handler/checker/step/get_system_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,9 @@ def __init__(self,context, step, node, task_variable_dict):
self.task_variable_dict = task_variable_dict

try:
is_ssh = True
self.ssh_helper = SshHelper(is_ssh, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
self.ssh_helper=self.node["ssher"]
if self.ssh_helper is None:
raise Exception("self.ssh_helper is None.")
except Exception as e:
self.stdio.error(
"GetSystemParameterHandler ssh init fail . Please check the NODES conf Exception : {0} .".format(e))
Expand Down
30 changes: 16 additions & 14 deletions handler/checker/step/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@


class StepSQLHandler:
def __init__(self,context, step, ob_cluster, task_variable_dict):
def __init__(self,context, step, task_variable_dict):
try:
self.context = context
self.stdio = context.stdio
self.ob_cluster = ob_cluster
self.ob_cluster_name = ob_cluster.get("cluster_name")
self.ob_cluster = self.context.cluster_config
self.ob_cluster_name = self.ob_cluster.get("cluster_name")
self.tenant_mode = None
self.sys_database = None
self.database = None
self.ob_connector = OBConnector(ip=ob_cluster.get("db_host"),
port=ob_cluster.get("db_port"),
username=ob_cluster.get("tenant_sys").get("user"),
password=ob_cluster.get("tenant_sys").get("password"),
stdio=self.stdio,
timeout=10000)
self.ob_connector_pool=self.context.get_variable('check_obConnector_pool',None)
if self.ob_connector_pool is not None:
self.ob_connector=self.ob_connector_pool.get_connection()
if self.ob_connector is None:
raise Exception("self.ob_connector is None.")
except Exception as e:
self.stdio.error("StepSQLHandler init fail. Please check the OBCLUSTER conf. OBCLUSTER: {0} Exception : {1} .".format(ob_cluster,e))
raise Exception("StepSQLHandler init fail. Please check the OBCLUSTER conf. OBCLUSTER: {0} Exception : {1} .".format(ob_cluster,e))
self.stdio.error("StepSQLHandler init fail. Please check the OBCLUSTER conf. Exception : {0} .".format(e))
raise Exception("StepSQLHandler init fail. Please check the OBCLUSTER conf. Exception : {0} .".format(e))
self.task_variable_dict = task_variable_dict
self.enable_dump_db = False
self.trace_id = None
Expand All @@ -62,8 +61,9 @@ def execute(self):
if data is None:
self.stdio.warn("sql result is None: {0}".format(self.step["sql"]))
self.stdio.verbose("execute_sql result:{0}".format(data))
if len(data) == 0:
if len(data) == 0 or data is None:
self.stdio.warn("sql result is None: {0}".format(self.step["sql"]))
data=""
else:
data = data[0][0]
if data is None:
Expand All @@ -73,8 +73,10 @@ def execute(self):
self.stdio.verbose("sql execute update task_variable_dict: {0} = {1}".format(self.step["result"]["set_value"], Util.convert_to_number(data)))
self.task_variable_dict[self.step["result"]["set_value"]] = Util.convert_to_number(data)
except Exception as e:
self.stdio.error("StepSQLHandler execute Exception: {0}".format(e).strip())
raise StepExecuteFailException("StepSQLHandler execute Exception: {0}".format(e).strip())
self.stdio.error("StepSQLHandler execute Exception: {0}".format(e))
raise StepExecuteFailException("StepSQLHandler execute Exception: {0}".format(e))
finally:
self.ob_connector_pool.release_connection(self.ob_connector)

def update_step_variable_dict(self):
return self.task_variable_dict
11 changes: 3 additions & 8 deletions handler/checker/step/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from handler.checker.check_exception import StepExecuteFailException
from handler.checker.check_report import TaskReport
from common.ssh import SshHelper
from common.tool import StringUtils
from common.tool import Util

Expand All @@ -32,13 +31,9 @@ def __init__(self,context, step, node, task_variable_dict):
self.step = step
self.node = node
try:
is_ssh = True
self.ssh_helper = SshHelper(is_ssh, node.get("ip"),
node.get("ssh_username"),
node.get("ssh_password"),
node.get("ssh_port"),
node.get("ssh_key_file"),
node)
self.ssh_helper=self.node["ssher"]
if self.ssh_helper is None:
raise Exception("self.ssh_helper is None.")
except Exception as e:
self.stdio.error(
"SshHandler init fail. Please check the NODES conf. node: {0}. Exception : {1} .".format(node, e))
Expand Down
7 changes: 3 additions & 4 deletions handler/checker/step/stepbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,16 @@ def execute(self, report):
self.task_variable_dict["remote_ip"] = \
docker.from_env().containers.get(self.node["container_name"]).attrs['NetworkSettings']['Networks'][
'bridge']["IPAddress"]
for key in self.node:
self.task_variable_dict["remote_{0}".format(key)] = self.node[key]

for node in self.node:
self.task_variable_dict["remote_{0}".format(node)] = self.node[node]
if "type" not in self.step:
raise StepExecuteFailException("Missing field :type")
if self.step["type"] == "get_system_parameter":
handler = GetSystemParameterHandler(self.context, self.step, self.node, self.task_variable_dict)
elif self.step["type"] == "ssh":
handler = SshHandler(self.context, self.step, self.node, self.task_variable_dict)
elif self.step["type"] == "sql":
handler = StepSQLHandler(self.context, self.step, self.cluster, self.task_variable_dict)
handler = StepSQLHandler(self.context, self.step, task_variable_dict=self.task_variable_dict)
elif self.step["type"] == "data_size":
handler = DataSizeHandler(self.context, self.step, self.cluster, self.task_variable_dict)
else:
Expand Down
Loading

0 comments on commit ae31a52

Please sign in to comment.