diff --git a/core.py b/core.py index 4efdd5d4..44a08e43 100644 --- a/core.py +++ b/core.py @@ -44,6 +44,7 @@ from handler.gather.gather_plan_monitor import GatherPlanMonitorHandler from handler.gather.gather_scenes import GatherSceneHandler from handler.gather.scenes.list import GatherScenesListHandler +from handler.gather.gather_tabledump import GatherTableDumpHandler from telemetry.telemetry import telemetry from update.update import UpdateHandler from colorama import Fore, Style @@ -230,6 +231,9 @@ def gather_function(self, function_type, opt): elif function_type == 'gather_ash_report': handler = GatherAshReportHandler(self.context) return handler.handle() + elif function_type == 'gather_tabledump': + handler = GatherTableDumpHandler(self.context) + return handler.handle() else: self._call_stdio('error', 'Not support gather function: {0}'.format(function_type)) return False diff --git a/diag_cmd.py b/diag_cmd.py index 33d367c5..5fb72c3e 100644 --- a/diag_cmd.py +++ b/diag_cmd.py @@ -582,6 +582,25 @@ def _do_command(self, obdiag): return obdiag.gather_function('gather_ash_report', self.opts) +class ObdiagGatherTableDumpHandler(ObdiagOriginCommand): + + def __init__(self): + super(ObdiagGatherTableDumpHandler, self).__init__('tabledump', 'gather tabledump') + self.parser.add_option('--database', type='string', help="Specifies the name of the database to connect to.") + self.parser.add_option('--table', type='string', help="Specifies the name of the table in the database to operate on.") + self.parser.add_option('--user', type='string', help="The username to use for the database connection.") + self.parser.add_option('--password', type='string', help="The password for the database user. If not specified, an attempt will be made to connect without a password.", default='') + self.parser.add_option('--store_dir', type='string', help='the dir to store gather result, current dir by default.', default='./gather_report') + self.parser.add_option('-c', type='string', help='obdiag custom config', default=os.path.expanduser('~/.obdiag/config.yml')) + + def init(self, cmd, args): + super(ObdiagGatherTableDumpHandler, self).init(cmd, args) + return self + + def _do_command(self, obdiag): + return obdiag.gather_function('gather_tabledump', self.opts) + + class ObdiagAnalyzeLogCommand(ObdiagOriginCommand): def __init__(self): @@ -738,6 +757,7 @@ def __init__(self): self.register_command(ObdiagGatherObproxyLogCommand()) self.register_command(ObdiagGatherSceneCommand()) self.register_command(ObdiagGatherAshReportCommand()) + self.register_command(ObdiagGatherTableDumpHandler()) class ObdiagGatherSceneCommand(MajorCommand): diff --git a/handler/gather/gather_tabledump.py b/handler/gather/gather_tabledump.py new file mode 100644 index 00000000..c0078824 --- /dev/null +++ b/handler/gather/gather_tabledump.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -* +# Copyright (c) 2022 OceanBase +# OceanBase Diagnostic Tool is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +@time: 2024/07/08 +@file: gather_tabledump.py +@desc: +""" + +import os +from stdio import SafeStdio +from common.ob_connector import OBConnector +from common.tool import StringUtils +from common.command import get_observer_version +from colorama import Fore, Style +from common.tool import Util +from common.tool import TimeUtils +from tabulate import tabulate +from handler.checker.check_exception import CheckException +from colorama import Fore, Style + + +class GatherTableDumpHandler(SafeStdio): + + def __init__(self, context, task_type="observer", export_report_path="./gather_report"): + self.context = context + self.stdio = context.stdio + self.report = None + self.report_path = None + self.ob_cluster = {} + self.ob_connector = {} + self.tenant_connector = {} + self.database = None + self.table = None + self.result_list = [] + self.export_report_path = export_report_path + try: + if not os.path.exists(export_report_path): + os.makedirs(export_report_path) + except Exception as e: + self.stdio.error("init gather_report {0}".format(e)) + raise Exception("int gather_report {0}".format(e)) + if self.context.get_variable("gather_timestamp", None): + self.gather_timestamp = self.context.get_variable("gather_timestamp") + else: + self.gather_timestamp = TimeUtils.get_current_us_timestamp() + + def init_config(self): + try: + self.ob_cluster = self.context.cluster_config + self.obproxy_nodes = self.context.obproxy_config['servers'] + self.ob_nodes = self.context.cluster_config['servers'] + new_nodes = Util.get_nodes_list(self.context, self.ob_nodes, self.stdio) + if new_nodes: + self.nodes = new_nodes + options = self.context.options + self.database = Util.get_option(options, 'database') + self.table = Util.get_option(options, 'table') + user = Util.get_option(options, 'user') + password = Util.get_option(options, 'password') + self.export_report_path = Util.get_option(options, 'store_dir') + self.tenant_name = self.__extract_string(user) + self.ob_connector = OBConnector( + ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=self.ob_cluster.get("tenant_sys").get("user"), password=self.ob_cluster.get("tenant_sys").get("password"), stdio=self.stdio, timeout=100 + ) + self.tenant_connector = OBConnector(ip=self.ob_cluster.get("db_host"), port=self.ob_cluster.get("db_port"), username=user, password=password, stdio=self.stdio, timeout=100) + self.file_name = "{0}/obdiag_tabledump_result_{1}.txt".format(self.export_report_path, self.gather_timestamp) + return True + except Exception as e: + return False + + def handle(self): + if not self.init_config(): + self.stdio.error('init config failed') + return False + self.execute() + self.stdio.print("get table info finished. For more details, please run cmd '" + Fore.YELLOW + " cat {0} ".format(self.file_name) + Style.RESET_ALL + "'") + + def execute(self): + try: + self.version = get_observer_version(self.context) + self.__get_table_schema() + if self.version == "4.0.0.0" or StringUtils.compare_versions_greater(self.version, "4.0.0.0"): + self.__get_table_info() + else: + self.__get_table_info_v3() + except Exception as e: + self.stdio.error("report sql result to file: {0} failed, error: ".format(self.file_name)) + self.stdio.error("StepSQLHandler execute Exception: {0}".format(e).strip()) + + def __get_table_schema(self): + sql = "show create table " + self.database + "." + self.table + columns, result = self.tenant_connector.execute_sql_return_columns_and_data(sql) + if result is None or len(result) == 0: + self.stdio.verbose("excute sql: {0}, result is None".format(sql)) + self.__report(sql, columns, result) + + def __get_table_info(self): + try: + tenant_data = self.ob_connector.execute_sql_return_cursor_dictionary("select tenant_id from oceanbase.__all_tenant where tenant_name='{0}'".format(self.tenant_name)) + if tenant_data is None: + self.stdio.error("tenant is None") + return + self.tenant_id = tenant_data.fetchall()[0].get("tenant_id") + + database_data = self.ob_connector.execute_sql_return_cursor_dictionary( + "SELECT con_id as tenant_id, object_id as database_id, object_name as database_name FROM oceanbase.cdb_objects where OBJECT_TYPE = 'DATABASE' and con_id = '{0}' and object_name='{1}' ".format(self.tenant_id, self.database) + ) + if database_data is None: + self.stdio.error("database is None") + return + self.database_id = database_data.fetchall()[0].get("database_id") + table_data = self.ob_connector.execute_sql_return_cursor_dictionary( + "select /*+read_consistency(weak) QUERY_TIMEOUT(60000000) */ t.table_id from oceanbase.__all_virtual_table t where t.tenant_id = '{0}' and t.database_id = '{1}' and table_name = '{2}' limit 1 ".format( + self.tenant_id, self.database_id, self.table + ) + ) + if table_data is None: + self.stdio.error("table is None") + return + self.table_id = table_data.fetchall()[0].get("table_id") + + ## 查询行数 + query_count = "select /*+read_consistency(weak) QUERY_TIMEOUT(60000000) */ table_name as 'Table' , ifnull(num_rows,0) as num_rows from oceanbase.cdb_tables where con_id = '{0}' and owner = '{1}' and table_name = '{2}' order by num_rows desc limit 1".format( + self.tenant_id, self.database, self.table + ) + columns, result = self.ob_connector.execute_sql_return_columns_and_data(query_count) + if result is None: + self.stdio.error("line Count is None") + return + self.stdio.print("table count {0}".format(result)) + + self.__report(query_count, columns, result) + ## 查询数据量 + query_data = '''select /*+read_consistency(weak) QUERY_TIMEOUT(60000000) */ t1.SVR_IP,t1.role,ifnull(t2.data_size,0) as total_data_size from (select SVR_IP,tenant_id, database_name, role, table_id, tablet_id from oceanbase.cdb_ob_table_locations) t1 left join (select tenant_id, tablet_id,data_size from oceanbase.cdb_ob_tablet_replicas) t2 on t1.tenant_id = t2.tenant_id and t1.tablet_id = t2.tablet_id where t1.tenant_id = '{0}' and t1.table_id = '{1}' order by total_data_size desc limit 1'''.format( + self.tenant_id, self.table_id + ) + + columns, result = self.ob_connector.execute_sql_return_columns_and_data(query_data) + if result is None: + self.stdio.error("dataSize is None") + return + self.stdio.print("data size {0}".format(result)) + self.__report(query_data, columns, result) + + except Exception as e: + self.stdio.error("getTableInfo execute Exception: {0}".format(e).strip()) + + def __get_table_info_v3(self): + try: + tenant_data = self.ob_connector.execute_sql_return_cursor_dictionary("select tenant_id from oceanbase.__all_tenant where tenant_name='{0}'".format(self.tenant_name)) + if tenant_data is None: + self.stdio.error("tenant is None") + return + self.tenant_id = tenant_data.fetchall()[0].get("tenant_id") + database_data = self.ob_connector.execute_sql_return_cursor_dictionary("select tenant_id,database_id,database_name from oceanbase.gv$database where tenant_name = '{0}' and database_name = '{1}' ".format(self.tenant_name, self.database)) + if database_data is None: + self.stdio.error("database is None") + return + self.database_id = database_data.fetchall()[0].get("database_id") + table_data = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.__all_virtual_table where table_name='{0}' and database_id='{1}' and tenant_id='{2}'".format(self.table, self.database_id, self.tenant_id)) + if table_data is None: + self.stdio.error("table is None") + return + self.table_id = table_data.fetchall()[0].get("table_id") + query_count = '''select /*+read_consistency(weak) QUERY_TIMEOUT(60000000) */ m.svr_ip,m.role,m.data_size total_data_size, m.row_count as total_rows_count from oceanbase.__all_virtual_meta_table m, oceanbase.__all_virtual_table t + where m.table_id = t.table_id and m.tenant_id = '{0}' and m.table_id = '{1}' and t.table_name = '{2}' order by total_rows_count desc limit 1'''.format( + self.tenant_id, self.table_id, self.table + ) + columns, result = self.ob_connector.execute_sql_return_columns_and_data(query_count) + if result is None: + self.stdio.error("dataSize and line count is None") + return + self.stdio.print("table count {0}".format(result)) + self.__report(query_count, columns, result) + + except Exception as e: + self.stdio.error("getTableInfo execute Exception: {0}".format(e).strip()) + + def __report(self, sql, column_names, data): + try: + table_data = [list(row) for row in data] + formatted_table = tabulate(table_data, headers=column_names, tablefmt="grid") + with open(self.file_name, 'a', encoding='utf-8') as f: + f.write('\n\n' + 'obclient > ' + sql + '\n') + f.write(formatted_table) + except Exception as e: + self.stdio.error("report sql result to file: {0} failed, error: ".format(self.file_name)) + + def __extract_string(self, s): + if '@' in s: + at_index = s.index('@') + if '#' in s: + hash_index = s.index('#') + if hash_index > at_index: + return s[at_index + 1 : hash_index] + else: + return s[at_index + 1 :] + else: + return s[at_index + 1 :] + else: + return s