update0904
jingyd66 committed Sep 4, 2024
1 parent 98da876 commit aa40dd9
Showing 2 changed files with 28 additions and 56 deletions.
81 changes: 27 additions & 54 deletions handler/analyzer/analyze_quene.py
@@ -168,21 +168,17 @@ def get_version(self):
def handle(self):
if not self.init_option():
self.stdio.error('init option failed')
- # return ObdiagResult(ObdiagResult.SERVER_ERROR_CODE, error_data="init option failed")
return False
if not self.init_config():
self.stdio.error('init config failed')
- # return ObdiagResult(ObdiagResult.SERVER_ERROR_CODE, error_data="init config failed")
return False
local_store_parent_dir = os.path.join(self.gather_pack_dir, "obdiag_analyze_pack_{0}".format(TimeUtils.timestamp_to_filename_time(TimeUtils.get_current_us_timestamp())))
self.stdio.verbose("Use {0} as pack dir.".format(local_store_parent_dir))
- # analyze_tuples = []
- analyze_tuples = pd.DataFrame()
+ analyze_tuples = []

def handle_from_node(node):
node_results = self.__handle_from_node(node, local_store_parent_dir)
- # analyze_tuples.append((node.get("ip"), False, node_results))
- analyze_tuples = pd.concat([analyze_tuples, node_results], axis=0)
+ analyze_tuples.append((node.get("ip"), False, node_results))

if self.is_ssh:
nodes_new = []
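With this change, handle() collects plain Python tuples instead of concatenating pandas DataFrames. A minimal sketch of what analyze_tuples looks like after visiting two nodes, assuming __handle_from_node returns the result_dict built later in this commit (IPs and numbers are invented):

analyze_tuples = [
    ('10.0.0.1', False, {'tenant_name': 'sys', 'over_quene_limit': 2, 'max_quene': 150}),
    ('10.0.0.2', False, {'tenant_name': 'sys', 'over_quene_limit': 0, 'max_quene': 30}),
]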
@@ -215,12 +211,11 @@ def handle_from_node(node):
# return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info})

def __handle_from_node(self, node, local_store_parent_dir):
- # resp = {"skip": False, "error": ""}
ssh_client = SshClient(self.context, node)
try:
- # node_results = []
- analyze_result = pd.DataFrame()
- node_results = pd.DataFrame()
+ node_results = []
+ quene_limit = self.quenue
+ result_dict = {}
remote_ip = node.get("ip") if self.is_ssh else '127.0.0.1'
remote_user = node.get("ssh_username")
remote_password = node.get("ssh_password")
@@ -233,8 +228,6 @@ def __handle_from_node(self, node, local_store_parent_dir):
DirectoryUtil.mkdir(path=local_store_dir, stdio=self.stdio)
except Exception as e:
ssh_failed = True
- # resp["skip"] = True
- # resp["error"] = "Please check the {0}".format(self.config_path)
raise Exception("Please check the {0}".format(self.config_path))

from_datetime_timestamp = TimeUtils.timestamp_to_filename_time(TimeUtils.datetime_to_timestamp(self.from_time_str))
@@ -244,8 +237,6 @@ def __handle_from_node(self, node, local_store_parent_dir):
mkdir(ssh_client, gather_dir_full_path)

log_list = self.__handle_log_list(ssh_client, node)
- # if resp["skip"]:
- # return resp, node_results
self.stdio.print(FileUtil.show_file_list_tabulate(remote_ip, log_list, self.stdio))
for log_name in log_list:
if self.directly_analyze_files:
@@ -257,25 +248,30 @@ def __handle_from_node(self, node, local_store_parent_dir):
self.stdio.start_loading('analyze log start')
file_result = self.__parse_log_lines(analyze_log_full_path)
self.stdio.stop_loading('analyze log sucess')
- node_results = pd.concat([node_results, file_result], axis=0)
+ node_results.append(file_result)
delete_file(ssh_client, gather_dir_full_path, self.stdio)
ssh_client.ssh_close()
self.stdio.print(node_results)
- node_results = node_results.drop('timestamp', axis=1).apply(pd.to_numeric, errors='coerce').astype(int)
- # quenue_limit_size
- quenue_over_sum = (node_results > self.quenue).sum(axis=0).sum()
- max_queue = int(np.max(node_results.values))
- analyze_result = pd.DataFrame(
- {
- 'tenant_name': self.tenant, # teant_name
- 'ip': node, # ip
- 'is_quenue': ['yes' if quenue_over_sum > 0 else 'no'], # is_quenue
- 'total_count_over_limit': [int(quenue_over_sum)], # over quenue_limit_size
- 'max_quenue': [max_queue], # max_quenue
- },
- index=[0],
- ) # [0]
- return analyze_result
+ count, max_queue_value = self.count_and_find_max_queues(node_results, quene_limit)
+ result_dict['tenant_name'] = self.tenant
+ result_dict['over_quene_limit'] = count
+ result_dict['max_quene'] = max_queue_value
+ return result_dict

+ def count_and_find_max_queues(self, data, quene_limit):
+ count = 0
+ max_queue_value = 0
+ for sublist in data:
+ for item in sublist:
+ for key, value in item.items():
+ if 'queue' in key: # any column whose name contains "queue"
+ value = int(value)
+ if value > quene_limit:
+ count += 1
+ if value > max_queue_value:
+ max_queue_value = value
+
+ return count, max_queue_value

def __handle_log_list(self, ssh_client, node):
if self.directly_analyze_files:
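The new count_and_find_max_queues helper above replaces the pandas aggregation: it walks every per-file result list, inspects every key containing 'queue', and reports how many values exceed quene_limit plus the largest value seen. A self-contained sketch with made-up sample data (module-level copy of the helper, without self):

def count_and_find_max_queues(data, quene_limit):
    count = 0
    max_queue_value = 0
    for sublist in data:  # one sub-list per analyzed log file
        for item in sublist:  # one dict per parsed log line
            for key, value in item.items():
                if 'queue' in key:  # e.g. req_queue_total_size, multi_level_queue_total_size
                    value = int(value)
                    if value > quene_limit:
                        count += 1
                    if value > max_queue_value:
                        max_queue_value = value
    return count, max_queue_value

sample_node_results = [
    [
        {'timestamp': '2024-09-04 10:00:00', 'req_queue_total_size': '120', 'multi_level_queue_total_size': '3'},
        {'timestamp': '2024-09-04 10:00:01', 'req_queue_total_size': '8', 'multi_level_queue_total_size': '0'},
    ],
]
print(count_and_find_max_queues(sample_node_results, quene_limit=100))  # -> (1, 120)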
@@ -355,27 +351,6 @@ def __pharse_offline_log_file(self, ssh_client, log_name, local_store_dir):
self.stdio.verbose("grep files, run cmd = [{0}]".format(grep_cmd))
ssh_client.exec_cmd(grep_cmd)

- def __get_observer_ret_code(self, log_line):
- """
- Get the ret code from the observer log
- :param log_line
- :return: ret_code
- """
- prefix = "ret=-"
- idx = log_line.find(prefix)
- if idx < 0:
- return ""
- start = idx + len(prefix)
- if start >= len(log_line):
- return ""
- end = start
- while end < len(log_line):
- c = log_line[end]
- if c < '0' or c > '9':
- break
- end = end + 1
- return "-" + log_line[start:end]

def __parse_log_lines(self, file_full_path):
"""
Process the observer's log line by line
@@ -426,9 +401,7 @@ def __parse_log_lines(self, file_full_path):

# add to the results list
results.append(result)
- df = pd.DataFrame(results)
- df = df[['timestamp', 'req_queue_total_size', 'multi_level_queue_total_size'] + list(group_id_columns.keys())]
- return df
+ return results

def __get_time_from_ob_log_line(self, log_line):
"""
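__parse_log_lines now returns the raw list of per-line dicts rather than a DataFrame. Judging from the columns the removed code selected, each entry is assumed to look roughly like this (the group-specific key name and all values are invented examples):

results = [
    {
        'timestamp': '2024-09-04 10:00:00.123456',
        'req_queue_total_size': '42',
        'multi_level_queue_total_size': '7',
        'group_id_10_queue_size': '0',  # one such key per group_id column parsed from the log line
    },
]  # one dict per matching log line; later consumed by count_and_find_max_queues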
3 changes: 1 addition & 2 deletions requirements3.txt
@@ -38,6 +38,5 @@ sqlgpt-parser>=0.0.1a5
netifaces==0.11.0
netifaces==0.11.0
kubernetes==30.1.0
- numpy ==2.1.0
- pandas==2.2.2

