From 38051770fb73f47e5d4176e9e255d01460b42c8b Mon Sep 17 00:00:00 2001
From: hammad45
Date: Wed, 29 Nov 2023 23:03:32 -0800
Subject: [PATCH] Updated DXT and backtrace analysis code

---
 drishti/main.py | 227 +++++++++++++++++++++++++++++-------------------
 1 file changed, 140 insertions(+), 87 deletions(-)

diff --git a/drishti/main.py b/drishti/main.py
index 7b7b781..b8c7056 100644
--- a/drishti/main.py
+++ b/drishti/main.py
@@ -11,7 +11,6 @@
 import datetime
 import argparse
 import subprocess
-
 import pandas as pd
 import numpy as np
 
@@ -101,7 +100,7 @@ INSIGHTS_MPI_IO_AGGREGATORS_OK = 'M10'
 
 SMALL_REQUEST_SIZE_THRESHOLD = 1000000
-BACKTRACE_FILES = 2
+BACKTRACE_THRESHOLD = 2
 
 # TODO: need to verify the threashold to be between 0 and 1
 # TODO: read thresholds from file
@@ -295,7 +294,7 @@ def message(code, target, level, issue, recommendations=None, details=None):
             if ': ' in str(detail['message']):
                 if 'Time taken' in str(detail['message']):
                     messages.append(' {}:left_arrow_curving_right: {}'.format(
-                        color,
+                        '[orange1]',
                         detail['message']
                         )
                     )
@@ -412,7 +411,7 @@ def main():
 
     information = darshanll.log_get_job(log)
    log_version = information['metadata']['lib_ver']
-    library_version = darshanll.darshan.backend.cffi_backend.get_lib_version()
+    library_version = darshanll.get_lib_version()
 
     # Make sure log format is of the same version
     filename = check_log_version(args.darshan, log_version, library_version)
@@ -495,7 +494,6 @@ def main():
         write_start_time = []
         write_operation = []
 
-        start = time.time()
         for r in zip(dxt_posix['rank'], dxt_posix['read_segments'], dxt_posix['write_segments'], dxt_posix['id']):
             if not r[1].empty:
                 read_id.append([r[3]] * len((r[1]['length'].to_list())))
@@ -737,6 +735,8 @@ def main():
         detail = []
         recommendation = []
         file_count = 0
+        dxt_trigger_time = 0
+
         for index, row in detected_files.iterrows():
            if row['total_reads'] > (total_reads * THRESHOLD_SMALL_REQUESTS / 2):
                detail.append(
@@ -749,39 +749,44 @@ def main():
                    }
                )
 
+                # DXT Analysis
                if args.backtrace:
                    start = time.time()
-                    if file_count < BACKTRACE_FILES:
+                    if file_count < BACKTRACE_THRESHOLD:
                        temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
                        temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(row['id'])]
 
                        if not temp_df.empty:
                            temp_df = temp_df.loc[temp_df['length'] < SMALL_REQUEST_SIZE_THRESHOLD]
                            small_read_requests_ranks = temp_df['rank'].unique()
-                            rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[0]))]
+
+                            if int(small_read_requests_ranks[0]) == 0 and len(small_read_requests_ranks) > 1:
+                                rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[1]))]
+                            else:
+                                rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[0]))]
+
                            rank_df = rank_df['read_segments'].iloc[0]
                            rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
                            address = dxt_posix.iloc[0]['address_line_mapping']['address']
                            res = set(list(address)) & set(rank_addresses)
                            backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
-
                            if len(small_read_requests_ranks) > 0:
                                detail.append(
                                    {
-                                        'message': '{} ranks made small read requests in "{}". The backtrace information for these read calls is given below:'.format(
+                                        'message': '{} rank(s) made small read requests in "{}". Below is the backtrace information:'.format(
                                            len(small_read_requests_ranks),
                                            file_map[int(row['id'])] if args.full_path else os.path.basename(file_map[int(row['id'])])
                                        )
                                    }
                                )
 
-                                for index, row3 in backtrace.iterrows():
+                                for index, row in backtrace.iterrows():
                                    detail.append(
                                        {
                                            'message': '{}: {}'.format(
-                                                row3['function_name'],
-                                                row3['line_number']
+                                                row['function_name'],
+                                                row['line_number']
                                            )
                                        }
                                    )
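The DXT analysis added above recovers source locations by intersecting the stack addresses recorded for one rank's I/O segments with the trace's address-to-line table. A minimal standalone sketch of that lookup, assuming a dxt_posix dataframe shaped like the one used in the hunk (the helper name resolve_backtrace is illustrative and not part of the patch):

import pandas as pd

def resolve_backtrace(dxt_posix: pd.DataFrame, segments: pd.DataFrame) -> pd.DataFrame:
    # Stack addresses recorded for the first segment of the chosen rank
    stack_addresses = segments['stack_memory_addresses'].iloc[0]
    # Address-to-source mapping shipped with the DXT trace
    mapping = dxt_posix.iloc[0]['address_line_mapping']
    # Keep only addresses present in both sets, then look up function/line info
    resolved = set(mapping['address']) & set(stack_addresses)
    return mapping.loc[mapping['address'].isin(resolved)]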
@@ -789,17 +794,20 @@ def main():
                    else:
                        detail.append(
                            {
-                                'message': 'The backtrace information for this file is similar to previous files'
+                                'message': 'The backtrace information for this file is similar to the previous files'
                            }
                        )
 
                    end = time.time()
                    time_taken = end - start
-                    detail.append(
-                        {
-                            'message': 'Time taken to process this trigger: {}s'.format(round(time_taken))
-                        }
-                    )
+                    dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
 
        recommendation.append(
            {
@@ -842,7 +850,8 @@ def main():
        detail = []
        recommendation = []
        file_count = 0
-        start = time.time()
+        dxt_trigger_time = 0
+
        for index, row in detected_files.iterrows():
            if row['total_writes'] > (total_writes * THRESHOLD_SMALL_REQUESTS / 2):
                detail.append(
@@ -855,15 +864,21 @@ def main():
                    }
                )
 
+                # DXT Analysis
                if args.backtrace:
                    start = time.time()
-                    if file_count < BACKTRACE_FILES:
+                    if file_count < BACKTRACE_THRESHOLD:
                        temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
                        temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(row['id'])]
 
                        if not temp_df.empty:
                            temp_df = temp_df.loc[temp_df['length'] < SMALL_REQUEST_SIZE_THRESHOLD]
-                            small_write_requests_ranks = temp_df['rank'].unique()
+                            small_write_requests_ranks = temp_df['rank'].unique()
+
+                            if int(small_write_requests_ranks[0]) == 0 and len(small_write_requests_ranks) > 1:
+                                rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[1]))]
+                            else:
+                                rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[0]))]
+
-                            rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[0]))]
                            rank_df = rank_df['write_segments'].iloc[0]
                            rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
@@ -874,19 +890,19 @@ def main():
                            if len(small_write_requests_ranks) > 0:
                                detail.append(
                                    {
-                                        'message': '{} ranks made small write requests in "{}". The backtrace information for these write calls is given below:'.format(
+                                        'message': '{} rank(s) made small write requests in "{}". Below is the backtrace information:'.format(
                                            len(small_write_requests_ranks),
                                            file_map[int(row['id'])] if args.full_path else os.path.basename(file_map[int(row['id'])])
                                        )
                                    }
                                )
 
-                                for index, row3 in backtrace.iterrows():
+                                for index, row in backtrace.iterrows():
                                    detail.append(
                                        {
                                            'message': '{}: {}'.format(
-                                                row3['function_name'],
-                                                row3['line_number']
+                                                row['function_name'],
+                                                row['line_number']
                                            )
                                        }
                                    )
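Both small-request hunks select which rank's segments to resolve with the same rule: use the first offending rank unless it is rank 0 and another rank is available. A compact restatement of that rule (the function name is illustrative; the patch inlines the check):

def pick_reporting_rank(ranks):
    # Skip rank 0 when another offending rank exists, as the hunks above do;
    # otherwise fall back to the first rank in the list.
    if int(ranks[0]) == 0 and len(ranks) > 1:
        return int(ranks[1])
    return int(ranks[0])

# e.g. rank_df = temp.loc[temp['rank'] == pick_reporting_rank(small_write_requests_ranks)]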
@@ -901,11 +917,14 @@ def main():
 
                    end = time.time()
                    time_taken = end - start
-                    detail.append(
-                        {
-                            'message': 'Time taken to process this trigger: {}s'.format(round(time_taken))
-                        }
-                    )
+                    dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
        recommendation.append(
            {
                'message': 'Consider buffering write operations into larger more contiguous ones'
@@ -968,8 +987,11 @@ def main():
                }
            )
 
+    detail = []
    if 'LUSTRE' in modules:
+        # DXT Analysis
        if args.backtrace:
+            start = time.time()
            df_lustre = report.records['LUSTRE'].to_df()
 
            if not df_lustre['counters']['LUSTRE_STRIPE_SIZE'].empty:
@@ -977,10 +999,8 @@ def main():
                stripe_size = df_lustre['counters']['LUSTRE_STRIPE_SIZE'].iloc[0]
            else:
                stripe_size = df_lustre['counters']['POSIX_FILE_ALIGNMENT'].iloc[0]
 
-            detail = []
            file_count = 0
-            start = time.time()
            ids = dxt_posix.id.unique().tolist()
            for id in ids:
                temp = dxt_posix.loc[dxt_posix['id'] == id]
@@ -1014,7 +1034,7 @@ def main():
 
                        detail.append(
                            {
-                                'message': '{} ranks made misaligned requests in "{}". The backtrace information for these calls is given below:'.format(
+                                'message': '{} rank(s) made misaligned requests in "{}". Below is the backtrace information:'.format(
                                    len(misaligned_ranks),
                                    file_map[id] if args.full_path else os.path.basename(file_map[id])
                                )
@@ -1061,11 +1081,12 @@ def main():
        detail = []
        file_count = 0
 
+        # DXT Analysis
        if args.backtrace:
            start = time.time()
            ids = dxt_posix.id.unique().tolist()
            for id in ids:
-                if file_count < BACKTRACE_FILES:
+                if file_count < BACKTRACE_THRESHOLD:
                    temp = dxt_posix.loc[dxt_posix['id'] == id]
                    redundant_ranks_ind = -1
 
@@ -1092,7 +1113,7 @@ def main():
 
                            detail.append(
                                {
-                                    'message': 'The backtrace information for these redundant read calls is given below:'
+                                    'message': 'The backtrace information for these redundant read call(s) is given below:'
                                }
                            )
                            for index, row3 in backtrace.iterrows():
@@ -1108,7 +1129,7 @@ def main():
                else:
                    detail.append(
                        {
-                            'message': 'The backtrace information for this file is similar to previous files'
+                            'message': 'The backtrace information for this file is similar to the previous files'
                        }
                    )
            end = time.time()
@@ -1130,11 +1151,12 @@ def main():
        detail = []
        file_count = 0
 
+        # DXT Analysis
        if args.backtrace:
            start = time.time()
            ids = dxt_posix.id.unique().tolist()
            for id in ids:
-                if file_count < BACKTRACE_FILES:
+                if file_count < BACKTRACE_THRESHOLD:
                    temp = dxt_posix.loc[dxt_posix['id'] == id]
                    redundant_ranks_ind = -1
 
@@ -1160,7 +1182,7 @@ def main():
 
                            detail.append(
                                {
-                                    'message': 'The backtrace information for these redundant write calls is given below:'
+                                    'message': 'The backtrace information for these redundant write call(s) is given below:'
                                }
                            )
                            for index, row3 in backtrace.iterrows():
@@ -1176,7 +1198,7 @@ def main():
                else:
                    detail.append(
                        {
-                            'message': 'The backtrace information for this file is similar to previous files'
+                            'message': 'The backtrace information for this file is similar to the previous files'
                        }
                    )
            end = time.time()
@@ -1216,13 +1238,14 @@ def main():
            }
        ]
+        # DXT Analysis
        if args.backtrace:
            start = time.time()
            ids = dxt_posix.id.unique().tolist()
 
            for id in ids:
                temp = dxt_posix.loc[dxt_posix['id'] == id]
                temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == id]
-                temp_df.sort_values('start_time', ascending=True, inplace=True)
+                temp_df = temp_df.sort_values('start_time', ascending=True)
 
                random_ranks_ind = -1
                if not temp_df["offsets"].is_monotonic_increasing:
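The random-read hunk above replaces the in-place sort with an assignment and then tests whether file offsets grow monotonically once the segments are ordered by start time. The detection reduces to the following sketch (column names taken from the hunk; the sample data is invented for illustration):

import pandas as pd

def looks_random(segments: pd.DataFrame) -> bool:
    # Order DXT segments by request start time, then check the file offsets:
    # non-monotonic offsets are treated as a random access pattern.
    ordered = segments.sort_values('start_time', ascending=True)
    return not ordered['offsets'].is_monotonic_increasing

print(looks_random(pd.DataFrame({'start_time': [0.1, 0.2, 0.3], 'offsets': [0, 4096, 1024]})))  # True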
@@ -1249,7 +1272,7 @@ def main():
                        detail = []
                        detail.append(
                            {
-                                'message': 'The backtrace information for these random read calls is given below:'
+                                'message': 'The backtrace information for these random read call(s) is given below:'
                            }
                        )
                        for index, row3 in backtrace.iterrows():
@@ -1303,6 +1326,8 @@ def main():
                'message': 'Consider changing your data model to have consecutive or sequential writes'
            }
        ]
+
+        # DXT Analysis
        if args.backtrace:
            start = time.time()
            ids = dxt_posix.id.unique().tolist()
@@ -1337,7 +1362,7 @@ def main():
                        detail = []
                        detail.append(
                            {
-                                'message': 'The backtrace information for these random write calls is given below:'
+                                'message': 'The backtrace information for these random write call(s) is given below:'
                            }
                        )
                        for index, row3 in backtrace.iterrows():
@@ -1536,7 +1561,8 @@ def main():
 
        detail = []
        file_count = 0
-        start = time.time()
+        dxt_trigger_time = 0
+
        for file in detected_files:
            detail.append(
                {
@@ -1546,8 +1572,11 @@ def main():
                    )
                }
            )
+
+            # DXT Analysis
            if args.backtrace:
-                if file_count < BACKTRACE_FILES:
+                start = time.time()
+                if file_count < BACKTRACE_THRESHOLD:
                    temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
                    temp_df_1 = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(file[0])]
                    temp_df_2 = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(file[0])]
@@ -1573,7 +1602,7 @@ def main():
 
                        detail.append(
                            {
-                                'message': 'The backtrace information for these calls is given below:'
+                                'message': 'The backtrace information for these imbalanced call(s) is given below:'
                            }
                        )
                        for index, row3 in backtrace.iterrows():
@@ -1590,18 +1619,20 @@ def main():
                else:
                    detail.append(
                        {
-                            'message': 'The backtrace information for this file is similar to previous files'
+                            'message': 'The backtrace information for this file is similar to the previous files'
                        }
                    )
-
-        if args.backtrace:
+
                end = time.time()
                time_taken = end - start
-            detail.append(
-                {
-                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
-                }
-            )
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
        recommendation = [
            {
                'message': 'Consider better balancing the data transfer between the application ranks'
@@ -1703,7 +1734,8 @@ def main():
 
        detail = []
        file_count = 0
-        start = time.time()
+        dxt_trigger_time = 0
+
        for file in detected_files:
            detail.append(
                {
@@ -1714,8 +1746,10 @@ def main():
                }
            )
 
+            # DXT Analysis
            if args.backtrace:
-                if file_count < BACKTRACE_FILES:
+                start = time.time()
+                if file_count < BACKTRACE_THRESHOLD:
                    temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
                    temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(file[0])]
 
@@ -1731,7 +1765,7 @@ def main():
 
                        detail.append(
                            {
-                                'message': 'The backtrace information for these write calls is given below:'
+                                'message': 'The backtrace information for these imbalanced write call(s) is given below:'
                            }
                        )
                        for index, row3 in backtrace.iterrows():
@@ -1748,17 +1782,20 @@ def main():
                else:
                    detail.append(
                        {
                            'message': 'The backtrace information for this file is similar to the previous files'
                        }
-                    )
-        if args.backtrace:
+                    )
+
                end = time.time()
                time_taken = end - start
-            detail.append(
-                {
-                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
-                }
-            )
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
+            detail.append(
+                {
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
+                }
+            )
        recommendation = [
            {
                'message': 'Consider better balancing the data transfer between the application ranks'
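The imbalance hunks switch from one timer per trigger to an accumulator: each file's backtrace work adds to dxt_trigger_time, and a single rounded total is reported only when it is non-zero. The pattern reduced to a runnable sketch (analyze_backtrace and the file IDs are placeholders, not names from the patch):

import time

def analyze_backtrace(file_id):
    # Placeholder for the per-file DXT backtrace resolution done in the patch.
    pass

detail = []
dxt_trigger_time = 0
for file_id in [101, 102, 103]:  # illustrative file IDs
    start = time.time()
    analyze_backtrace(file_id)
    dxt_trigger_time += time.time() - start

if dxt_trigger_time > 0:
    detail.append({'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))})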
@@ -1799,7 +1836,8 @@ def main():
 
        detail = []
        file_count = 0
-        start = time.time()
+        dxt_trigger_time = 0
+
        for file in detected_files:
            detail.append(
                {
@@ -1809,8 +1847,11 @@ def main():
                    )
                }
            )
+
+            # DXT Analysis
            if args.backtrace:
-                if file_count < BACKTRACE_FILES:
+                start = time.time()
+                if file_count < BACKTRACE_THRESHOLD:
                    temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
                    temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(file[0])]
 
@@ -1826,7 +1867,7 @@ def main():
 
                        detail.append(
                            {
-                                'message': 'The backtrace information for these read calls is given below:'
+                                'message': 'The backtrace information for these imbalanced read call(s) is given below:'
                            }
                        )
                        for index, row3 in backtrace.iterrows():
@@ -1843,14 +1884,17 @@ def main():
                else:
                    detail.append(
                        {
-                            'message': 'The backtrace information for this file is similar to previous files'
+                            'message': 'The backtrace information for this file is similar to the previous files'
                        }
                    )
-            end = time.time()
-            time_taken = end - start
+                end = time.time()
+                time_taken = end - start
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
            detail.append(
                {
-                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
                }
            )
        recommendation = [
@@ -1882,8 +1926,6 @@ def main():
 
        df_mpiio['counters'] = df_mpiio['counters'].assign(id=lambda d: d['id'].astype(str))
 
-        #print(df_mpiio)
-
        # Get the files responsible
        detected_files = []
 
@@ -1901,7 +1943,7 @@ def main():
        detail = []
        files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index()
 
-        start = time.time()
+        dxt_trigger_time = 0
        for index, row in df_mpiio_collective_reads.iterrows():
            if (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > THRESHOLD_COLLECTIVE_OPERATIONS and (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > THRESHOLD_COLLECTIVE_OPERATIONS_ABSOLUTE:
                detail.append(
@@ -1913,7 +1955,10 @@ def main():
                    )
                }
            )
+
+            # DXT Analysis
            if args.backtrace:
+                start = time.time()
                temp = dxt_mpiio.loc[(dxt_mpiio['id'] == int(row['id'])) & (dxt_mpiio['rank'] == 1)]
                temp = temp['read_segments'].iloc[0]
                stack_memory_addresses = temp['stack_memory_addresses'].iloc[0]
@@ -1922,7 +1967,7 @@ def main():
                backtrace = dxt_mpiio.iloc[0]['address_line_mapping'].loc[dxt_mpiio.iloc[0]['address_line_mapping']['address'].isin(res)]
                detail.append(
                    {
-                        'message': 'The backtrace information for these read calls is given below:'
+                        'message': 'The backtrace information for these read call(s) is given below:'
                    }
                )
                for index, row3 in backtrace.iterrows():
@@ -1935,12 +1980,14 @@ def main():
                        }
                    )
 
-        if args.backtrace:
-            end = time.time()
-            time_taken = end - start
+                end = time.time()
+                time_taken = end - start
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
            detail.append(
                {
-                    'message': 'Time taken to process this trigger: {}s'.format(round(time_taken, 5))
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
                }
            )
        recommendation = [
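Throughout the patch, BACKTRACE_THRESHOLD (the renamed BACKTRACE_FILES) caps how many files per trigger get a full backtrace; later files only get the "similar to the previous files" note. A stripped-down version of that gate (the file_count increment is assumed, since it falls outside the hunks shown):

BACKTRACE_THRESHOLD = 2

detail = []
file_count = 0
for file_id in [201, 202, 203]:  # illustrative file IDs
    if file_count < BACKTRACE_THRESHOLD:
        # ...resolve and append the backtrace for this file...
        file_count += 1  # assumed; the increment is outside the shown hunks
    else:
        detail.append({'message': 'The backtrace information for this file is similar to the previous files'})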
@@ -1977,8 +2024,8 @@ def main():
        detail = []
 
        files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index()
-
-        start = time.time()
+        dxt_trigger_time = 0
+
        for index, row in df_mpiio_collective_writes.iterrows():
            if (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > THRESHOLD_COLLECTIVE_OPERATIONS and (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > THRESHOLD_COLLECTIVE_OPERATIONS_ABSOLUTE:
                detail.append(
@@ -1990,7 +2037,10 @@ def main():
                    )
                }
            )
+
+            # DXT Analysis
            if args.backtrace:
+                start = time.time()
                temp = dxt_mpiio.loc[(dxt_mpiio['id'] == int(row['id'])) & (dxt_mpiio['rank'] == 1)]
                temp = temp['write_segments'].iloc[0]
                stack_memory_addresses = temp['stack_memory_addresses'].iloc[0]
@@ -1999,7 +2049,7 @@ def main():
                backtrace = dxt_mpiio.iloc[0]['address_line_mapping'].loc[dxt_mpiio.iloc[0]['address_line_mapping']['address'].isin(res)]
                detail.append(
                    {
-                        'message': 'The backtrace information for these write calls is given below:'
+                        'message': 'The backtrace information for these write call(s) is given below:'
                    }
                )
                for index, row3 in backtrace.iterrows():
@@ -2011,12 +2061,15 @@ def main():
                    )
                }
            )
-            if args.backtrace:
-                end = time.time()
-                time_taken = end - start
+
+                end = time.time()
+                time_taken = end - start
+                dxt_trigger_time += time_taken
+
+        if dxt_trigger_time > 0:
            detail.append(
                {
-                    'message': 'Time taken to process this trigger: {}s'.format(time_taken)
+                    'message': 'Time taken to process this trigger: {}s'.format(round(dxt_trigger_time, 5))
                }
            )
        recommendation = [
@@ -2418,4 +2471,4 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
+    main()
\ No newline at end of file
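For context on the MPI-IO hunks: the surrounding, unchanged condition flags a file when independent writes dominate its independent operations and the absolute count is itself large. A sketch of that check with placeholder thresholds (the real values are defined elsewhere in main.py and are not shown in this patch):

THRESHOLD_COLLECTIVE_OPERATIONS = 0.5            # placeholder value
THRESHOLD_COLLECTIVE_OPERATIONS_ABSOLUTE = 1000  # placeholder value

def mostly_independent_writes(row):
    # Mirrors the condition used in the collective-write hunk above.
    total_indep = row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']
    if not total_indep:
        return False
    return (row['MPIIO_INDEP_WRITES'] / total_indep > THRESHOLD_COLLECTIVE_OPERATIONS
            and total_indep > THRESHOLD_COLLECTIVE_OPERATIONS_ABSOLUTE)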