Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into 3.0.0_1209
Browse files Browse the repository at this point in the history
  • Loading branch information
Teingi committed Dec 10, 2024
2 parents bbff0e3 + 7d281ad commit df47808
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 61 deletions.
2 changes: 1 addition & 1 deletion core.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def update_obcluster_nodes(self, config):
if (obcluster := config_data.get('obcluster')) and (servers := obcluster.get('servers')) and servers.get('nodes'):
return

ob_version = get_observer_version_by_sql(ob_cluster, self.stdio)
ob_version = get_observer_version_by_sql(self.context, ob_cluster)
obConnetcor = OBConnector(context=self.context, ip=ob_cluster["db_host"], port=ob_cluster["db_port"], username=ob_cluster["tenant_sys"]["user"], password=ob_cluster["tenant_sys"]["password"])

sql = "select SVR_IP, SVR_PORT, ZONE, BUILD_VERSION from oceanbase.__all_server"
Expand Down
108 changes: 48 additions & 60 deletions handler/rca/scene/major_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,136 +48,125 @@ def init(self, context):
raise RCAInitException("MajorHoldScene RCAInitException: {0}".format(e))

def execute(self):
# 前置条件确认
need_tag = False
first_record = RCA_ResultRecord()
err_tenant_ids = []
# 合并任务是否有报错
self.record.add_record("check major task is error or not")
try:
COMPACTING_data = self.ob_connector.execute_sql('select * from oceanbase.CDB_OB_MAJOR_COMPACTION where IS_ERROR="YES";')
COMPACTING_data = self.ob_connector.execute_sql_return_cursor_dictionary('select * from oceanbase.CDB_OB_MAJOR_COMPACTION where IS_ERROR="YES";').fetchall()
if len(COMPACTING_data) == 0:
first_record.add_record("CDB_OB_MAJOR_COMPACTION is not exist IS_ERROR='YES'")
self.record.add_record("CDB_OB_MAJOR_COMPACTION is not exist IS_ERROR='YES'")
else:
need_tag = True
CDB_OB_MAJOR_COMPACTION_err_tenant_ids = []
for data in COMPACTING_data:
CDB_OB_MAJOR_COMPACTION_err_tenant_ids.append(str(data[0]))

first_record.add_record("CDB_OB_MAJOR_COMPACTION have IS_ERROR='YES',the tenant_ids are {0}".format(err_tenant_ids))
CDB_OB_MAJOR_COMPACTION_err_tenant_ids.append(str(data.get('TENANT_ID')))
self.record.add_record("CDB_OB_MAJOR_COMPACTION have IS_ERROR='YES',the tenant_ids are {0}".format(err_tenant_ids))
err_tenant_ids.extend(CDB_OB_MAJOR_COMPACTION_err_tenant_ids)

except Exception as e:
self.stdio.warn("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e))
raise RCAExecuteException("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e))
# __all_virtual_compaction_diagnose_info里存在status=FAILED的记录
# get info form __all_virtual_compaction_diagnose_info where status=FAILED
try:
diagnose_data = self.ob_connector.execute_sql('select * from oceanbase.__all_virtual_compaction_diagnose_info where status="FAILED";')
diagnose_data = self.ob_connector.execute_sql_return_cursor_dictionary('select * from oceanbase.__all_virtual_compaction_diagnose_info where status="FAILED";').fetchall()
if len(diagnose_data) == 0:
first_record.add_record('__all_virtual_compaction_diagnose_info is not exist status="FAILED";')
self.record.add_record('__all_virtual_compaction_diagnose_info is not exist status="FAILED";')
else:
need_tag = True
__all_virtual_compaction_diagnose_info_err_tenant_ids = []
for data in COMPACTING_data:
__all_virtual_compaction_diagnose_info_err_tenant_ids.append(str(data[0]))

first_record.add_record("__all_virtual_compaction_diagnose_info have status='FAILED',the tenant is {0}".format(__all_virtual_compaction_diagnose_info_err_tenant_ids))
__all_virtual_compaction_diagnose_info_err_tenant_ids.append(str(data.get("tenant_id")))
self.record.add_record("__all_virtual_compaction_diagnose_info have status='FAILED',the tenant is {0}".format(__all_virtual_compaction_diagnose_info_err_tenant_ids))
err_tenant_ids.extend(__all_virtual_compaction_diagnose_info_err_tenant_ids)
except Exception as e:
self.stdio.error("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e))
raise RCAExecuteException("MajorHoldScene execute CDB_OB_MAJOR_COMPACTION panic: {0}".format(e))
# GV$OB_COMPACTION_PROGRESS表中,根据上一次合并记录中的data_size/(estimated_finish_time-start_time)与当前合并版本记录中(data_size-unfinished_data_size)/(当前时间-start_time)相比,如果差距过大(当前合并比上一次合并慢很多,以5倍为指标)
try:
running_data = self.ob_connector.execute_sql("select * from oceanbase.GV$OB_COMPACTION_PROGRESS where STATUS <> 'FINISH' and START_TIME <= NOW() - INTERVAL 20 minute GROUP BY COMPACTION_SCN DESC;")
running_data = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.GV$OB_COMPACTION_PROGRESS where STATUS <> 'FINISH' and START_TIME <= NOW() - INTERVAL 20 minute GROUP BY COMPACTION_SCN DESC;").fetchall()
if len(running_data) == 0:
first_record.add_record("No merge tasks that have not ended beyond the expected time")
self.record.add_record("No merge tasks that have not ended beyond the expected time")
else:

time_out_merge_err_tenant_ids = []
need_tag = True
for data in running_data:
time_out_merge_err_tenant_ids.append(str(data[2]))
first_record.add_record("merge tasks that have not ended beyond the expected time,the tenant_id is {0}".format(time_out_merge_err_tenant_ids))
time_out_merge_err_tenant_ids.append(str(data.get("TENANT_ID")))
self.record.add_record("merge tasks that have not ended beyond the expected time,the tenant_id is {0}".format(time_out_merge_err_tenant_ids))
self.stdio.verbose("merge tasks that have not ended beyond the expected time,the tenant_id is {0}".format(time_out_merge_err_tenant_ids))
err_tenant_ids.extend(time_out_merge_err_tenant_ids)
except Exception as e:
self.stdio.error("MajorHoldScene execute GV$OB_COMPACTION_PROGRESS panic: {0}".format(e))
raise RCAExecuteException("MajorHoldScene execute GV$OB_COMPACTION_PROGRESS panic: {0}".format(e))
if not need_tag:
first_record.add_suggest("major merge abnormal situation not need execute")
self.Result.records.append(first_record)
self.record.add_suggest("major merge abnormal situation not need execute")

raise RCANotNeedExecuteException("MajorHoldScene not need execute")
else:
err_tenant_ids = list(set(err_tenant_ids))
first_record.add_suggest("some tenants need execute MajorHoldScene. :{0}".format(err_tenant_ids))
self.record.add_suggest("some tenants need execute MajorHoldScene. :{0}".format(err_tenant_ids))
self.stdio.verbose("On CDB_OB_MAJOR_COMPACTION")

# execute record need more
for err_tenant_id in err_tenant_ids:
tenant_record = RCA_ResultRecord()
first_record_records = first_record.records.copy()
first_record_records = self.record.records.copy()
tenant_record.records.extend(first_record_records)
self.stdio.verbose("tenant_id is {0}".format(err_tenant_id))
tenant_record.add_record("tenant_id is {0}".format(err_tenant_id))
# 1
try:
tenant_record.add_record("step:1 get CDB_OB_MAJOR_COMPACTION data")
cursor = self.ob_connector.execute_sql_return_cursor_dictionary('SELECT * FROM oceanbase.CDB_OB_MAJOR_COMPACTION WHERE TENANT_ID= "{0}" AND (IS_ERROR = "NO" OR IS_SUSPENDED = "NO");'.format(err_tenant_id))
OB_MAJOR_COMPACTION_data = cursor.fetchall()
if len(OB_MAJOR_COMPACTION_data) == 0:
tenant_record.add_record("on CDB_OB_MAJOR_COMPACTION where status='COMPACTING'; " "result:{0} , need not next step".format(str(OB_MAJOR_COMPACTION_data)))

else:
tenant_record.add_record("on CDB_OB_MAJOR_COMPACTION where status='COMPACTING';" "result:{0}".format(str(OB_MAJOR_COMPACTION_data)))

except Exception as e:
tenant_record.add_record("#1 on CDB_OB_MAJOR_COMPACTION get data failed")
self.stdio.warn("MajorHoldScene execute exception: {0}".format(e))
pass
# 2
try:
compaction_diagnose_info = self.ob_connector.execute_sql('SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE status="FAILED";')

tenant_record.add_record("step:2&3 get __all_virtual_compaction_diagnose_info and check the diagnose type")
compaction_diagnose_info = self.ob_connector.execute_sql_return_cursor_dictionary('SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE status="FAILED";').fetchall()
if len(compaction_diagnose_info) == 0:
tenant_record.add_record("on __all_virtual_compaction_diagnose_info no data status=FAILED")
else:
tenant_record.add_record("on __all_virtual_compaction_diagnose_info;" "result:{0}".format(str(compaction_diagnose_info)))

for COMPACTING_data in compaction_diagnose_info:
self.diagnose_info_switch(COMPACTING_data, tenant_record)

except Exception as e:
tenant_record.add_record("#2&3 on __all_virtual_compaction_diagnose_info get data failed")
self.stdio.warn("#2&3 MajorHoldScene execute exception: {0}".format(e))
pass

# 4
try:
global_broadcast_scn = self.ob_connector.execute_sql("select * from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id))[0][3]
tenant_record.add_record("step:4 get GV$OB_COMPACTION_PROGRESS whit tenant_id:{0}".format(err_tenant_id))
global_broadcast_scn = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id)).fetchall()[0].get("GLOBAL_BROADCAST_SCN")
tenant_record.add_record("global_broadcast_scn is {0}".format(global_broadcast_scn))
last_scn = self.ob_connector.execute_sql("select LAST_SCN from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id))[0]
last_scn = self.ob_connector.execute_sql_return_cursor_dictionary("select LAST_SCN from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(err_tenant_id)).fetchall()[0]
tenant_record.add_record("last_scn is {0}".format(last_scn))

sql = "select * from oceanbase.GV$OB_COMPACTION_PROGRESS where TENANT_ID='{0}' and COMPACTION_SCN='{1}';".format(err_tenant_id, global_broadcast_scn)
OB_COMPACTION_PROGRESS_data_global_broadcast_scn = self.ob_connector.execute_sql(sql)
OB_COMPACTION_PROGRESS_data_global_broadcast_scn = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()
file_name = "{0}/rca_major_hold_{1}_OB_COMPACTION_PROGRESS_data_global_broadcast_scn".format(self.local_path, err_tenant_id)
with open(file_name, "w") as f:
f.write(str(OB_COMPACTION_PROGRESS_data_global_broadcast_scn))
tenant_record.add_record("tenant_id:{0} OB_COMPACTION_PROGRESS_data_global_broadcast_scn save on {1}".format(err_tenant_id, file_name))

sql = "select * from oceanbase.GV$OB_COMPACTION_PROGRESS where TENANT_ID='{0}' and COMPACTION_SCN='{1}';".format(err_tenant_id, last_scn)
OB_COMPACTION_PROGRESS_data_last_scn = self.ob_connector.execute_sql(sql)
OB_COMPACTION_PROGRESS_data_last_scn = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()
file_name = "{0}/rca_major_hold_{1}_OB_COMPACTION_PROGRESS_data_last_scn".format(self.local_path, err_tenant_id)
with open(file_name, "w") as f:
f.write(str(OB_COMPACTION_PROGRESS_data_last_scn))
tenant_record.add_record("tenant_id:{0} OB_COMPACTION_PROGRESS_data_last_scn save on {1}".format(err_tenant_id, file_name))

sql = "select * from oceanbase.GV$OB_COMPACTION_PROGRESS where TENANT_ID='{0}' and STATUS<>'FINISH';".format(err_tenant_id, global_broadcast_scn)
finish_data = self.ob_connector.execute_sql(sql)
finish_data = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()
if len(finish_data) == 0:
tenant_record.add_record("sql:{0},len of result is 0;result:{1}".format(sql, finish_data))
sql = "select * from oceanbase. where TENANT_ID='{0}' and LS_ID=1".format(err_tenant_id)
svrs = self.ob_connector.execute_sql(sql)
svr_ip = svrs[0][4]
svr_port = svrs[0][5]
sql = "select * from oceanbase.DBA_OB_TABLET_REPLICAS where TENANT_ID='{0}' and LS_ID=1".format(err_tenant_id)
svrs = self.ob_connector.execute_sql_return_cursor_dictionary(sql).fetchall()
svr_ip = svrs[0].get("SVR_IP")
svr_port = svrs[0].get("SVR_PORT")
node = None
ssh_client = None
for observer_node in self.observer_nodes:
Expand All @@ -187,7 +176,6 @@ def execute(self):
if node == None:
self.stdio.error("can not find ls_svr by TENANT_ID:{2} ip:{0},port:{1}".format(svr_ip, svr_port, err_tenant_id))
break

log_name = "/tmp/major_hold_scene_4_major_merge_progress_checker_{0}.log".format(err_tenant_id)
ssh_client.exec_cmd('grep "major_merge_progress_checker" {0}/log/rootservice.log* | grep T{1} -m500 >{2}'.format(node.get("home_path"), err_tenant_id, log_name))
ssh_client.download(log_name, self.local_path)
Expand All @@ -199,8 +187,8 @@ def execute(self):

# 5
try:
tenant_record.add_record("step:5 get OB_COMPACTION_SUGGESTIONS")
cursor = self.ob_connector.execute_sql_return_cursor_dictionary('select * from oceanbase.GV$OB_COMPACTION_SUGGESTIONS where tenant_id="{0}";'.format(err_tenant_id))
columns = [column[0] for column in cursor.description]
OB_COMPACTION_SUGGESTIONS_data = cursor.fetchall()
OB_COMPACTION_SUGGESTIONS_info = json.dumps(OB_COMPACTION_SUGGESTIONS_data, cls=DateTimeEncoder)
file_name = "{0}/rca_major_hold_{1}_OB_COMPACTION_SUGGESTIONS_info".format(self.local_path, err_tenant_id)
Expand All @@ -212,7 +200,7 @@ def execute(self):
self.stdio.warn("MajorHoldScene execute 5 exception: {0}".format(e))
# 6
try:
self.stdio.verbose("MajorHoldScene execute 6 get dmesg by dmesg -T")
self.stdio.verbose("step:6 get dmesg by dmesg -T")
if not os.path.exists(self.local_path + "/dmesg"):
os.makedirs(self.local_path + "/dmesg_log")
# all node execute
Expand All @@ -228,26 +216,26 @@ def execute(self):

def get_info__all_virtual_compaction_diagnose_info(self, tenant_record):
try:
COMPACTING_datas = self.ob_connector.execute_sql("SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE IS_ERROR = 'NO' OR IS_SUSPENDED = 'NO';")
COMPACTING_datas = self.ob_connector.execute_sql_return_cursor_dictionary("SELECT * FROM oceanbase.__all_virtual_compaction_diagnose_info WHERE IS_ERROR = 'NO' OR IS_SUSPENDED = 'NO';").fetchall()
if len(COMPACTING_datas) == 0:
tenant_record.add_record("sql:select * from oceanbase.__all_virtual_compaction_diagnose_info; no data")
return
else:
tenant_record.add_record("sql:select * from oceanbase.CDB_OB_MAJOR_COMPACTION where status=COMPACTING; " "result:{0}".format(str(COMPACTING_datas)))
for index, COMPACTING_data in COMPACTING_datas:
self.diagnose_info_switch(COMPACTING_data)
tenant_record.add_record("sql:select * from oceanbase.__all_virtual_compaction_diagnose_info where status=COMPACTING; " "result:{0}".format(str(COMPACTING_datas)))
except Exception as e:
raise RCAExecuteException("MajorHoldScene execute get_info__all_virtual_compaction_diagnose_info exception: {0}".format(e))

def diagnose_info_switch(self, sql_data, tenant_record):
svr_ip = sql_data[0]
svr_port = sql_data[1]
tenant_id = sql_data[2]
ls_id = sql_data[4]
table_id = sql_data[5]
create_time = sql_data[7]
diagnose_info = sql_data[8]
svr_ip = sql_data.get("svr_ip")
svr_port = sql_data.get("svr_port")
tenant_id = sql_data.get("tenant_id")
ls_id = sql_data.get("ls_id")
table_id = sql_data.get("table_id")
create_time = sql_data.get("create_time")
diagnose_info = sql_data.get("diagnose_info")
tenant_record.add_record("diagnose_info:{0}".format(diagnose_info))
if "schedule medium failed" in diagnose_info:
tenant_record.add_record("diagnose_info type is 'schedule medium failed'")
node = None
ssh_client = None
for observer_node in self.observer_nodes:
Expand All @@ -265,9 +253,9 @@ def diagnose_info_switch(self, sql_data, tenant_record):
ssh_client.exec_cmd("rm -rf {0}".format(log_name))
return
elif "error_no=" in diagnose_info and "error_trace=" in diagnose_info:
err_no = re.search("\berror_no=(\d+)\b", diagnose_info).group(1)
err_trace = re.search("\berror_trace=(.+)\b", diagnose_info).group(1)

tenant_record.add_record("diagnose_info type is error_no")
err_no = re.search(r'error_no=([^,]+)', diagnose_info).group(1)
err_trace = re.search(r'error_trace=([^,]+)', diagnose_info).group(1)
global_broadcast_scn = self.ob_connector.execute_sql("select * from oceanbase.CDB_OB_MAJOR_COMPACTION where TENANT_ID='{0}';".format(tenant_id))[0][3]
compaction_scn = self.ob_connector.execute_sql("select * from oceanbase.__all_virtual_tablet_meta_table where tablet_id='{0}' and tenant_id='{1}';".format(table_id, tenant_id))[0][7]
if compaction_scn > global_broadcast_scn:
Expand Down Expand Up @@ -315,14 +303,14 @@ def diagnose_info_switch(self, sql_data, tenant_record):
ssh_client = observer_node["ssher"]
if node is None:
raise RCAExecuteException("can not find observer node by ip:{0}, port:{1}".format(svr_ip, svr_port))

tenant_record.add_record("diagnose_info type is 'error_no'. time is {0},observer is {1}:{2},the log is {3}".format(create_time, svr_ip, svr_port, log_name))
ssh_client.exec_cmd('cat observer.log* |grep "{1}" > /tmp/{0}'.format(log_name, err_trace))
ssh_client.download(log_name, local_path=self.local_path)
tenant_record.add_record("download {0} to {1}".format(log_name, self.local_path))
ssh_client.exec_cmd("rm -rf {0}".format(log_name))
return
elif "weak read ts is not ready" in diagnose_info:
tenant_record.add_record("diagnose_info type is weak read ts is not ready.")
cursor = self.ob_connector.execute_sql_return_cursor_dictionary("select * from oceanbase.__all_virtual_ls_info where tenant_id='{0}' and ls_id='{1}';".format(tenant_id, ls_id))
all_virtual_ls_info_data = cursor.fetchall()
self.all_virtual_ls_info = json.dumps(all_virtual_ls_info_data, cls=DateTimeEncoder)
Expand Down

0 comments on commit df47808

Please sign in to comment.