Enable thresholds display
onewbiek committed Feb 2, 2024
1 parent 872d801 commit 68ebab8
Showing 5 changed files with 152 additions and 74 deletions.
23 changes: 12 additions & 11 deletions drishti/handlers/handle_darshan.py
@@ -354,7 +354,7 @@ def handler():

#########################################################################################################################################################################

-count_long_metadata = len(df['fcounters'][(df['fcounters']['POSIX_F_META_TIME'] > metadata_time_rank)])
+count_long_metadata = len(df['fcounters'][(df['fcounters']['POSIX_F_META_TIME'] > thresholds['metadata_time_rank'][0])])

check_long_metadata(count_long_metadata, modules)

@@ -375,7 +375,7 @@ def handler():
for index, row in shared_files.iterrows():
total_transfer_size = row['POSIX_BYTES_WRITTEN'] + row['POSIX_BYTES_READ']

-if total_transfer_size and abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > imbalance_stragglers:
+if total_transfer_size and abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > thresholds['imbalance_stragglers'][0]:
stragglers_count += 1

detected_files.append([
@@ -403,7 +403,7 @@ def handler():
for index, row in shared_files_times.iterrows():
total_transfer_time = row['POSIX_F_WRITE_TIME'] + row['POSIX_F_READ_TIME'] + row['POSIX_F_META_TIME']

-if total_transfer_time and abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > imbalance_stragglers:
+if total_transfer_time and abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > thresholds['imbalance_stragglers'][0]:
stragglers_count += 1

detected_files.append([
@@ -432,7 +432,7 @@ def handler():
detected_files = []

for index, row in aggregated.iterrows():
-if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row['POSIX_BYTES_WRITTEN_max'] > imbalance_size:
+if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row['POSIX_BYTES_WRITTEN_max'] > thresholds['imbalance_size'][0]:
imbalance_count += 1

detected_files.append([
@@ -448,7 +448,7 @@ def handler():
detected_files = []

for index, row in aggregated.iterrows():
-if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] > imbalance_size:
+if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] > thresholds['imbalance_size'][0]:
imbalance_count += 1

detected_files.append([
@@ -478,12 +478,12 @@ def handler():
mpiio_indep_reads = df_mpiio['counters']['MPIIO_INDEP_READS'].sum()

detected_files = []
-if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > collective_operations_absolute:
+if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > thresholds['collective_operations_absolute'][0]:
files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index()
for index, row in df_mpiio_collective_reads.iterrows():
if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
-row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > collective_operations and
-(row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > collective_operations_absolute):
+row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations'][0] and
+(row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations_absolute'][0]):

detected_files.append([
row['id'], row['MPIIO_INDEP_READS'], row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
@@ -502,13 +502,13 @@ def handler():
mpiio_indep_writes = df_mpiio['counters']['MPIIO_INDEP_WRITES'].sum()

detected_files = []
-if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > collective_operations_absolute:
+if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > thresholds['collective_operations_absolute'][0]:
files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index()

for index, row in df_mpiio_collective_writes.iterrows():
if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
-row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > collective_operations and
-(row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > collective_operations_absolute):
+row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations'][0] and
+(row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) > thresholds['collective_operations_absolute'][0]):

detected_files.append([
row['id'], row['MPIIO_INDEP_WRITES'], row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
@@ -651,6 +651,7 @@ def handler():
console.print()

display_content(console)
+display_thresholds(console)
display_footer(console, insights_start_time, insights_end_time)

filename = '{}.html'.format(args.log_path)
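The handler now calls display_thresholds(console) just before the footer, and handle_recorder.py below gains the same call. The function itself is defined in one of the two changed files not expanded in this view (presumably drishti/includes/module.py). A minimal sketch of what it plausibly does, assuming the second element of each thresholds entry is a flag set while insights are generated and assuming Drishti's rich-based console; names and layout here are illustrative, not the committed implementation:

# Hypothetical sketch; the real display_thresholds is in a file not shown in this diff.
from rich.panel import Panel

from drishti.includes.config import thresholds

def display_thresholds(console):
    # List only the thresholds whose flag was set during the analysis run
    tripped = [
        '{}: {}'.format(name, spec[0])
        for name, spec in thresholds.items() if spec[1]
    ]
    if tripped:
        console.print(Panel('\n'.join(tripped), title='THRESHOLDS'))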
39 changes: 21 additions & 18 deletions drishti/handlers/handle_recorder.py
@@ -47,7 +47,9 @@ def handler():
file_map = None

if os.path.exists(args.log_path + '.intervals.csv') and os.path.exists(args.log_path + '.records.csv') and os.path.exists(args.log_path + '.filemap.csv'):
-print('Using existing parsed log file')
+print('Using parsed file: {}'.format(os.path.abspath(args.log_path + '.intervals.csv')))
+print('Using parsed file: {}'.format(os.path.abspath(args.log_path + '.records.csv')))
+print('Using parsed file: {}'.format(os.path.abspath(args.log_path + '.filemap.csv')))
df_intervals = pd.read_csv(args.log_path + '.intervals.csv')
df_posix_records = pd.read_csv(args.log_path + '.records.csv')
df_file_map = pd.read_csv(args.log_path + '.filemap.csv')
@@ -174,16 +176,16 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):

# Get the number of small I/O operations (less than 1 MB)

-total_reads_small = len(df_posix[(df_posix['function'].str.contains('read')) & (df_posix['size'] < small_bytes)])
-total_writes_small = len(df_posix[~(df_posix['function'].str.contains('read')) & (df_posix['size'] < small_bytes)])
+total_reads_small = len(df_posix[(df_posix['function'].str.contains('read')) & (df_posix['size'] < thresholds['small_bytes'][0])])
+total_writes_small = len(df_posix[~(df_posix['function'].str.contains('read')) & (df_posix['size'] < thresholds['small_bytes'][0])])

if args.split_files:
detected_files = pd.DataFrame()
else:
detected_files = []
for id in file_map.keys():
-read_cnt = len(df_posix[(df_posix['file_id'] == id) & (df_posix['function'].str.contains('read')) & (df_posix['size'] < small_bytes)])
-write_cnt = len(df_posix[(df_posix['file_id'] == id) & ~(df_posix['function'].str.contains('read')) & (df_posix['size'] < small_bytes)])
+read_cnt = len(df_posix[(df_posix['file_id'] == id) & (df_posix['function'].str.contains('read')) & (df_posix['size'] < thresholds['small_bytes'][0])])
+write_cnt = len(df_posix[(df_posix['file_id'] == id) & ~(df_posix['function'].str.contains('read')) & (df_posix['size'] < thresholds['small_bytes'][0])])
detected_files.append([id, read_cnt, write_cnt])

column_names = ['id', 'total_reads', 'total_writes']
@@ -258,12 +260,12 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
total_shared_reads = len(df_posix[(df_posix['file_id'].isin(shared_files)) & (df_posix['function'].str.contains('read'))])
total_shared_reads_small = len(df_posix[(df_posix['file_id'].isin(shared_files))
& (df_posix['function'].str.contains('read'))
-& (df_posix['size'] < small_bytes)])
+& (df_posix['size'] < thresholds['small_bytes'][0])])

total_shared_writes = len(df_posix[(df_posix['file_id'].isin(shared_files)) & ~(df_posix['function'].str.contains('read'))])
total_shared_writes_small = len(df_posix[(df_posix['file_id'].isin(shared_files))
& ~(df_posix['function'].str.contains('read'))
-& (df_posix['size'] < small_bytes)])
+& (df_posix['size'] < thresholds['small_bytes'][0])])

if args.split_files:
detected_files = pd.DataFrame()
@@ -272,10 +274,10 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
for id in shared_files:
read_cnt = len(df_posix[(df_posix['file_id'] == id)
& (df_posix['function'].str.contains('read'))
-& (df_posix['size'] < small_bytes)])
+& (df_posix['size'] < thresholds['small_bytes'][0])])
write_cnt = len(df_posix[(df_posix['file_id'] == id)
& ~(df_posix['function'].str.contains('read'))
-& (df_posix['size'] < small_bytes)])
+& (df_posix['size'] < thresholds['small_bytes'][0])])
detected_files.append([id, read_cnt, write_cnt])

column_names = ['id', 'INSIGHTS_POSIX_SMALL_READS', 'INSIGHTS_POSIX_SMALL_WRITES']
@@ -287,7 +289,7 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):

# TODO: Assumed metadata operations: open, close, sync, create, seek
df_detected = df_posix_records.groupby('rank')['duration'].sum().reset_index()
-count_long_metadata = len(df_detected[(df_detected['duration'] > metadata_time_rank)])
+count_long_metadata = len(df_detected[(df_detected['duration'] > thresholds['metadata_time_rank'][0])])

check_long_metadata(count_long_metadata, modules)

@@ -318,7 +320,7 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
slowest_rank_bytes = df_detected.loc[df_detected['duration'].idxmax(), 'size']
fastest_rank_bytes = df_detected.loc[df_detected['duration'].idxmin(), 'size']

-if total_transfer_size and abs(slowest_rank_bytes - fastest_rank_bytes) / total_transfer_size > imbalance_stragglers:
+if total_transfer_size and abs(slowest_rank_bytes - fastest_rank_bytes) / total_transfer_size > thresholds['imbalance_stragglers'][0]:
stragglers_count += 1

detected_files.append([
@@ -356,7 +358,7 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
slowest_rank_time = df_detected['duration'].max()
fastest_rank_time = df_detected['duration'].min()

-if total_transfer_time and abs(slowest_rank_time - fastest_rank_time) / total_transfer_time > imbalance_stragglers:
+if total_transfer_time and abs(slowest_rank_time - fastest_rank_time) / total_transfer_time > thresholds['imbalance_stragglers'][0]:
stragglers_count += 1

detected_files.append([
@@ -396,7 +398,7 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
max_bytes_written = df_detected['size'].max()
min_bytes_written = df_detected['size'].min()

-if max_bytes_written and abs(max_bytes_written - min_bytes_written) / max_bytes_written > imbalance_size:
+if max_bytes_written and abs(max_bytes_written - min_bytes_written) / max_bytes_written > thresholds['imbalance_size'][0]:
imbalance_count += 1

detected_files.append([
@@ -417,7 +419,7 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
max_bytes_read = df_detected['size'].max()
min_bytes_read = df_detected['size'].min()

-if max_bytes_read and abs(max_bytes_read - min_bytes_read) / max_bytes_read > imbalance_size:
+if max_bytes_read and abs(max_bytes_read - min_bytes_read) / max_bytes_read > thresholds['imbalance_size'][0]:
imbalance_count += 1

detected_files.append([
@@ -448,13 +450,13 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
detected_files = pd.DataFrame()
else:
detected_files = []
-if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > collective_operations_absolute:
+if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > thresholds['collective_operations_absolute'][0]:
for id in file_map.keys():
indep_read_count = df_mpiio_reads[~(df_mpiio_reads['function'].str.contains('_all')) & (df_mpiio_reads['file_id'] == id)]
indep_write_count = df_mpiio_writes[~(df_mpiio_writes['function'].str.contains('_all')) & (df_mpiio_writes['file_id'] == id)]
indep_total_count = indep_read_count + indep_write_count

-if (indep_total_count > collective_operations_absolute and indep_read_count / indep_total_count > collective_operations):
+if (indep_total_count > thresholds['collective_operations_absolute'][0] and indep_read_count / indep_total_count > thresholds['collective_operations'][0]):
detected_files.append([
id, indep_read_count, indep_read_count / indep_total_count * 100
])
@@ -468,13 +470,13 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
detected_files = pd.DataFrame()
else:
detected_files = []
-if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > collective_operations_absolute:
+if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > thresholds['collective_operations_absolute'][0]:
for id in file_map.keys():
indep_read_count = df_mpiio_reads[~(df_mpiio_reads['function'].str.contains('_all')) & (df_mpiio_reads['file_id'] == id)]
indep_write_count = df_mpiio_writes[~(df_mpiio_writes['function'].str.contains('_all')) & (df_mpiio_writes['file_id'] == id)]
indep_total_count = indep_read_count + indep_write_count

-if (indep_total_count > collective_operations_absolute and indep_write_count / indep_total_count > collective_operations):
+if (indep_total_count > thresholds['collective_operations_absolute'][0] and indep_write_count / indep_total_count > thresholds['collective_operations'][0]):
detected_files.append([
id, indep_write_count, indep_write_count / indep_total_count * 100
])
@@ -572,6 +574,7 @@ def process_helper(file_map, df_intervals, df_posix_records, fid=None):
console.print()

display_content(console)
+display_thresholds(console)
display_footer(console, insights_start_time, insights_end_time)

if args.split_files:
48 changes: 27 additions & 21 deletions drishti/includes/config.py
@@ -34,19 +34,21 @@
insights_total[WARN] = 0
insights_total[RECOMMENDATIONS] = 0

-imbalance_operations = 0.1
-small_bytes = 1048576 # 1MB
-small_requests = 0.1
-small_requests_absolute = 1000
-misaligned_requests = 0.1
-metadata_time_rank = 30 # seconds
-random_operations = 0.2
-random_operations_absolute = 1000
-imbalance_stragglers = 0.15
-imbalance_size = 0.30
-interface_stdio = 0.1
-collective_operations = 0.5
-collective_operations_absolute = 1000
+thresholds = {
+    'imbalance_operations': [0.1, False],
+    'small_bytes': [1048576, False],
+    'small_requests': [0.1, False],
+    'small_requests_absolute': [1000, False],
+    'misaligned_requests': [0.1, False],
+    'metadata_time_rank': [30, False],
+    'random_operations': [0.2, False],
+    'random_operations_absolute': [1000, False],
+    'imbalance_stragglers': [0.15, False],
+    'imbalance_size': [0.3, False],
+    'interface_stdio': [0.1, False],
+    'collective_operations': [0.5, False],
+    'collective_operations_absolute': [1000, False],
+}

INSIGHTS_STDIO_HIGH_USAGE = 'S01'
INSIGHTS_POSIX_WRITE_COUNT_INTENSIVE = 'P01'
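Each threshold is now a [value, flag] pair instead of a bare module-level variable: call sites read the value through index [0], and init_console() below resets every flag to False at the start of a run. A hypothetical helper illustrating the assumed flag convention (the committed trigger sites are in code not shown in this diff):

# Hypothetical; sketches how a check might consume a [value, flag] entry
def exceeds(name, observed):
    value, _ = thresholds[name]
    if observed > value:
        thresholds[name][1] = True  # record that the threshold tripped, for display_thresholds()
        return True
    return False

# e.g. exceeds('imbalance_stragglers', abs(slowest_rank_bytes - fastest_rank_bytes) / total_transfer_size)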
@@ -98,6 +100,10 @@ def init_console():
insights_total[HIGH] = 0
insights_total[WARN] = 0
insights_total[RECOMMENDATIONS] = 0

+for name in thresholds:
+    thresholds[name][1] = False

return console


@@ -166,14 +172,14 @@ def validate_thresholds():

for category, thresholds_spec in data.items():
for threshold_name, threshold_value in thresholds_spec.items():
-globals()[threshold_name] = threshold_value
-
-assert(imbalance_operations >= 0.0 and imbalance_operations <= 1.0)
-assert(small_requests >= 0.0 and small_requests <= 1.0)
-assert(misaligned_requests >= 0.0 and misaligned_requests <= 1.0)
-assert(random_operations >= 0.0 and random_operations <= 1.0)
-
-assert(metadata_time_rank >= 0.0)
+thresholds[category + '_' + threshold_name][0] = threshold_value
+assert(thresholds['imbalance_operations'][0] >= 0.0 and thresholds['imbalance_operations'][0] <= 1.0)
+assert(thresholds['small_requests'][0] >= 0.0 and thresholds['small_requests'][0] <= 1.0)
+assert(thresholds['misaligned_requests'][0] >= 0.0 and thresholds['misaligned_requests'][0] <= 1.0)
+assert(thresholds['random_operations'][0] >= 0.0 and thresholds['random_operations'][0] <= 1.0)
+
+assert(thresholds['metadata_time_rank'][0] >= 0.0)


def convert_bytes(bytes_number):
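validate_thresholds() now writes user-supplied overrides into the shared dict, joining the two nesting levels of the parsed file with an underscore to build the key. The code that loads data sits outside this hunk, but the category/name loop implies it parses to a nested mapping along these lines (values and the on-disk format are illustrative; neither is visible in this diff):

# Hypothetical contents of 'data' after parsing a user thresholds file
data = {
    'imbalance': {'stragglers': 0.2, 'size': 0.4},
    'small': {'bytes': 2097152},
    'metadata': {'time_rank': 60},
}
# 'imbalance' + '_' + 'stragglers' overrides thresholds['imbalance_stragglers'][0] to 0.2;
# the asserts that follow keep the ratio-style thresholds inside [0.0, 1.0].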
(Diff for the remaining two changed files was not loaded in this view.)
