diff --git a/drishti/main.py b/drishti/main.py
index 3afb96c..b8c7056 100644
--- a/drishti/main.py
+++ b/drishti/main.py
@@ -11,8 +11,8 @@
 import datetime
 import argparse
 import subprocess
-
 import pandas as pd
+import numpy as np
 
 import darshan
 import darshan.backend.cffi_backend as darshanll
@@ -99,6 +99,8 @@
 INSIGHTS_MPI_IO_AGGREGATORS_INTER = 'M09'
 INSIGHTS_MPI_IO_AGGREGATORS_OK = 'M10'
 
+SMALL_REQUEST_SIZE_THRESHOLD = 1000000
+BACKTRACE_THRESHOLD = 2
 
 # TODO: need to verify the threashold to be between 0 and 1
 # TODO: read thresholds from file
@@ -188,6 +190,14 @@
     dest='json',
     help=argparse.SUPPRESS)
 
+parser.add_argument(
+    '--backtrace',
+    default=False,
+    action='store_true',
+    dest='backtrace',
+    help='Enable DXT insights and backtrace'
+)
+
 args = parser.parse_args()
 
 if args.export_size:
@@ -281,11 +291,31 @@ def message(code, target, level, issue, recommendations=None, details=None):
     if details:
         for detail in details:
-            messages.append(' {}:left_arrow_curving_right: {}'.format(
+            if ': ' in str(detail['message']):
+                if 'Time taken' in str(detail['message']):
+                    messages.append(' {}:left_arrow_curving_right: {}'.format(
+                            '[orange1]',
+                            detail['message']
+                        )
+                    )
+                else:
+                    messages.append(' {}:left_arrow_curving_right: {}'.format(
+                            color,
+                            detail['message']
+                        )
+                    )
+            elif 'backtrace' in str(detail['message']):
+                messages.append(' {}:left_arrow_curving_right: {}'.format(
                     color,
                     detail['message']
+                    )
+                )
+            else:
+                messages.append(' {}:left_arrow_curving_right: {}'.format(
+                        color,
+                        detail['message']
+                    )
                 )
-            )
 
     if recommendations:
         if not args.only_issues:
@@ -381,7 +411,7 @@ def main():
     information = darshanll.log_get_job(log)
     log_version = information['metadata']['lib_ver']
-    library_version = darshanll.darshan.backend.cffi_backend.get_lib_version()
+    library_version = darshanll.get_lib_version()
 
     # Make sure log format is of the same version
     filename = check_log_version(args.darshan, log_version, library_version)
@@ -443,6 +473,88 @@
     total_size_mpiio = 0
 
+    if args.backtrace:
+        if "DXT_POSIX" in report.records:
+            dxt_posix = report.records["DXT_POSIX"].to_df()
+            dxt_posix = pd.DataFrame(dxt_posix)
+
+            read_id = []
+            read_rank = []
+            read_length = []
+            read_offsets = []
+            read_end_time = []
+            read_start_time = []
+            read_operation = []
+
+            write_id = []
+            write_rank = []
+            write_length = []
+            write_offsets = []
+            write_end_time = []
+            write_start_time = []
+            write_operation = []
+
+            for r in zip(dxt_posix['rank'], dxt_posix['read_segments'], dxt_posix['write_segments'], dxt_posix['id']):
+                if not r[1].empty:
+                    read_id.append([r[3]] * len((r[1]['length'].to_list())))
+                    read_rank.append([r[0]] * len((r[1]['length'].to_list())))
+                    read_length.append(r[1]['length'].to_list())
+                    read_end_time.append(r[1]['end_time'].to_list())
+                    read_start_time.append(r[1]['start_time'].to_list())
+                    read_operation.append(['read'] * len((r[1]['length'].to_list())))
+                    read_offsets.append(r[1]['offset'].to_list())
+
+                if not r[2].empty:
+                    write_id.append([r[3]] * len((r[2]['length'].to_list())))
+                    write_rank.append([r[0]] * len((r[2]['length'].to_list())))
+                    write_length.append(r[2]['length'].to_list())
+                    write_end_time.append(r[2]['end_time'].to_list())
+                    write_start_time.append(r[2]['start_time'].to_list())
+                    write_operation.append(['write'] * len((r[2]['length'].to_list())))
+                    write_offsets.append(r[2]['offset'].to_list())
+
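+            # Flatten the nested per-file segment lists into flat columns for the DataFrames below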
+            read_id = [element for nestedlist in read_id for element in nestedlist]
+            read_rank = [element for nestedlist in read_rank for element in nestedlist]
+            read_length = [element for nestedlist in read_length for element in nestedlist]
+            read_offsets = [element for nestedlist in read_offsets for element in nestedlist]
+            read_end_time = [element for nestedlist in read_end_time for element in nestedlist]
+            read_operation = [element for nestedlist in read_operation for element in nestedlist]
+            read_start_time = [element for nestedlist in read_start_time for element in nestedlist]
+
+            write_id = [element for nestedlist in write_id for element in nestedlist]
+            write_rank = [element for nestedlist in write_rank for element in nestedlist]
+            write_length = [element for nestedlist in write_length for element in nestedlist]
+            write_offsets = [element for nestedlist in write_offsets for element in nestedlist]
+            write_end_time = [element for nestedlist in write_end_time for element in nestedlist]
+            write_operation = [element for nestedlist in write_operation for element in nestedlist]
+            write_start_time = [element for nestedlist in write_start_time for element in nestedlist]
+
+            dxt_posix_read_data = pd.DataFrame(
+                {
+                    'id': read_id,
+                    'rank': read_rank,
+                    'length': read_length,
+                    'end_time': read_end_time,
+                    'start_time': read_start_time,
+                    'operation': read_operation,
+                    'offsets': read_offsets,
+                })
+
+            dxt_posix_write_data = pd.DataFrame(
+                {
+                    'id': write_id,
+                    'rank': write_rank,
+                    'length': write_length,
+                    'end_time': write_end_time,
+                    'start_time': write_start_time,
+                    'operation': write_operation,
+                    'offsets': write_offsets,
+                })
+
+        if "DXT_MPIIO" in report.records:
+            dxt_mpiio = report.records["DXT_MPIIO"].to_df()
+            dxt_mpiio = pd.DataFrame(dxt_mpiio)
+
     # Since POSIX will capture both POSIX-only accesses and those comming from MPI-IO, we can subtract those
     if total_size_posix > 0 and total_size_posix >= total_size_mpiio:
         total_size_posix -= total_size_mpiio
@@ -554,7 +666,6 @@
         issue = 'Application is read operation intensive ({:.2f}% writes vs. {:.2f}% reads)'.format(
             total_writes / total_operations * 100.0, total_reads / total_operations * 100.0
         )
-
         insights_metadata.append(
             message(INSIGHTS_POSIX_READ_COUNT_INTENSIVE, TARGET_DEVELOPER, INFO, issue, None)
         )
@@ -623,6 +734,8 @@
         detail = []
         recommendation = []
+        file_count = 0
+        dxt_trigger_time = 0
 
         for index, row in detected_files.iterrows():
             if row['total_reads'] > (total_reads * THRESHOLD_SMALL_REQUESTS / 2):
@@ -636,6 +749,66 @@
                     }
                 )
 
+                # DXT Analysis
+                if args.backtrace:
+                    start = time.time()
+                    if file_count < BACKTRACE_THRESHOLD:
+                        temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
+                        temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(row['id'])]
+
+                        if not temp_df.empty:
+                            temp_df = temp_df.loc[temp_df['length'] < SMALL_REQUEST_SIZE_THRESHOLD]
+                            small_read_requests_ranks = temp_df['rank'].unique()
+
+                            # Check for a non-empty result before indexing into the rank list
+                            if len(small_read_requests_ranks) > 0:
+                                if int(small_read_requests_ranks[0]) == 0 and len(small_read_requests_ranks) > 1:
+                                    rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[1]))]
+                                else:
+                                    rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[0]))]
+
+                                rank_df = rank_df['read_segments'].iloc[0]
+                                rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
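+                                # Cross-reference the sampled stack addresses with Darshan's address-to-line mapping to recover the backtrace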
+                                address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                                res = set(list(address)) & set(rank_addresses)
+                                backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                                detail.append(
+                                    {
+                                        'message': '{} rank(s) made small read requests in "{}". Below is the backtrace information:'.format(
+                                            len(small_read_requests_ranks),
+                                            file_map[int(row['id'])] if args.full_path else os.path.basename(file_map[int(row['id'])])
+                                        )
+                                    }
+                                )
+
+                                for index, row3 in backtrace.iterrows():
+                                    detail.append(
+                                        {
+                                            'message': '{}: {}'.format(
+                                                row3['function_name'],
+                                                row3['line_number']
+                                            )
+                                        }
+                                    )
+
+                                file_count += 1
+                    else:
+                        detail.append(
+                            {
+                                'message': 'The backtrace information for this file is similar to the previous files'
+                            }
+                        )
+
+                    end = time.time()
+                    time_taken = end - start
+                    dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
+
         recommendation.append(
             {
                 'message': 'Consider buffering read operations into larger more contiguous ones'
             }
         )
@@ -657,7 +830,7 @@
         )
 
         insights_operation.append(
-            message(INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
+            message(INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
         )
 
     # Get the number of small I/O operations (less than the stripe size)
@@ -676,6 +849,8 @@
         detail = []
         recommendation = []
+        file_count = 0
+        dxt_trigger_time = 0
 
         for index, row in detected_files.iterrows():
             if row['total_writes'] > (total_writes * THRESHOLD_SMALL_REQUESTS / 2):
@@ -688,7 +863,68 @@
                     }
                 )
+
+                # DXT Analysis
+                if args.backtrace:
+                    start = time.time()
+                    if file_count < BACKTRACE_THRESHOLD:
+                        temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
+                        temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(row['id'])]
+
+                        if not temp_df.empty:
+                            temp_df = temp_df.loc[temp_df['length'] < SMALL_REQUEST_SIZE_THRESHOLD]
+                            small_write_requests_ranks = temp_df['rank'].unique()
+
+                            # Check for a non-empty result before indexing into the rank list
+                            if len(small_write_requests_ranks) > 0:
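+                                # Pick a representative rank, preferring a non-zero rank when more than one rank issued small writes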
+                                if int(small_write_requests_ranks[0]) == 0 and len(small_write_requests_ranks) > 1:
+                                    rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[1]))]
+                                else:
+                                    rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[0]))]
+
+                                rank_df = rank_df['write_segments'].iloc[0]
+                                rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                                address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                                res = set(list(address)) & set(rank_addresses)
+                                backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                                detail.append(
+                                    {
+                                        'message': '{} rank(s) made small write requests in "{}". Below is the backtrace information:'.format(
+                                            len(small_write_requests_ranks),
+                                            file_map[int(row['id'])] if args.full_path else os.path.basename(file_map[int(row['id'])])
+                                        )
+                                    }
+                                )
+
+                                for index, row3 in backtrace.iterrows():
+                                    detail.append(
+                                        {
+                                            'message': '{}: {}'.format(
+                                                row3['function_name'],
+                                                row3['line_number']
+                                            )
+                                        }
+                                    )
+
+                                file_count += 1
+                    else:
+                        detail.append(
+                            {
+                                'message': 'The backtrace information for this file is similar to the previous files'
+                            }
+                        )
+
+                    end = time.time()
+                    time_taken = end - start
+                    dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
+
         recommendation.append(
             {
                 'message': 'Consider buffering write operations into larger more contiguous ones'
             }
         )
@@ -710,7 +946,7 @@
         )
 
         insights_operation.append(
-            message(INSIGHTS_POSIX_HIGH_SMALL_READ_REQUESTS_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
+            message(INSIGHTS_POSIX_HIGH_SMALL_WRITE_REQUESTS_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
         )
 
 #########################################################################################################################################################################
@@ -751,7 +987,78 @@
                 }
             )
 
+        detail = []
         if 'LUSTRE' in modules:
+            # DXT Analysis
+            if args.backtrace:
+                start = time.time()
+                df_lustre = report.records['LUSTRE'].to_df()
+
+                if not df_lustre['counters']['LUSTRE_STRIPE_SIZE'].empty:
+                    stripe_size = df_lustre['counters']['LUSTRE_STRIPE_SIZE'].iloc[0]
+                else:
+                    stripe_size = df_lustre['counters']['POSIX_FILE_ALIGNMENT'].iloc[0]
+
+                file_count = 0
+
+                ids = dxt_posix.id.unique().tolist()
+                for id in ids:
+                    temp = dxt_posix.loc[dxt_posix['id'] == id]
+                    temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == id]
+
+                    misaligned_ranks = []
+                    misaligned_ranks_opr = []
+
+                    offsets = temp_df["offsets"].to_numpy().tolist()
+                    rank = temp_df["rank"].to_numpy().tolist()
+                    operation = temp_df["operation"].to_numpy().tolist()
+
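+                    # Flag every request whose file offset does not fall on a stripe-size boundary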
+                    for i in range(len(offsets)):
+                        if offsets[i] % stripe_size != 0:
+                            misaligned_ranks.append(rank[i])
+                            misaligned_ranks_opr.append(operation[i])
+
+                    if misaligned_ranks:
+                        misaligned_rank_ind = misaligned_ranks[0]
+                        misaligned_rank_opr = misaligned_ranks_opr[0]
+                        misaligned_rank_df = temp.loc[(temp['rank'] == int(misaligned_rank_ind))]
+                        if misaligned_rank_opr == 'read':
+                            misaligned_rank_df = misaligned_rank_df['read_segments'].iloc[0]
+                        else:
+                            misaligned_rank_df = misaligned_rank_df['write_segments'].iloc[0]
+                        misaligned_rank_stack_addresses = misaligned_rank_df['stack_memory_addresses'].iloc[0]
+
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(misaligned_rank_stack_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                        detail.append(
+                            {
+                                'message': '{} rank(s) made misaligned requests in "{}". Below is the backtrace information:'.format(
+                                    len(misaligned_ranks),
+                                    file_map[id] if args.full_path else os.path.basename(file_map[id])
+                                )
+                            }
+                        )
+
+                        for index, row3 in backtrace.iterrows():
+                            detail.append(
+                                {
+                                    'message': '{}: {}'.format(
+                                        row3['function_name'],
+                                        row3['line_number']
+                                    )
+                                }
+                            )
+                        file_count += 1
+
+                end = time.time()
+                time_taken = end - start
+                detail.append(
+                    {
+                        'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                    }
+                )
             recommendation.append(
                 {
                     'message': 'Consider using a Lustre alignment that matches the file system stripe configuration',
                 }
             )
@@ -760,7 +1067,7 @@
         )
 
         insights_metadata.append(
-            message(INSIGHTS_POSIX_HIGH_MISALIGNED_FILE_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation)
+            message(INSIGHTS_POSIX_HIGH_MISALIGNED_FILE_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
         )
 
 #########################################################################################################################################################################
@@ -770,19 +1077,139 @@
     max_read_offset = df['counters']['POSIX_MAX_BYTE_READ'].max()
 
     if max_read_offset > total_read_size:
-        issue = 'Application might have redundant read traffic (more data read than the highest offset)'
-
+        issue = 'Application might have redundant read traffic (more data read than the highest offset)'
+        detail = []
+        file_count = 0
+
+        # DXT Analysis
+        if args.backtrace:
+            start = time.time()
+            ids = dxt_posix.id.unique().tolist()
+            for id in ids:
+                if file_count < BACKTRACE_THRESHOLD:
+                    temp = dxt_posix.loc[dxt_posix['id'] == id]
+
+                    redundant_ranks_ind = -1
+                    temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == id]
+                    updated_offsets = (temp_df["offsets"].to_numpy()).tolist()
+
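+                    # An offset that occurs more than once for the same file suggests the same data was accessed repeatedly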
+                    for i in range(len(updated_offsets)):
+                        if updated_offsets.count(updated_offsets[i]) > 1:
+                            redundant_ranks_ind = i
+                            break
+
+                    if redundant_ranks_ind != -1:
+                        random_rank = temp_df.iloc[redundant_ranks_ind]['rank']
+                        random_offsets = temp_df.iloc[redundant_ranks_ind]['offsets']
+                        random_start_time = temp_df.iloc[redundant_ranks_ind]['start_time']
+
+                        temp_random_rank = temp.loc[(temp['rank'] == int(random_rank))]
+                        temp_random_rank = temp_random_rank['read_segments'].iloc[0]
+                        random_stack_addresses = temp_random_rank.loc[(temp_random_rank['offset'] == random_offsets) & (temp_random_rank['start_time'] == random_start_time)]
+                        random_stack_addresses = random_stack_addresses['stack_memory_addresses'].iloc[0]
+
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(random_stack_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                        detail.append(
+                            {
+                                'message': 'The backtrace information for these redundant read call(s) is given below:'
+                            }
+                        )
+                        for index, row3 in backtrace.iterrows():
+                            detail.append(
+                                {
+                                    'message': '{}: {}'.format(
+                                        row3['function_name'],
+                                        row3['line_number']
+                                    )
+                                }
+                            )
+                        file_count += 1
+                else:
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for this file is similar to the previous files'
+                        }
+                    )
+
+            end = time.time()
+            time_taken = end - start
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                }
+            )
+
         insights_metadata.append(
-            message(INSIGHTS_POSIX_REDUNDANT_READ_USAGE, TARGET_DEVELOPER, WARN, issue, None)
+            message(INSIGHTS_POSIX_REDUNDANT_READ_USAGE, TARGET_DEVELOPER, WARN, issue, None, detail)
         )
 
     max_write_offset = df['counters']['POSIX_MAX_BYTE_WRITTEN'].max()
 
     if max_write_offset > total_written_size:
         issue = 'Application might have redundant write traffic (more data written than the highest offset)'
-
+
+        detail = []
+        file_count = 0
+
+        # DXT Analysis
+        if args.backtrace:
+            start = time.time()
+            ids = dxt_posix.id.unique().tolist()
+            for id in ids:
+                if file_count < BACKTRACE_THRESHOLD:
+                    temp = dxt_posix.loc[dxt_posix['id'] == id]
+
+                    redundant_ranks_ind = -1
+                    temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == id]
+                    updated_offsets = (temp_df["offsets"].to_numpy()).tolist()
+                    # Same duplicate-offset scan as on the read side
+                    for i in range(len(updated_offsets)):
+                        if updated_offsets.count(updated_offsets[i]) > 1:
+                            redundant_ranks_ind = i
+                            break
+
+                    if redundant_ranks_ind != -1:
+                        random_rank = temp_df.iloc[redundant_ranks_ind]['rank']
+                        random_offsets = temp_df.iloc[redundant_ranks_ind]['offsets']
+                        random_start_time = temp_df.iloc[redundant_ranks_ind]['start_time']
+
+                        temp_random_rank = temp.loc[(temp['rank'] == int(random_rank))]
+                        temp_random_rank = temp_random_rank['write_segments'].iloc[0]
+                        random_stack_addresses = temp_random_rank.loc[(temp_random_rank['offset'] == random_offsets) & (temp_random_rank['start_time'] == random_start_time)]
+                        random_stack_addresses = random_stack_addresses['stack_memory_addresses'].iloc[0]
+
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(random_stack_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                        detail.append(
+                            {
+                                'message': 'The backtrace information for these redundant write call(s) is given below:'
+                            }
+                        )
+                        for index, row3 in backtrace.iterrows():
+                            detail.append(
+                                {
+                                    'message': '{}: {}'.format(
+                                        row3['function_name'],
+                                        row3['line_number']
+                                    )
+                                }
+                            )
+                        file_count += 1
+                else:
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for this file is similar to the previous files'
+                        }
+                    )
+
+            end = time.time()
+            time_taken = end - start
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                }
+            )
+
         insights_metadata.append(
-            message(INSIGHTS_POSIX_REDUNDANT_WRITE_USAGE, TARGET_DEVELOPER, WARN, issue, None)
+            message(INSIGHTS_POSIX_REDUNDANT_WRITE_USAGE, TARGET_DEVELOPER, WARN, issue, None, detail)
        )
 
 #########################################################################################################################################################################
@@ -811,8 +1238,62 @@
             }
         ]
 
+        detail = []
+
+        # DXT Analysis
+        if args.backtrace:
+            start = time.time()
+            ids = dxt_posix.id.unique().tolist()
+            for id in ids:
+                temp = dxt_posix.loc[dxt_posix['id'] == id]
+                temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == id]
+                temp_df = temp_df.sort_values('start_time', ascending=True)
+                random_ranks_ind = -1
+
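+                # Offsets that do not increase monotonically over time indicate a random access pattern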
+                if not temp_df["offsets"].is_monotonic_increasing:
+                    updated_offsets = (temp_df["offsets"].to_numpy()).tolist()
+                    cur = 0
+                    for i in range(len(updated_offsets)):
+                        if updated_offsets[i] < cur:
+                            random_ranks_ind = i
+                            break
+                        cur = updated_offsets[i]
+
+                if random_ranks_ind != -1:
+                    random_rank = temp_df.iloc[random_ranks_ind]['rank']
+                    random_offsets = temp_df.iloc[random_ranks_ind]['offsets']
+                    random_start_time = temp_df.iloc[random_ranks_ind]['start_time']
+
+                    temp_random_rank = temp.loc[(temp['rank'] == int(random_rank))]
+                    temp_random_rank = temp_random_rank['read_segments'].iloc[0]
+                    random_stack_addresses = temp_random_rank.loc[(temp_random_rank['offset'] == random_offsets) & (temp_random_rank['start_time'] == random_start_time)]
+                    random_stack_addresses = random_stack_addresses['stack_memory_addresses'].iloc[0]
+
+                    address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                    res = set(list(address)) & set(random_stack_addresses)
+                    backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for these random read call(s) is given below:'
+                        }
+                    )
+                    for index, row3 in backtrace.iterrows():
+                        detail.append(
+                            {
+                                'message': '{}: {}'.format(
+                                    row3['function_name'],
+                                    row3['line_number']
+                                )
+                            }
+                        )
+
+            end = time.time()
+            time_taken = end - start
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                }
+            )
+
         insights_operation.append(
-            message(INSIGHTS_POSIX_HIGH_RANDOM_READ_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation)
+            message(INSIGHTS_POSIX_HIGH_RANDOM_READ_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
         )
     else:
         issue = 'Application mostly uses consecutive ({:.2f}%) and sequential ({:.2f}%) read requests'.format(
@@ -846,6 +1327,61 @@
             }
         ]
 
+        detail = []
+
+        # DXT Analysis
+        if args.backtrace:
+            start = time.time()
+            ids = dxt_posix.id.unique().tolist()
+            for id in ids:
+                temp = dxt_posix.loc[dxt_posix['id'] == id]
+
+                temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == id]
+                temp_df.sort_values('start_time', ascending=True, inplace=True)
+                random_ranks_ind = -1
+                if not temp_df["offsets"].is_monotonic_increasing:
+                    updated_offsets = (temp_df["offsets"].to_numpy()).tolist()
+                    cur = 0
+                    for i in range(len(updated_offsets)):
+                        if updated_offsets[i] < cur:
+                            random_ranks_ind = i
+                            break
+                        cur = updated_offsets[i]
+
+                if random_ranks_ind != -1:
+                    random_rank = temp_df.iloc[random_ranks_ind]['rank']
+                    random_offsets = temp_df.iloc[random_ranks_ind]['offsets']
+                    random_start_time = temp_df.iloc[random_ranks_ind]['start_time']
+
+                    temp_random_rank = temp.loc[(temp['rank'] == int(random_rank))]
+                    temp_random_rank = temp_random_rank['write_segments'].iloc[0]
+                    random_stack_addresses = temp_random_rank.loc[(temp_random_rank['offset'] == random_offsets) & (temp_random_rank['start_time'] == random_start_time)]
+                    random_stack_addresses = random_stack_addresses['stack_memory_addresses'].iloc[0]
+
+                    address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                    res = set(list(address)) & set(random_stack_addresses)
+                    backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for these random write call(s) is given below:'
+                        }
+                    )
+                    for index, row3 in backtrace.iterrows():
+                        detail.append(
+                            {
+                                'message': '{}: {}'.format(
+                                    row3['function_name'],
+                                    row3['line_number']
+                                )
+                            }
+                        )
+
+            end = time.time()
+            time_taken = end - start
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                }
+            )
+
         insights_operation.append(
-            message(INSIGHTS_POSIX_HIGH_RANDOM_WRITE_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation)
+            message(INSIGHTS_POSIX_HIGH_RANDOM_WRITE_USAGE, TARGET_DEVELOPER, HIGH, issue, recommendation, detail)
         )
@@ -1024,7 +1560,9 @@
         )
 
         detail = []
-
+        file_count = 0
+        dxt_trigger_time = 0
+
         for file in detected_files:
             detail.append(
                 {
@@ -1035,6 +1573,66 @@
                     )
                 }
             )
 
+            # DXT Analysis
+            if args.backtrace:
+                start = time.time()
+                if file_count < BACKTRACE_THRESHOLD:
+                    temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
+                    temp_df_1 = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(file[0])]
+                    temp_df_2 = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(file[0])]
+
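+                    # Merge this file's read and write segments and pick the call with the shortest duration to symbolize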
+                    df_merged = pd.concat([temp_df_1, temp_df_2], ignore_index=True, sort=False)
+                    df_merged['duration'] = df_merged['end_time'] - df_merged['start_time']
+                    df_merged.sort_values('duration', ascending=True, inplace=True)
+                    df_merged = df_merged.iloc[0]
+                    rank_df = temp.loc[(temp['rank'] == int(df_merged['rank']))]
+
+                    if df_merged['operation'] == 'write':
+                        rank_df = rank_df['write_segments'].iloc[0]
+                        stack_memory_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(stack_memory_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+                    else:
+                        rank_df = rank_df['read_segments'].iloc[0]
+                        stack_memory_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(stack_memory_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for these imbalanced call(s) is given below:'
+                        }
+                    )
+                    for index, row3 in backtrace.iterrows():
+                        detail.append(
+                            {
+                                'message': '{}: {}'.format(
+                                    row3['function_name'],
+                                    row3['line_number']
+                                )
+                            }
+                        )
+
+                    file_count += 1
+                else:
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for this file is similar to the previous files'
+                        }
+                    )
+
+                end = time.time()
+                time_taken = end - start
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
+
         recommendation = [
             {
                 'message': 'Consider better balancing the data transfer between the application ranks'
@@ -1135,7 +1733,9 @@
         )
 
         detail = []
-
+        file_count = 0
+        dxt_trigger_time = 0
+
         for file in detected_files:
             detail.append(
                 {
@@ -1146,6 +1746,56 @@
                     )
                 }
             )
 
+            # DXT Analysis
+            if args.backtrace:
+                start = time.time()
+                if file_count < BACKTRACE_THRESHOLD:
+                    temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
+                    temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(file[0])]
+
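+                    # Locate the rank that issued the largest write request and symbolize its stack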
+                    maxClm = temp_df['length'].max()
+                    temp_df = temp_df.loc[(temp_df['length'] == maxClm)]
+                    rank_df = temp.loc[(temp['rank'] == int(temp_df['rank'].iloc[0]))]
+
+                    rank_df = rank_df['write_segments'].iloc[0]
+                    stack_memory_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                    address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                    res = set(list(address)) & set(stack_memory_addresses)
+                    backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for these imbalanced write call(s) is given below:'
+                        }
+                    )
+                    for index, row3 in backtrace.iterrows():
+                        detail.append(
+                            {
+                                'message': '{}: {}'.format(
+                                    row3['function_name'],
+                                    row3['line_number']
+                                )
+                            }
+                        )
+
+                    file_count += 1
+                else:
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for this file is similar to the previous files'
+                        }
+                    )
+
+                end = time.time()
+                time_taken = end - start
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
+
         recommendation = [
             {
                 'message': 'Consider better balancing the data transfer between the application ranks'
@@ -1185,7 +1835,9 @@
         )
 
         detail = []
-
+        file_count = 0
+        dxt_trigger_time = 0
+
         for file in detected_files:
             detail.append(
                 {
@@ -1195,7 +1847,56 @@
                     )
                 }
             )
+
+            # DXT Analysis
+            if args.backtrace:
+                start = time.time()
+                if file_count < BACKTRACE_THRESHOLD:
+                    temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
+                    temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(file[0])]
+
+                    maxClm = temp_df['length'].max()
+                    temp_df = temp_df.loc[(temp_df['length'] == maxClm)]
+                    rank_df = temp.loc[(temp['rank'] == int(temp_df['rank'].iloc[0]))]
+
+                    rank_df = rank_df['read_segments'].iloc[0]
+                    stack_memory_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                    address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                    res = set(list(address)) & set(stack_memory_addresses)
+                    backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for these imbalanced read call(s) is given below:'
+                        }
+                    )
+                    for index, row3 in backtrace.iterrows():
+                        detail.append(
+                            {
+                                'message': '{}: {}'.format(
+                                    row3['function_name'],
+                                    row3['line_number']
+                                )
+                            }
+                        )
+
+                    file_count += 1
+                else:
+                    detail.append(
+                        {
+                            'message': 'The backtrace information for this file is similar to the previous files'
+                        }
+                    )
+                end = time.time()
+                time_taken = end - start
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
+
         recommendation = [
             {
                 'message': 'Consider better balancing the data transfer between the application ranks'
@@ -1225,8 +1926,6 @@
         df_mpiio['counters'] = df_mpiio['counters'].assign(id=lambda d: d['id'].astype(str))
 
-        #print(df_mpiio)
-
         # Get the files responsible
         detected_files = []
@@ -1244,7 +1943,7 @@
             detail = []
 
             files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index()
-
+            dxt_trigger_time = 0
             for index, row in df_mpiio_collective_reads.iterrows():
@@ -1256,7 +1955,41 @@
                             )
                         }
                     )
-
+
+                    # DXT Analysis
+                    if args.backtrace:
+                        start = time.time()
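+                        # Use rank 1's MPI-IO trace as a representative sample of the independent read calls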
+                        temp = dxt_mpiio.loc[(dxt_mpiio['id'] == int(row['id'])) & (dxt_mpiio['rank'] == 1)]
+                        temp = temp['read_segments'].iloc[0]
+                        stack_memory_addresses = temp['stack_memory_addresses'].iloc[0]
+                        address = dxt_mpiio.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(stack_memory_addresses)
+                        backtrace = dxt_mpiio.iloc[0]['address_line_mapping'].loc[dxt_mpiio.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                        detail.append(
+                            {
+                                'message': 'The backtrace information for these read call(s) is given below:'
+                            }
+                        )
+                        for index, row3 in backtrace.iterrows():
+                            detail.append(
+                                {
+                                    'message': '{}: {}'.format(
+                                        row3['function_name'],
+                                        row3['line_number']
+                                    )
+                                }
+                            )
+
+                        end = time.time()
+                        time_taken = end - start
+                        dxt_trigger_time += time_taken
+
+            if dxt_trigger_time > 0:
+                detail.append(
+                    {
+                        'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                    }
+                )
+
             recommendation = [
                 {
                     'message': 'Use collective read operations (e.g. MPI_File_read_all() or MPI_File_read_at_all()) and set one aggregator per compute node',
@@ -1291,7 +2024,8 @@
             detail = []
 
             files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index()
-
+            dxt_trigger_time = 0
+
             for index, row in df_mpiio_collective_writes.iterrows():
@@ -1303,7 +2037,41 @@
                             )
                         }
                    )
-
+
+                    # DXT Analysis
+                    if args.backtrace:
+                        start = time.time()
+                        temp = dxt_mpiio.loc[(dxt_mpiio['id'] == int(row['id'])) & (dxt_mpiio['rank'] == 1)]
+                        temp = temp['write_segments'].iloc[0]
+                        stack_memory_addresses = temp['stack_memory_addresses'].iloc[0]
+                        address = dxt_mpiio.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(stack_memory_addresses)
+                        backtrace = dxt_mpiio.iloc[0]['address_line_mapping'].loc[dxt_mpiio.iloc[0]['address_line_mapping']['address'].isin(res)]
+
+                        detail.append(
+                            {
+                                'message': 'The backtrace information for these write call(s) is given below:'
+                            }
+                        )
+                        for index, row3 in backtrace.iterrows():
+                            detail.append(
+                                {
+                                    'message': '{}: {}'.format(
+                                        row3['function_name'],
+                                        row3['line_number']
+                                    )
+                                }
+                            )
+
+                        end = time.time()
+                        time_taken = end - start
+                        dxt_trigger_time += time_taken
+
+            if dxt_trigger_time > 0:
+                detail.append(
+                    {
+                        'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                    }
+                )
+
             recommendation = [
                 {
                     'message': 'Use collective write operations (e.g. MPI_File_write_all() or MPI_File_write_at_all()) and set one aggregator per compute node',
@@ -1363,7 +2131,6 @@
         if df_mpiio['counters']['MPIIO_NB_WRITES'].sum() == 0:
             issue = 'Application could benefit from non-blocking (asynchronous) writes'
-
             recommendation = []
 
             if 'H5F' in modules or has_hdf5_extension:
@@ -1505,9 +2272,6 @@
     else:
         job_start = datetime.datetime.fromtimestamp(job['job']['start_time_sec'], datetime.timezone.utc)
         job_end = datetime.datetime.fromtimestamp(job['job']['end_time_sec'], datetime.timezone.utc)
-
-    console.print()
-
     console.print(
         Panel(
             '\n'.join([
@@ -1707,4 +2471,4 @@
 
 
 if __name__ == '__main__':
-    main()
+    main()
\ No newline at end of file