diff --git a/handler/analyzer/analyze_quene.py b/handler/analyzer/analyze_quene.py
index ff52808d..ca5987df 100644
--- a/handler/analyzer/analyze_quene.py
+++ b/handler/analyzer/analyze_quene.py
@@ -168,21 +168,17 @@ def get_version(self):
     def handle(self):
         if not self.init_option():
             self.stdio.error('init option failed')
-            # return ObdiagResult(ObdiagResult.SERVER_ERROR_CODE, error_data="init option failed")
             return False
         if not self.init_config():
             self.stdio.error('init config failed')
-            # return ObdiagResult(ObdiagResult.SERVER_ERROR_CODE, error_data="init config failed")
             return False
         local_store_parent_dir = os.path.join(self.gather_pack_dir, "obdiag_analyze_pack_{0}".format(TimeUtils.timestamp_to_filename_time(TimeUtils.get_current_us_timestamp())))
         self.stdio.verbose("Use {0} as pack dir.".format(local_store_parent_dir))
-        # analyze_tuples = []
-        analyze_tuples = pd.DataFrame()
+        analyze_tuples = []
 
         def handle_from_node(node):
             node_results = self.__handle_from_node(node, local_store_parent_dir)
-            # analyze_tuples.append((node.get("ip"), False, node_results))
-            analyze_tuples = pd.concat([analyze_tuples, node_results], axis=0)
+            analyze_tuples.append((node.get("ip"), False, node_results))
 
         if self.is_ssh:
             nodes_new = []
@@ -215,12 +211,11 @@ def handle_from_node(node):
         # return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info})
 
     def __handle_from_node(self, node, local_store_parent_dir):
-        # resp = {"skip": False, "error": ""}
         ssh_client = SshClient(self.context, node)
         try:
-            # node_results = []
-            analyze_result = pd.DataFrame()
-            node_results = pd.DataFrame()
+            node_results = []
+            quene_limit = self.quenue
+            result_dict = {}
             remote_ip = node.get("ip") if self.is_ssh else '127.0.0.1'
             remote_user = node.get("ssh_username")
             remote_password = node.get("ssh_password")
@@ -233,8 +228,6 @@ def __handle_from_node(self, node, local_store_parent_dir):
                 DirectoryUtil.mkdir(path=local_store_dir, stdio=self.stdio)
             except Exception as e:
                 ssh_failed = True
-                # resp["skip"] = True
-                # resp["error"] = "Please check the {0}".format(self.config_path)
                 raise Exception("Please check the {0}".format(self.config_path))
 
         from_datetime_timestamp = TimeUtils.timestamp_to_filename_time(TimeUtils.datetime_to_timestamp(self.from_time_str))
@@ -244,8 +237,6 @@ def __handle_from_node(self, node, local_store_parent_dir):
         mkdir(ssh_client, gather_dir_full_path)
 
         log_list = self.__handle_log_list(ssh_client, node)
-        # if resp["skip"]:
-        #     return resp, node_results
         self.stdio.print(FileUtil.show_file_list_tabulate(remote_ip, log_list, self.stdio))
         for log_name in log_list:
             if self.directly_analyze_files:
@@ -257,25 +248,30 @@
                 self.stdio.start_loading('analyze log start')
                 file_result = self.__parse_log_lines(analyze_log_full_path)
                 self.stdio.stop_loading('analyze log sucess')
-                node_results = pd.concat([node_results, file_result], axis=0)
+                node_results.append(file_result)
         delete_file(ssh_client, gather_dir_full_path, self.stdio)
         ssh_client.ssh_close()
         self.stdio.print(node_results)
-        node_results = node_results.drop('timestamp', axis=1).apply(pd.to_numeric, errors='coerce').astype(int)
-        # quenue_limit_size
-        quenue_over_sum = (node_results > self.quenue).sum(axis=0).sum()
-        max_queue = int(np.max(node_results.values))
-        analyze_result = pd.DataFrame(
-            {
-                'tenant_name': self.tenant,  # teant_name
-                'ip': node,  # ip
-                'is_quenue': ['yes' if quenue_over_sum > 0 else 'no'],  # is_quenue
-                'total_count_over_limit': [int(quenue_over_sum)],  # over quenue_limit_size
-                'max_quenue': [max_queue],  # max_quenue
-            },
-            index=[0],
-        )  # [0]
-        return analyze_result
+        count, max_queue_value = self.count_and_find_max_queues(node_results, quene_limit)
+        result_dict['tenant_name'] = self.tenant
+        result_dict['over_quene_limit'] = count
+        result_dict['max_quene'] = max_queue_value
+        return result_dict
+
+    def count_and_find_max_queues(self, data, quene_limit):
+        count = 0
+        max_queue_value = 0
+        for sublist in data:
+            for item in sublist:
+                for key, value in item.items():
+                    if 'queue' in key:  # only inspect queue-size columns
+                        value = int(value)
+                        if value > quene_limit:
+                            count += 1
+                        if value > max_queue_value:
+                            max_queue_value = value
+
+        return count, max_queue_value
 
     def __handle_log_list(self, ssh_client, node):
         if self.directly_analyze_files:
@@ -355,27 +351,6 @@ def __pharse_offline_log_file(self, ssh_client, log_name, local_store_dir):
             self.stdio.verbose("grep files, run cmd = [{0}]".format(grep_cmd))
             ssh_client.exec_cmd(grep_cmd)
 
-    def __get_observer_ret_code(self, log_line):
-        """
-        Get the ret code from the observer log
-        :param log_line
-        :return: ret_code
-        """
-        prefix = "ret=-"
-        idx = log_line.find(prefix)
-        if idx < 0:
-            return ""
-        start = idx + len(prefix)
-        if start >= len(log_line):
-            return ""
-        end = start
-        while end < len(log_line):
-            c = log_line[end]
-            if c < '0' or c > '9':
-                break
-            end = end + 1
-        return "-" + log_line[start:end]
-
     def __parse_log_lines(self, file_full_path):
         """
         Process the observer's log line by line
@@ -426,9 +401,7 @@
                 # append to the results list
                 results.append(result)
 
-        df = pd.DataFrame(results)
-        df = df[['timestamp', 'req_queue_total_size', 'multi_level_queue_total_size'] + list(group_id_columns.keys())]
-        return df
+        return results
 
     def __get_time_from_ob_log_line(self, log_line):
         """
diff --git a/requirements3.txt b/requirements3.txt
index 72588234..a903a38d 100644
--- a/requirements3.txt
+++ b/requirements3.txt
@@ -38,6 +38,5 @@
 sqlgpt-parser>=0.0.1a5
 netifaces==0.11.0
 netifaces==0.11.0
 kubernetes==30.1.0
-numpy ==2.1.0
-pandas==2.2.2
+
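The new pandas-free aggregation path is easy to check in isolation. Below is a minimal, runnable sketch of the counting logic introduced above; the shape of the sample input is an assumption inferred from the diff (one sublist per analyzed log file, each holding dicts from `__parse_log_lines` whose queue columns such as `req_queue_total_size` carry numeric strings), and the concrete timestamps and values are made up.

```python
# Standalone sketch of count_and_find_max_queues, runnable without an
# observer node. The input shape (list of per-file lists of dicts) and
# the sample values below are assumptions for illustration only.

def count_and_find_max_queues(data, quene_limit):
    """Count queue samples above quene_limit and track the largest one seen."""
    count = 0
    max_queue_value = 0
    for sublist in data:            # one sublist per analyzed log file
        for item in sublist:        # one dict per matched log line
            for key, value in item.items():
                if 'queue' in key:  # skip non-queue columns such as 'timestamp'
                    value = int(value)
                    if value > quene_limit:
                        count += 1
                    if value > max_queue_value:
                        max_queue_value = value
    return count, max_queue_value


# Hypothetical input: two log files, three queue samples, limit 100.
node_results = [
    [{'timestamp': '2024-08-01 10:00:00', 'req_queue_total_size': '3', 'multi_level_queue_total_size': '250'}],
    [{'timestamp': '2024-08-01 10:00:01', 'req_queue_total_size': '120'}],
]
print(count_and_find_max_queues(node_results, 100))  # -> (2, 250)
```

Replacing the vectorized DataFrame sum with plain loops is what lets the patch drop the numpy/pandas pins from requirements3.txt, and the loop cost is negligible for log-sized inputs.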