diff --git a/core.py b/core.py index f2e886c2..d19f9a23 100644 --- a/core.py +++ b/core.py @@ -176,7 +176,7 @@ def update_obcluster_nodes(self, config): if (obcluster := config_data.get('obcluster')) and (servers := obcluster.get('servers')) and servers.get('nodes'): return - ob_version = get_observer_version_by_sql(ob_cluster, self.stdio) + ob_version = get_observer_version_by_sql(self.context, ob_cluster) obConnetcor = OBConnector(context=self.context, ip=ob_cluster["db_host"], port=ob_cluster["db_port"], username=ob_cluster["tenant_sys"]["user"], password=ob_cluster["tenant_sys"]["password"]) sql = "select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.__all_server" diff --git a/handler/rca/scene/major_hold.py b/handler/rca/scene/major_hold.py index 7d6b51b5..01603187 100644 --- a/handler/rca/scene/major_hold.py +++ b/handler/rca/scene/major_hold.py @@ -48,136 +48,125 @@ def init(self, context): raise RCAInitException("MajorHoldScene RCAInitException: {0}".format(e)) def execute(self): - # 前置条件确认 need_tag = False - first_record = RCA_ResultRecord() err_tenant_ids = [] - # 合并任务是否有报错 + self.record.add_record("check major task is error or not") try: - COMPACTING_data = self.ob_connector.execute_sql('select * from oceanbase.CDB_OB_MAJOR_COMPACTION where IS_ERROR="YES";') + COMPACTING_data = self.ob_connector.execute_sql_return_cursor_dictionary('select * from oceanbase.CDB_OB_MAJOR_COMPACTION where IS_ERROR="YES";').fetchall() if len(COMPACTING_data) == 0: - first_record.add_record("CDB_OB_MAJOR_COMPACTION is not exist IS_ERROR='YES'") + self.record.add_record("CDB_OB_MAJOR_COMPACTION is not exist IS_ERROR='YES'") else: need_tag = True CDB_OB_MAJOR_COMPACTION_err_tenant_ids = [] for data in COMPACTING_data: - CDB_OB_MAJOR_COMPACTION_err_tenant_ids.append(str(data[0])) - - first_record.add_record("CDB_OB_MAJOR_COMPACTION have IS_ERROR='YES',the tenant_ids are {0}".format(err_tenant_ids)) + CDB_OB_MAJOR_COMPACTION_err_tenant_ids.append(str(data.get('TENANT_ID'))) + self.record.add_record("CDB_OB_MAJOR_COMPACTION have IS_ERROR='YES',the tenant_ids are {0}".format(err_tenant_ids)) err_tenant_ids.extend(CDB_OB_MAJOR_COMPACTION_err_tenant_ids) except Exception as e: self.stdio.warn("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e)) raise RCAExecuteException("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e)) - # __all_virtual_compaction_diagnose_info里存在status=FAILED的记录 + # get info form __all_virtual_compaction_diagnose_info where status=FAILED try: - diagnose_data = self.ob_connector.execute_sql('select * from oceanbase.__all_virtual_compaction_diagnose_info where status="FAILED";') + diagnose_data = self.ob_connector.execute_sql_return_cursor_dictionary('select * from oceanbase.__all_virtual_compaction_diagnose_info where status="FAILED";').fetchall() if len(diagnose_data) == 0: - first_record.add_record('__all_virtual_compaction_diagnose_info is not exist status="FAILED";') + self.record.add_record('__all_virtual_compaction_diagnose_info is not exist status="FAILED";') else: need_tag = True __all_virtual_compaction_diagnose_info_err_tenant_ids = [] for data in COMPACTING_data: - __all_virtual_compaction_diagnose_info_err_tenant_ids.append(str(data[0])) - - first_record.add_record("__all_virtual_compaction_diagnose_info have status='FAILED',the tenant is {0}".format(__all_virtual_compaction_diagnose_info_err_tenant_ids)) + __all_virtual_compaction_diagnose_info_err_tenant_ids.append(str(data.get("tenant_id"))) + self.record.add_record("__all_virtual_compaction_diagnose_info have status='FAILED',the tenant is {0}".format(__all_virtual_compaction_diagnose_info_err_tenant_ids)) err_tenant_ids.extend(__all_virtual_compaction_diagnose_info_err_tenant_ids) except Exception as e: self.stdio.error("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e)) raise RCAExecuteException("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e)) # GV$OB_COMPACTION_PROGRESS表中,根据上一次合并记录中的data_size/(estimated_finish_time-start_time)与当前合并版本记录中(data_size-unfinished_data_size)/(当前时间-start_time)相比,如果差距过大(当前合并比上一次合并慢很多,以5倍为指标) try: - running_data = self.ob_connector.execute_sql("select * from oceanbase.GV$OB_COMPACTION_PROGRESS where STATUS <> 'FINISH' and START_TIME <= NOW() - INTERVAL 20 minute GROUP BY COMPACTION_SCN DESC;") + running_data = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.GV$OB_COMPACTION_PROGRESS where STATUS <> 'FINISH' and START_TIME <= NOW() - INTERVAL 20 minute GROUP BY COMPACTION_SCN DESC;").fetchall() if len(running_data) == 0: - first_record.add_record("No merge tasks that have not ended beyond the expected time") + self.record.add_record("No merge tasks that have not ended beyond the expected time") else: - time_out_merge_err_tenant_ids = [] need_tag = True for data in running_data: - time_out_merge_err_tenant_ids.append(str(data[2])) - first_record.add_record("merge tasks that have not ended beyond the expected time,the tenant_id is {0}".format(time_out_merge_err_tenant_ids)) + time_out_merge_err_tenant_ids.append(str(data.get("TENANT_ID"))) + self.record.add_record("merge tasks that have not ended beyond the expected time,the tenant_id is {0}".format(time_out_merge_err_tenant_ids)) self.stdio.verbose("merge tasks that have not ended beyond the expected time,the tenant_id is {0}".format(time_out_merge_err_tenant_ids)) err_tenant_ids.extend(time_out_merge_err_tenant_ids) except Exception as e: self.stdio.error("MajorHoldScene execute GV$OB_COMPACTION_PROGRESS panic: {0}".format(e)) raise RCAExecuteException("MajorHoldScene execute GV$OB_COMPACTION_PROGRESS panic: {0}".format(e)) if not need_tag: - first_record.add_suggest("major merge abnormal situation not need execute") - self.Result.records.append(first_record) + self.record.add_suggest("major merge abnormal situation not need execute") + raise RCANotNeedExecuteException("MajorHoldScene not need execute") else: err_tenant_ids = list(set(err_tenant_ids)) - first_record.add_suggest("some tenants need execute MajorHoldScene. :{0}".format(err_tenant_ids)) + self.record.add_suggest("some tenants need execute MajorHoldScene. :{0}".format(err_tenant_ids)) self.stdio.verbose("On CDB_OB_MAJOR_COMPACTION") # execute record need more for err_tenant_id in err_tenant_ids: tenant_record = RCA_ResultRecord() - first_record_records = first_record.records.copy() + first_record_records = self.record.records.copy() tenant_record.records.extend(first_record_records) self.stdio.verbose("tenant_id is {0}".format(err_tenant_id)) tenant_record.add_record("tenant_id is {0}".format(err_tenant_id)) # 1 try: + tenant_record.add_record("step:1 get CDB_OB_MAJOR_COMPACTION data") cursor = self.ob_connector.execute_sql_return_cursor_dictionary('SELECT * FROM oceanbase.CDB_OB_MAJOR_COMPACTION WHERE TENANT_ID= "{0}" AND (IS_ERROR = "NO" OR IS_SUSPENDED = "NO");'.format(err_tenant_id)) OB_MAJOR_COMPACTION_data = cursor.fetchall() if len(OB_MAJOR_COMPACTION_data) == 0: tenant_record.add_record("on CDB_OB_MAJOR_COMPACTION where status='COMPACTING'; " "result:{0} , need not next step".format(str(OB_MAJOR_COMPACTION_data))) - else: tenant_record.add_record("on CDB_OB_MAJOR_COMPACTION where status='COMPACTING';" "result:{0}".format(str(OB_MAJOR_COMPACTION_data))) - except Exception as e: tenant_record.add_record("#1 on CDB_OB_MAJOR_COMPACTION get data failed") self.stdio.warn("MajorHoldScene execute exception: {0}".format(e)) pass # 2 try: - compaction_diagnose_info = self.ob_connector.execute_sql('SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE status="FAILED";') - + tenant_record.add_record("step:2&3 get __all_virtual_compaction_diagnose_info and check the diagnose type") + compaction_diagnose_info = self.ob_connector.execute_sql_return_cursor_dictionary('SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE status="FAILED";').fetchall() if len(compaction_diagnose_info) == 0: tenant_record.add_record("on __all_virtual_compaction_diagnose_info no data status=FAILED") else: tenant_record.add_record("on __all_virtual_compaction_diagnose_info;" "result:{0}".format(str(compaction_diagnose_info))) - for COMPACTING_data in compaction_diagnose_info: self.diagnose_info_switch(COMPACTING_data, tenant_record) - except Exception as e: tenant_record.add_record("#2&3 on __all_virtual_compaction_diagnose_info get data failed") self.stdio.warn("#2&3 MajorHoldScene execute exception: {0}".format(e)) pass - # 4 try: - global_broadcast_scn = self.ob_connector.execute_sql("select * from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id))[0][3] + tenant_record.add_record("step:4 get GV$OB_COMPACTION_PROGRESS whit tenant_id:{0}".format(err_tenant_id)) + global_broadcast_scn = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id)).fetchall()[0].get("GLOBAL_BROADCAST_SCN") tenant_record.add_record("global_broadcast_scn is {0}".format(global_broadcast_scn)) - last_scn = self.ob_connector.execute_sql("select LAST_SCN from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id))[0] + last_scn = self.ob_connector.execute_sql_return_cursor_dictionary("select LAST_SCN from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id)).fetchall()[0] tenant_record.add_record("last_scn is {0}".format(last_scn)) - sql = "select * from oceanbase.GV$OB_COMPACTION_PROGRESS where TENANT_ID='{0}' and COMPACTION_SCN='{1}';".format(err_tenant_id, global_broadcast_scn) - OB_COMPACTION_PROGRESS_data_global_broadcast_scn = self.ob_connector.execute_sql(sql) + OB_COMPACTION_PROGRESS_data_global_broadcast_scn = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall() file_name = "{0}/rca_major_hold_{1}_OB_COMPACTION_PROGRESS_data_global_broadcast_scn".format(self.local_path, err_tenant_id) with open(file_name, "w") as f: f.write(str(OB_COMPACTION_PROGRESS_data_global_broadcast_scn)) tenant_record.add_record("tenant_id:{0} OB_COMPACTION_PROGRESS_data_global_broadcast_scn save on {1}".format(err_tenant_id, file_name)) - sql = "select * from oceanbase.GV$OB_COMPACTION_PROGRESS where TENANT_ID='{0}' and COMPACTION_SCN='{1}';".format(err_tenant_id, last_scn) - OB_COMPACTION_PROGRESS_data_last_scn = self.ob_connector.execute_sql(sql) + OB_COMPACTION_PROGRESS_data_last_scn = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall() file_name = "{0}/rca_major_hold_{1}_OB_COMPACTION_PROGRESS_data_last_scn".format(self.local_path, err_tenant_id) with open(file_name, "w") as f: f.write(str(OB_COMPACTION_PROGRESS_data_last_scn)) tenant_record.add_record("tenant_id:{0} OB_COMPACTION_PROGRESS_data_last_scn save on {1}".format(err_tenant_id, file_name)) - sql = "select * from oceanbase.GV$OB_COMPACTION_PROGRESS where TENANT_ID='{0}' and STATUS<>'FINISH';".format(err_tenant_id, global_broadcast_scn) - finish_data = self.ob_connector.execute_sql(sql) + finish_data = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall() if len(finish_data) == 0: tenant_record.add_record("sql:{0},len of result is 0;result:{1}".format(sql, finish_data)) - sql = "select * from oceanbase. where TENANT_ID='{0}' and LS_ID=1".format(err_tenant_id) - svrs = self.ob_connector.execute_sql(sql) - svr_ip = svrs[0][4] - svr_port = svrs[0][5] + sql = "select * from oceanbase.DBA_OB_TABLET_REPLICAS where TENANT_ID='{0}' and LS_ID=1".format(err_tenant_id) + svrs = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall() + svr_ip = svrs[0].get("SVR_IP") + svr_port = svrs[0].get("SVR_PORT") node = None ssh_client = None for observer_node in self.observer_nodes: @@ -187,7 +176,6 @@ def execute(self): if node == None: self.stdio.error("can not find ls_svr by TENANT_ID:{2} ip:{0},port:{1}".format(svr_ip, svr_port, err_tenant_id)) break - log_name = "/tmp/major_hold_scene_4_major_merge_progress_checker_{0}.log".format(err_tenant_id) ssh_client.exec_cmd('grep "major_merge_progress_checker" {0}/log/rootservice.log* | grep T{1} -m500 >{2}'.format(node.get("home_path"), err_tenant_id, log_name)) ssh_client.download(log_name, self.local_path) @@ -199,8 +187,8 @@ def execute(self): # 5 try: + tenant_record.add_record("step:5 get OB_COMPACTION_SUGGESTIONS") cursor = self.ob_connector.execute_sql_return_cursor_dictionary('select * from oceanbase.GV$OB_COMPACTION_SUGGESTIONS where tenant_id="{0}";'.format(err_tenant_id)) - columns = [column[0] for column in cursor.description] OB_COMPACTION_SUGGESTIONS_data = cursor.fetchall() OB_COMPACTION_SUGGESTIONS_info = json.dumps(OB_COMPACTION_SUGGESTIONS_data, cls=DateTimeEncoder) file_name = "{0}/rca_major_hold_{1}_OB_COMPACTION_SUGGESTIONS_info".format(self.local_path, err_tenant_id) @@ -212,7 +200,7 @@ def execute(self): self.stdio.warn("MajorHoldScene execute 5 exception: {0}".format(e)) # 6 try: - self.stdio.verbose("MajorHoldScene execute 6 get dmesg by dmesg -T") + self.stdio.verbose("step:6 get dmesg by dmesg -T") if not os.path.exists(self.local_path + "/dmesg"): os.makedirs(self.local_path + "/dmesg_log") # all node execute @@ -228,26 +216,26 @@ def execute(self): def get_info__all_virtual_compaction_diagnose_info(self, tenant_record): try: - COMPACTING_datas = self.ob_connector.execute_sql("SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE IS_ERROR = 'NO' OR IS_SUSPENDED = 'NO';") + COMPACTING_datas = self.ob_connector.execute_sql_return_cursor_dictionary("SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE IS_ERROR = 'NO' OR IS_SUSPENDED = 'NO';").fetchall() if len(COMPACTING_datas) == 0: tenant_record.add_record("sql:select * from oceanbase.__all_virtual_compaction_diagnose_info; no data") return else: - tenant_record.add_record("sql:select * from oceanbase.CDB_OB_MAJOR_COMPACTION where status=COMPACTING; " "result:{0}".format(str(COMPACTING_datas))) - for index, COMPACTING_data in COMPACTING_datas: - self.diagnose_info_switch(COMPACTING_data) + tenant_record.add_record("sql:select * from oceanbase.__all_virtual_compaction_diagnose_info where status=COMPACTING; " "result:{0}".format(str(COMPACTING_datas))) except Exception as e: raise RCAExecuteException("MajorHoldScene execute get_info__all_virtual_compaction_diagnose_info exception: {0}".format(e)) def diagnose_info_switch(self, sql_data, tenant_record): - svr_ip = sql_data[0] - svr_port = sql_data[1] - tenant_id = sql_data[2] - ls_id = sql_data[4] - table_id = sql_data[5] - create_time = sql_data[7] - diagnose_info = sql_data[8] + svr_ip = sql_data.get("svr_ip") + svr_port = sql_data.get("svr_port") + tenant_id = sql_data.get("tenant_id") + ls_id = sql_data.get("ls_id") + table_id = sql_data.get("table_id") + create_time = sql_data.get("create_time") + diagnose_info = sql_data.get("diagnose_info") + tenant_record.add_record("diagnose_info:{0}".format(diagnose_info)) if "schedule medium failed" in diagnose_info: + tenant_record.add_record("diagnose_info type is 'schedule medium failed'") node = None ssh_client = None for observer_node in self.observer_nodes: @@ -265,9 +253,9 @@ def diagnose_info_switch(self, sql_data, tenant_record): ssh_client.exec_cmd("rm -rf {0}".format(log_name)) return elif "error_no=" in diagnose_info and "error_trace=" in diagnose_info: - err_no = re.search("\berror_no=(\d+)\b", diagnose_info).group(1) - err_trace = re.search("\berror_trace=(.+)\b", diagnose_info).group(1) - + tenant_record.add_record("diagnose_info type is error_no") + err_no = re.search(r'error_no=([^,]+)', diagnose_info).group(1) + err_trace = re.search(r'error_trace=([^,]+)', diagnose_info).group(1) global_broadcast_scn = self.ob_connector.execute_sql("select * from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(tenant_id))[0][3] compaction_scn = self.ob_connector.execute_sql("select * from oceanbase.__all_virtual_tablet_meta_table where tablet_id='{0}' and tenant_id='{1}';".format(table_id, tenant_id))[0][7] if compaction_scn > global_broadcast_scn: @@ -315,7 +303,6 @@ def diagnose_info_switch(self, sql_data, tenant_record): ssh_client = observer_node["ssher"] if node is None: raise RCAExecuteException("can not find observer node by ip:{0}, port:{1}".format(svr_ip, svr_port)) - tenant_record.add_record("diagnose_info type is 'error_no'. time is {0},observer is {1}:{2},the log is {3}".format(create_time, svr_ip, svr_port, log_name)) ssh_client.exec_cmd('cat observer.log* |grep "{1}" > /tmp/{0}'.format(log_name, err_trace)) ssh_client.download(log_name, local_path=self.local_path) @@ -323,6 +310,7 @@ def diagnose_info_switch(self, sql_data, tenant_record): ssh_client.exec_cmd("rm -rf {0}".format(log_name)) return elif "weak read ts is not ready" in diagnose_info: + tenant_record.add_record("diagnose_info type is weak read ts is not ready.") cursor = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.__all_virtual_ls_info where tenant_id='{0}' and ls_id='{1}';".format(tenant_id, ls_id)) all_virtual_ls_info_data = cursor.fetchall() self.all_virtual_ls_info = json.dumps(all_virtual_ls_info_data, cls=DateTimeEncoder)