From bb4d911a2333d5ae4d35cffb1e00bf741c43f0a1 Mon Sep 17 00:00:00 2001
From: Joel Tony
Date: Sat, 31 Aug 2024 00:28:21 +0530
Subject: [PATCH] feat (part): Process an aggregated darshan trace

---
 drishti/handlers/handle_darshan.py | 623 +++++++++++++++++++----------
 1 file changed, 418 insertions(+), 205 deletions(-)
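Notes: the two-phase flow this patch works toward, as a minimal sketch (illustrative
only; `sketch_handler` and the loop bodies are hypothetical, while the classes and
per-trace methods are the ones defined in the diff below):

    import darshan
    import darshan.backend.cffi_backend as darshanll

    def sketch_handler(log_paths):
        # Phase 1: one DarshanTrace per log, each computing its own counters.
        traces = []
        for trace_path in log_paths:
            log = darshanll.log_open(trace_path)
            information = darshanll.log_get_job(log)
            darshanll.log_close(log)

            report = darshan.DarshanReport(trace_path)
            traces.append(DarshanTrace(trace_path, information, report))

        for current_trace in traces:
            current_trace.something()                    # POSIX op/size totals
            current_trace.small_operation_calculation()  # small-I/O counters

        # Phase 2: merge the per-trace counters and emit a single report.
        aggregated_trace = AggregatedDarshanTraces(traces=traces)
        aggregated_trace.aggregate_traces()
        aggregated_trace.generate_insights()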
diff --git a/drishti/handlers/handle_darshan.py b/drishti/handlers/handle_darshan.py
index 72e8558..cf947ce 100644
--- a/drishti/handlers/handle_darshan.py
+++ b/drishti/handlers/handle_darshan.py
@@ -2,13 +2,16 @@
 import dataclasses
 import io
 import shlex
+import csv
 import shutil
 import subprocess
 import os
 import sys
 import typing
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Optional
+import abc
+from typing import List
 
 import darshan
 import pandas as pd
@@ -16,11 +19,15 @@
 import darshan.backend.cffi_backend as darshanll
 from packaging import version
 from rich import print
+from rich.padding import Padding
 
 import includes.config as config
 import includes.module as module
-from includes.module import *
+from includes.module import Panel, insights_total
+from includes.module import HIGH, WARN, RECOMMENDATIONS
+# from includes.module import *
 from includes.parser import args
+import time
 
 
 def is_available(name):
@@ -83,56 +90,27 @@ class TimestampPair:
     start: datetime.date
     end: datetime.date
 
-
 @dataclass
-class DarshanTrace:
+class AbstractDarshanTrace(abc.ABC):
     # Trace metadata
-    path: str
     jobid: str
     log_ver: str
     time: TimestampPair
    exe: str
 
     # Report
-    report: darshan.DarshanReport
     modules: typing.Iterable[str]
+    name_records: dict[str, str] = field(default_factory=dict)
+    max_read_offset: int = float('-inf')
+    max_write_offset: int = float('-inf')
 
     ###
-    total_write_size_stdio: int
-    total_write_size_stdio: int
-    total_size_stdio: int
-
-    total_write_size_posix: int
-    total_read_size_posix: int
-    total_size_posix: int
-
-    total_write_size_mpiio: int
-    total_read_size_mpiio: int
-    total_size_mpiio: int
-
-    total_size: int
-    total_files: int
-    ###
-    max_read_offset: int
-    max_write_offset: int
-    ###
-
-    stdio_df: pd.DataFrame = None
-    posix_df: pd.DataFrame = None
-    mpiio_df: pd.DataFrame = None
-    lustre_df: pd.DataFrame = None
-
-    dxt_posix: pd.DataFrame = None
-    dxt_mpiio: pd.DataFrame = None
-
-    dxt_posix_read_data: pd.DataFrame = None
-    dxt_posix_write_data: pd.DataFrame = None
 
     total_files_stdio: int = 0
     total_files_posix: int = 0
     total_files_mpiio: int = 0
 
-    files: dict[str, dict[str, int]] = dataclasses.field(default_factory=dict)
+    files: dict[str, dict[str, int]] = None
 
     total_reads: int = 0
     total_writes: int = 0
@@ -143,6 +121,23 @@ class DarshanTrace:
     total_reads_small: int = 0
     total_writes_small: int = 0
 
+    ###
+    total_write_size_stdio: int = 0
+    total_read_size_stdio: int = 0
+    total_size_stdio: int = 0
+
+    total_write_size_posix: int = 0
+    total_read_size_posix: int = 0
+    total_size_posix: int = 0
+
+    total_write_size_mpiio: int = 0
+    total_read_size_mpiio: int = 0
+    total_size_mpiio: int = 0
+
+    total_size: int = 0
+    total_files: int = 0
+    ###
+
     total_mem_not_aligned: int = 0
     total_file_not_aligned: int = 0
 
@@ -153,7 +148,6 @@ class DarshanTrace:
     write_sequential: int = 0
     write_random: int = 0
 
-    shared_files: pd.DataFrame = None
     total_shared_reads: int = 0
     total_shared_reads_small: int = 0
     total_shared_writes: int = 0
@@ -165,11 +159,12 @@ class DarshanTrace:
 
     # 2 functions (unsure ones)
 
-    has_hdf5_extension: bool = False
+    has_hdf5_extension: bool = False  # TODO: OR this
 
     mpiio_nb_reads: int = 0
     mpiio_nb_writes: int = 0
 
+    # TODO: Should be a list of CB nodes for agg
     cb_nodes: Optional[int] = None
     number_of_compute_nodes: int = 0
     hints: list[str] = dataclasses.field(default_factory=list)
@@ -179,34 +174,22 @@ class DarshanTrace:
 
     aggregated: pd.DataFrame = None
 
-    def __init__(self, trace_path: str, job_information, report: darshan.DarshanReport):
-        self.path = trace_path
-
-        self.jobid = job_information['jobid']
-        self.log_ver = job_information['log_ver'] if 'log_ver' in job_information else job_information['metadata'][
-            'lib_ver']
-        self.exe = report.metadata['exe']
-
-        _start_time = datetime.datetime.fromtimestamp(job_information['start_time_sec'], tz=datetime.timezone.utc)
-        _end_time = datetime.datetime.fromtimestamp(job_information['end_time_sec'], tz=datetime.timezone.utc)
-        self.time = TimestampPair(_start_time, _end_time)
-
-        self.modules = report.modules.keys()
-
-        # TODO: Should I search in self.modules or in report.records?
-        # ! All dfs are being materialised
-        self.report = report
-        self.posix_df = report.records['POSIX'].to_df() if 'POSIX' in self.modules else None
-        self.stdio_df = report.records['STDIO'].to_df() if 'STDIO' in self.modules else None
-        self.mpiio_df = report.records['MPI-IO'].to_df() if 'MPI-IO' in self.modules else None
-
-        self.lustre_df = report.records['LUSTRE'].to_df() if 'LUSTRE' in self.modules else None
-
-        self.dxt_posix = report.records['DXT_POSIX'].to_df() if 'DXT_POSIX' in self.modules else None
-        self.dxt_mpiio = report.records['DXT_MPIIO'].to_df() if 'DXT_MPIIO' in self.modules else None
-
-        self.hints = []
-        self.files = {}
+    ## EXTRA from module being split
+    mpiio_coll_reads: int = 0
+    mpiio_indep_reads: int = 0
+    total_mpiio_read_operations: int = 0
+    detected_files_mpi_coll_reads: pd.DataFrame = None
+    mpiio_coll_writes: int = 0
+    mpiio_indep_writes: int = 0
+    total_mpiio_write_operations: int = 0
+    detected_files_mpiio_coll_writes: pd.DataFrame = None
+    imbalance_count_posix_shared_time: int = 0
+    posix_shared_time_imbalance_detected_files1: pd.DataFrame = None
+    posix_shared_time_imbalance_detected_files2: pd.DataFrame = None
+    posix_shared_time_imbalance_detected_files3: pd.DataFrame = None
+    posix_total_size: int = 0
+    posix_total_read_size: int = 0
+    posix_total_written_size: int = 0
 
     def generate_dxt_posix_rw_df(self) -> None:
         if not args.backtrace:
@@ -342,12 +325,87 @@ def files_stuff(self) -> None:
             'mpiio': uses_mpiio
         }
 
-    def check_stdio(self) -> None:
+
+    def generate_insights(self):
+        # TODO: Check if module exists. Replicate from each function which calculates insights.
+        self._check_stdio()
+        self._check_mpiio()
+        self._do_something()
+        self._small_operation_insight()
+
+    def _check_stdio(self) -> None:
         module.check_stdio(self.total_posix_size, self.total_size_stdio)
 
-    def check_mpiio(self) -> None:
+    def _check_mpiio(self) -> None:
         module.check_mpiio(self.modules)
 
+    def _do_something(self):
+        module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes)
+        module.check_size_intensive(self.posix_total_size, self.posix_total_read_size, self.posix_total_written_size)
+
+        # TODO: for trace in traces
+        for trace in self.traces:
+            pass
+
+        module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned,
+                                self.modules, self.name_records, self.lustre_df, self.dxt_posix,
+                                self.dxt_posix_read_data)  # posix alignment
+
+        module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size,
+                             self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data)  # redundant reads
+
+        module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads,
+                                      self.write_consecutive, self.write_sequential, self.write_random,
+                                      self.total_writes, self.dxt_posix,
+                                      self.dxt_posix_read_data, self.dxt_posix_write_data)  # random check
+
+        module.check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small,
+                                            self.total_shared_writes,
+                                            self.total_shared_writes_small, self.shared_files, self.name_records)
+
+        module.check_long_metadata(self.count_long_metadata, self.modules)
+
+        module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count,
+                                          self.posix_data_straggler_files,
+                                          self.name_records, self.dxt_posix,
+                                          self.dxt_posix_read_data,
+                                          self.dxt_posix_write_data)
+
+        module.check_shared_time_imbalance(self.posix_stragglers_shared_file_time_imbalance_count,
+                                           self.posix_shared_time_imbalance_detected_files1, self.name_records)
+
+        module.check_individual_write_imbalance(self.posix_data_imbalance_count,
+                                                self.posix_shared_time_imbalance_detected_files2,
+                                                self.name_records, self.dxt_posix, self.dxt_posix_write_data)
+
+        module.check_mpi_collective_read_operation(self.mpiio_coll_reads, self.mpiio_indep_reads,
+                                                   self.total_mpiio_read_operations,
+                                                   self.detected_files_mpi_coll_reads, self.name_records,
+                                                   self.dxt_mpiio)
+
+        module.check_mpi_collective_write_operation(self.mpiio_coll_writes, self.mpiio_indep_writes,
+                                                    self.total_mpiio_write_operations,
+                                                    self.detected_files_mpiio_coll_writes, self.name_records,
+                                                    self.dxt_mpiio)
+
+        module.check_individual_read_imbalance(self.imbalance_count_posix_shared_time,
+                                               self.posix_shared_time_imbalance_detected_files3,
+                                               self.name_records, self.dxt_posix, self.dxt_posix_read_data)
+
+        module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension,
+                                              self.modules)
+
+    def _small_operation_insight(self):
+        module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes,
+                                     self.total_writes_small,
+                                     self.small_operation_detected_files,
+                                     self.modules, self.name_records, self.dxt_posix, self.dxt_posix_read_data,
+                                     self.dxt_posix_write_data)
+
+
     def something(self) -> None:
         if not self.posix_df:
             return
@@ -356,13 +414,13 @@ def something(self) -> None:
         self.total_writes = self.posix_df['counters']['POSIX_WRITES'].sum()
         self.total_operations = self.total_writes + self.total_reads
 
         # ----------------------------------------------------------------------------------------------------------------------
-        module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes)
+        # module.check_operation_intensive(self.total_operations, self.total_reads, self.total_writes)
 
-        total_read_size = self.posix_df['counters']['POSIX_BYTES_READ'].sum()
-        total_written_size = self.posix_df['counters']['POSIX_BYTES_WRITTEN'].sum()
-        total_size = total_written_size + total_read_size
+        self.posix_total_read_size = self.posix_df['counters']['POSIX_BYTES_READ'].sum()
+        self.posix_total_written_size = self.posix_df['counters']['POSIX_BYTES_WRITTEN'].sum()
+        self.posix_total_size = self.posix_total_written_size + self.posix_total_read_size
 
-        module.check_size_intensive(total_size, total_read_size, total_written_size)
+        # module.check_size_intensive(self.posix_total_size, self.posix_total_read_size, self.posix_total_written_size)
 
         # -----
         self.total_reads_small = (
                 self.posix_df['counters']['POSIX_SIZE_READ_0_100'].sum() +
@@ -401,18 +459,18 @@ def small_operation_calculation(self):
             self.posix_df['counters']['POSIX_SIZE_WRITE_100K_1M']
         )
 
-        detected_files = pd.DataFrame(self.posix_df['counters'].groupby('id')[['INSIGHTS_POSIX_SMALL_READ',
-                                                                               'INSIGHTS_POSIX_SMALL_WRITE']].sum()).reset_index()
-        detected_files.columns = ['id', 'total_reads',
-                                  'total_writes']  # !: Rename later. total_small_reads, total_small_writes
-        detected_files.loc[:, 'id'] = detected_files.loc[:, 'id'].astype(str)
+        self.small_operation_detected_files = pd.DataFrame(self.posix_df['counters'].groupby('id')[['INSIGHTS_POSIX_SMALL_READ',
+                                                                                                    'INSIGHTS_POSIX_SMALL_WRITE']].sum()).reset_index()
+        self.small_operation_detected_files.columns = ['id', 'total_reads',
+                                                       'total_writes']  # !: Rename later. total_small_reads, total_small_writes
+        self.small_operation_detected_files.loc[:, 'id'] = self.small_operation_detected_files.loc[:, 'id'].astype(str)
 
         self.report.name_records = self.report.name_records
-        module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes,
-                                     self.total_writes_small,
-                                     detected_files,
-                                     self.modules, self.report.name_records, self.dxt_posix, self.dxt_posix_read_data,
-                                     self.dxt_posix_write_data)
+        # module.check_small_operation(self.total_reads, self.total_reads_small, self.total_writes,
+        #                              self.total_writes_small,
+        #                              self.small_operation_detected_files,
+        #                              self.modules, self.report.name_records, self.dxt_posix, self.dxt_posix_read_data,
+        #                              self.dxt_posix_write_data)
 
     def posix_alignment(self):
         if not self.posix_df:
@@ -422,9 +480,9 @@ def posix_alignment(self):
         self.total_mem_not_aligned = self.posix_df['counters']['POSIX_MEM_NOT_ALIGNED'].sum()
         self.total_file_not_aligned = self.posix_df['counters']['POSIX_FILE_NOT_ALIGNED'].sum()
 
         self.report.name_records = self.report.name_records
-        module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned,
-                                self.modules, self.report.name_records, self.lustre_df, self.dxt_posix,
-                                self.dxt_posix_read_data)
+        # module.check_misaligned(self.total_operations, self.total_mem_not_aligned, self.total_file_not_aligned,
+        #                         self.modules, self.report.name_records, self.lustre_df, self.dxt_posix,
+        #                         self.dxt_posix_read_data)
 
     def posix_redundant_reads(self):
         if not self.posix_df:
@@ -433,8 +491,8 @@ def posix_redundant_reads(self):
         self.max_read_offset = self.posix_df['counters']['POSIX_MAX_BYTE_READ'].max()
         self.max_write_offset = self.posix_df['counters']['POSIX_MAX_BYTE_WRITTEN'].max()
 
-        module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size,
-                             self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data)
+        # module.check_traffic(self.max_read_offset, self.total_read_size, self.max_write_offset, self.total_written_size,
+        #                      self.dxt_posix, self.dxt_posix_read_data, self.dxt_posix_write_data)
 
     def posix_random_check(self):
         if not self.posix_df:
@@ -454,10 +512,10 @@ def posix_random_check(self):
 
         self.write_random = self.total_writes - self.write_consecutive - self.write_sequential
 
-        module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads,
-                                      self.write_consecutive, self.write_sequential, self.write_random,
-                                      self.total_writes, self.dxt_posix,
-                                      self.dxt_posix_read_data, self.dxt_posix_write_data)
+        # module.check_random_operation(self.read_consecutive, self.read_sequential, self.read_random, self.total_reads,
+        #                               self.write_consecutive, self.write_sequential, self.write_random,
+        #                               self.total_writes, self.dxt_posix,
+        #                               self.dxt_posix_read_data, self.dxt_posix_write_data)
 
     def posix_shared_file(self):
         if not self.posix_df:
@@ -503,9 +561,9 @@ def posix_shared_file(self):
         )
 
         self.report.name_records = self.report.name_records
-        check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small,
-                                     self.total_shared_writes,
-                                     self.total_shared_writes_small, self.shared_files, self.report.name_records)
+        # module.check_shared_small_operation(self.total_shared_reads, self.total_shared_reads_small,
+        #                                     self.total_shared_writes,
+        #                                     self.total_shared_writes_small, self.shared_files, self.report.name_records)
 
     def posix_long_metadata(self):
         if not self.posix_df:
@@ -515,7 +573,7 @@ def posix_long_metadata(self):
             self.posix_df['fcounters'][
                 (self.posix_df['fcounters']['POSIX_F_META_TIME'] > config.thresholds['metadata_time_rank'][0])])
 
-        module.check_long_metadata(self.count_long_metadata, self.modules)
+        # module.check_long_metadata(self.count_long_metadata, self.modules)
 
     def posix_stragglers(self):
         if not self.posix_df:
@@ -529,29 +587,29 @@ def posix_stragglers(self):
 
         self.shared_files = self.shared_files.assign(id=lambda d: d['id'].astype(str))
 
-        posix_straggler_files = []
+        self.posix_data_straggler_files = []
 
         for index, row in self.shared_files.iterrows():
             total_transfer_size = row['POSIX_BYTES_WRITTEN'] + row['POSIX_BYTES_READ']
 
             if total_transfer_size and abs(
                     row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size > \
-                    thresholds['imbalance_stragglers'][0]:
+                    module.thresholds['imbalance_stragglers'][0]:
                 self.posix_shared_data_imbalance_stragglers_count += 1
 
-                posix_straggler_files.append([
+                self.posix_data_straggler_files.append([
                     row['id'],
                     abs(row['POSIX_SLOWEST_RANK_BYTES'] - row['POSIX_FASTEST_RANK_BYTES']) / total_transfer_size * 100
                 ])
 
         column_names = ['id', 'data_imbalance']
-        posix_straggler_files = pd.DataFrame(posix_straggler_files, columns=column_names)
+        self.posix_data_straggler_files = pd.DataFrame(self.posix_data_straggler_files, columns=column_names)
 
         self.report.name_records = self.report.name_records
-        module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count, posix_straggler_files,
-                                          self.report.name_records, self.dxt_posix,
-                                          self.dxt_posix_read_data,
-                                          self.dxt_posix_write_data)
+        # module.check_shared_data_imblance(self.posix_shared_data_imbalance_stragglers_count, self.posix_data_straggler_files,
+        #                                   self.report.name_records, self.dxt_posix,
+        #                                   self.dxt_posix_read_data,
+        #                                   self.dxt_posix_write_data)
 
         # POSIX_F_FASTEST_RANK_TIME
         # POSIX_F_SLOWEST_RANK_TIME
 
@@ -560,14 +618,16 @@ def posix_stragglers(self):
     #################################################################################################################
 
     def posix_stragglers2(self):
-        # Get the files responsible
+        if not self.posix_df:
+            return
 
+        # Get the files responsible
         shared_files_times = self.posix_df['fcounters'].loc[(self.posix_df['fcounters']['rank'] == -1)]
 
-        posix_shared_time_imbalance_detected_files = []
+        self.posix_shared_time_imbalance_detected_files1 = []
 
-        posix_stragglers_shared_file_time_imbalance_count = 0
-        posix_stragglers_shared_file_time_imbalance = {}
+        self.posix_stragglers_shared_file_time_imbalance_count = 0
+        # posix_stragglers_shared_file_time_imbalance = {}  # UNUSED?
 
         shared_files_times = shared_files_times.assign(id=lambda d: d['id'].astype(str))
 
@@ -577,20 +637,23 @@ def posix_stragglers2(self):
             if total_transfer_time and abs(
                     row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time > \
                     config.thresholds['imbalance_stragglers'][0]:
-                posix_stragglers_shared_file_time_imbalance_count += 1
+                self.posix_stragglers_shared_file_time_imbalance_count += 1
 
-                posix_shared_time_imbalance_detected_files.append([
+                self.posix_shared_time_imbalance_detected_files1.append([
                     row['id'],
                     abs(row['POSIX_F_SLOWEST_RANK_TIME'] - row['POSIX_F_FASTEST_RANK_TIME']) / total_transfer_time * 100
                 ])
 
         column_names = ['id', 'time_imbalance']
-        posix_shared_time_imbalance_detected_files = pd.DataFrame(posix_shared_time_imbalance_detected_files,
-                                                                  columns=column_names)
+        self.posix_shared_time_imbalance_detected_files1 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files1,
+                                                                        columns=column_names)
 
-        module.check_shared_time_imbalance(posix_stragglers_shared_file_time_imbalance_count,
-                                           posix_shared_time_imbalance_detected_files, self.report.name_records)
+        # module.check_shared_time_imbalance(self.posix_stragglers_shared_file_time_imbalance_count,
+        #                                    self.posix_shared_time_imbalance_detected_files1, self.report.name_records)
 
     def posix_imbalance(self):
+        if not self.posix_df:
+            return
+
         aggregated = self.posix_df['counters'].loc[(self.posix_df['counters']['rank'] != -1)][
             ['rank', 'id', 'POSIX_BYTES_WRITTEN', 'POSIX_BYTES_READ']
         ].groupby('id', as_index=False).agg({
@@ -603,25 +666,25 @@ def posix_imbalance(self):
 
         self.aggregated = aggregated
 
         # Get the files responsible
-        imbalance_count = 0
+        self.posix_data_imbalance_count = 0
 
-        posix_shared_time_imbalance_detected_files = []
+        self.posix_shared_time_imbalance_detected_files2 = []
 
         for index, row in self.aggregated.iterrows():
             if row['POSIX_BYTES_WRITTEN_max'] and abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / \
                     row['POSIX_BYTES_WRITTEN_max'] > config.thresholds['imbalance_size'][0]:
-                imbalance_count += 1
+                self.posix_data_imbalance_count += 1
 
-                posix_shared_time_imbalance_detected_files.append([
+                self.posix_shared_time_imbalance_detected_files2.append([
                     row['id'],
                     abs(row['POSIX_BYTES_WRITTEN_max'] - row['POSIX_BYTES_WRITTEN_min']) / row[
                         'POSIX_BYTES_WRITTEN_max'] * 100
                 ])
 
         column_names = ['id', 'write_imbalance']
-        posix_shared_time_imbalance_detected_files = pd.DataFrame(posix_shared_time_imbalance_detected_files,
-                                                                  columns=column_names)
+        self.posix_shared_time_imbalance_detected_files2 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files2,
+                                                                        columns=column_names)
 
-        module.check_individual_write_imbalance(imbalance_count, posix_shared_time_imbalance_detected_files,
-                                                self.report.name_records, self.dxt_posix, self.dxt_posix_write_data)
+        # module.check_individual_write_imbalance(self.posix_data_imbalance_count, self.posix_shared_time_imbalance_detected_files2,
+        #                                         self.report.name_records, self.dxt_posix, self.dxt_posix_write_data)
 
     def mpiio_processing(self):
         if not self.mpiio_df:
@@ -630,94 +693,99 @@ def mpiio_processing(self):
 
         self.mpiio_df['counters'] = self.mpiio_df['counters'].assign(
             id=lambda d: d['id'].astype(str))  # What does this do?
 
-        # Get the files responsible
-        detected_files = []
 
         df_mpiio_collective_reads = self.mpiio_df['counters']  # .loc[(df_mpiio['counters']['MPIIO_COLL_READS'] > 0)]
 
-        total_mpiio_read_operations = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum() + self.mpiio_df['counters'][
-            'MPIIO_COLL_READS'].sum()
+        self.total_mpiio_read_operations = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum() + self.mpiio_df['counters'][
+            'MPIIO_COLL_READS'].sum()
 
-        mpiio_coll_reads = self.mpiio_df['counters']['MPIIO_COLL_READS'].sum()
-        mpiio_indep_reads = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum()
+        self.mpiio_coll_reads = self.mpiio_df['counters']['MPIIO_COLL_READS'].sum()
+        self.mpiio_indep_reads = self.mpiio_df['counters']['MPIIO_INDEP_READS'].sum()
 
-        detected_files = []
-        if mpiio_coll_reads == 0 and total_mpiio_read_operations and total_mpiio_read_operations > \
-                thresholds['collective_operations_absolute'][0]:
+        self.detected_files_mpi_coll_reads = []
+        if self.mpiio_coll_reads == 0 and self.total_mpiio_read_operations and self.total_mpiio_read_operations > \
+                module.thresholds['collective_operations_absolute'][0]:
             files = pd.DataFrame(df_mpiio_collective_reads.groupby('id').sum()).reset_index()
 
             for index, row in df_mpiio_collective_reads.iterrows():
                 if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
                         row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
-                        thresholds['collective_operations'][0] and
+                        module.thresholds['collective_operations'][0] and
                         (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
-                        thresholds['collective_operations_absolute'][0]):
-                    detected_files.append([
+                        module.thresholds['collective_operations_absolute'][0]):
+                    self.detected_files_mpi_coll_reads.append([
                         row['id'], row['MPIIO_INDEP_READS'],
                         row['MPIIO_INDEP_READS'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
                     ])
 
             column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads']
-            detected_files = pd.DataFrame(detected_files, columns=column_names)
+            self.detected_files_mpi_coll_reads = pd.DataFrame(self.detected_files_mpi_coll_reads, columns=column_names)
+
+        # module.check_mpi_collective_read_operation(self.mpiio_coll_reads, self.mpiio_indep_reads, self.total_mpiio_read_operations,
+        #                                            self.detected_files_mpi_coll_reads, self.report.name_records, self.dxt_mpiio)
+
+        # TODO: Split this into 2 functions for each module insight
 
-        check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations,
-                                            detected_files, self.report.name_records, self.dxt_mpiio)
 
         df_mpiio_collective_writes = self.mpiio_df['counters']  # .loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)]
 
-        total_mpiio_write_operations = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum() + \
-                                       self.mpiio_df['counters'][
-                                           'MPIIO_COLL_WRITES'].sum()
+        self.total_mpiio_write_operations = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum() + \
+                                            self.mpiio_df['counters'][
+                                                'MPIIO_COLL_WRITES'].sum()
 
-        mpiio_coll_writes = self.mpiio_df['counters']['MPIIO_COLL_WRITES'].sum()
-        mpiio_indep_writes = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum()
+        self.mpiio_coll_writes = self.mpiio_df['counters']['MPIIO_COLL_WRITES'].sum()
+        self.mpiio_indep_writes = self.mpiio_df['counters']['MPIIO_INDEP_WRITES'].sum()
 
-        detected_files = []
-        if mpiio_coll_writes == 0 and total_mpiio_write_operations and total_mpiio_write_operations > \
-                thresholds['collective_operations_absolute'][0]:
+        self.detected_files_mpiio_coll_writes = []
+        if self.mpiio_coll_writes == 0 and self.total_mpiio_write_operations and self.total_mpiio_write_operations > \
+                module.thresholds['collective_operations_absolute'][0]:
             files = pd.DataFrame(df_mpiio_collective_writes.groupby('id').sum()).reset_index()
 
             for index, row in df_mpiio_collective_writes.iterrows():
                 if ((row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) and
                         row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
-                        thresholds['collective_operations'][0] and
+                        module.thresholds['collective_operations'][0] and
                        (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) >
-                        thresholds['collective_operations_absolute'][0]):
-                    detected_files.append([
+                        module.thresholds['collective_operations_absolute'][0]):
+                    self.detected_files_mpiio_coll_writes.append([
                         row['id'], row['MPIIO_INDEP_WRITES'],
                         row['MPIIO_INDEP_WRITES'] / (row['MPIIO_INDEP_READS'] + row['MPIIO_INDEP_WRITES']) * 100
                     ])
 
             column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes']
-            detected_files = pd.DataFrame(detected_files, columns=column_names)
+            self.detected_files_mpiio_coll_writes = pd.DataFrame(self.detected_files_mpiio_coll_writes, columns=column_names)
 
-        check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations,
-                                             detected_files, self.report.name_records, self.dxt_mpiio)
+        # module.check_mpi_collective_write_operation(self.mpiio_coll_writes, self.mpiio_indep_writes, self.total_mpiio_write_operations,
+        #                                             self.detected_files_mpiio_coll_writes, self.report.name_records, self.dxt_mpiio)
 
     def posix_imbalance2(self):
-        imbalance_count = 0
+        if not self.posix_df:
+            return
 
-        posix_shared_time_imbalance_detected_files = []
+        self.imbalance_count_posix_shared_time = 0
+
+        self.posix_shared_time_imbalance_detected_files3 = []
 
         for index, row in self.aggregated.iterrows():
             if row['POSIX_BYTES_READ_max'] and abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row[
-                'POSIX_BYTES_READ_max'] > thresholds['imbalance_size'][0]:
-                imbalance_count += 1
+                'POSIX_BYTES_READ_max'] > module.thresholds['imbalance_size'][0]:
+                self.imbalance_count_posix_shared_time += 1
 
-                posix_shared_time_imbalance_detected_files.append([
+                self.posix_shared_time_imbalance_detected_files3.append([
                     row['id'],
                     abs(row['POSIX_BYTES_READ_max'] - row['POSIX_BYTES_READ_min']) / row['POSIX_BYTES_READ_max'] * 100
                 ])
 
         column_names = ['id', 'read_imbalance']
-        posix_shared_time_imbalance_detected_files = pd.DataFrame(posix_shared_time_imbalance_detected_files,
-                                                                  columns=column_names)
+        self.posix_shared_time_imbalance_detected_files3 = pd.DataFrame(self.posix_shared_time_imbalance_detected_files3,
                                                                        columns=column_names)
 
-        module.check_individual_read_imbalance(imbalance_count, posix_shared_time_imbalance_detected_files,
-                                               self.report.name_records, self.dxt_posix, self.dxt_posix_read_data)
+        # module.check_individual_read_imbalance(self.imbalance_count_posix_shared_time, self.posix_shared_time_imbalance_detected_files3,
+        #                                        self.report.name_records, self.dxt_posix, self.dxt_posix_read_data)
 
     def hdf5_check(self):
         if not self.mpiio_df:
             return
+        self.report.name_records = self.report.name_records
 
         # Will this be optimised via JIT? Nvm CPython doesn't have JIT lol
         for index, row in self.mpiio_df['counters'].iterrows():
             if self.report.name_records[int(row['id'])].endswith('.h5') or self.report.name_records[
@@ -732,8 +800,8 @@ def mpiio_non_blocking(self):
         self.mpiio_nb_reads = self.mpiio_df['counters']['MPIIO_NB_READS'].sum()
         self.mpiio_nb_writes = self.mpiio_df['counters']['MPIIO_NB_WRITES'].sum()
 
-        module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension,
-                                              self.modules)
+        # module.check_mpi_none_block_operation(self.mpiio_nb_reads, self.mpiio_nb_writes, self.has_hdf5_extension,
+        #                                       self.modules)
 
     def CHECKnumber_of_aggregators(self):
         hints = ''
@@ -793,76 +861,231 @@ def something_else(self):
             datetime.timezone.utc)
 
 
+@dataclass
+class DarshanTrace(AbstractDarshanTrace):
+    path: Optional[str] = None
+    report: Optional[darshan.DarshanReport] = None
+
+    stdio_df: pd.DataFrame = None
+    posix_df: pd.DataFrame = None
+    mpiio_df: pd.DataFrame = None
+    lustre_df: pd.DataFrame = None
+
+    dxt_posix: pd.DataFrame = None
+    dxt_mpiio: pd.DataFrame = None
+
+    dxt_posix_read_data: pd.DataFrame = None
+    dxt_posix_write_data: pd.DataFrame = None
+
+    shared_files: pd.DataFrame = None
+
+    def __init__(self, trace_path: str, job_information, report: darshan.DarshanReport):
+        self.path = trace_path
+
+        self.jobid = job_information['jobid']
+        self.log_ver = job_information['log_ver'] if 'log_ver' in job_information else job_information['metadata'][
+            'lib_ver']
+        self.exe = report.metadata['exe']
+
+        _start_time = datetime.datetime.fromtimestamp(job_information['start_time_sec'], tz=datetime.timezone.utc)
+        _end_time = datetime.datetime.fromtimestamp(job_information['end_time_sec'], tz=datetime.timezone.utc)
+        self.time = TimestampPair(_start_time, _end_time)
+
+        self.modules = report.modules.keys()
+
+        # TODO: Should I search in self.modules or in report.records?
+        # ! All dfs are being materialised
+        self.report = report
+        self.posix_df = report.records['POSIX'].to_df() if 'POSIX' in self.modules else None
+        self.stdio_df = report.records['STDIO'].to_df() if 'STDIO' in self.modules else None
+        self.mpiio_df = report.records['MPI-IO'].to_df() if 'MPI-IO' in self.modules else None
+
+        self.lustre_df = report.records['LUSTRE'].to_df() if 'LUSTRE' in self.modules else None
+
+        self.dxt_posix = report.records['DXT_POSIX'].to_df() if 'DXT_POSIX' in self.modules else None
+        self.dxt_mpiio = report.records['DXT_MPIIO'].to_df() if 'DXT_MPIIO' in self.modules else None
+
+        self.hints = []
+        self.files = {}
+
+
+@dataclass
+class AggregatedDarshanTraces(AbstractDarshanTrace):
+    traces: List[DarshanTrace] = field(default_factory=list)
+    # reports: List[darshan.DarshanReport] = field(default_factory=list)
+
+    def __init__(self, traces: List[DarshanTrace]):
+        assert len(traces) > 0
+        self.traces = traces
+
+        reports = [current_trace.report for current_trace in traces]
+        self.name_records = dict()
+        for report in reports:
+            self.name_records |= report.name_records
+
+    def aggregate_traces(self):
+        self.modules = set()
+        self.files = dict()
+        for current_trace in self.traces:
+            # set.union() returns a new set, so the result must be assigned back
+            self.modules = self.modules.union(current_trace.modules)
+
+            self.total_write_size_stdio += current_trace.total_write_size_stdio
+            self.total_read_size_stdio += current_trace.total_read_size_stdio
+            self.total_size_stdio += current_trace.total_size_stdio
+
+            self.total_write_size_posix += current_trace.total_write_size_posix
+            self.total_read_size_posix += current_trace.total_read_size_posix
+            self.total_size_posix += current_trace.total_size_posix
+
+            self.total_write_size_mpiio += current_trace.total_write_size_mpiio
+            self.total_read_size_mpiio += current_trace.total_read_size_mpiio
+            self.total_size_mpiio += current_trace.total_size_mpiio
+
+            self.total_size += current_trace.total_size
+            self.total_files += current_trace.total_files
+            ###
+            self.max_read_offset = max(self.max_read_offset, current_trace.max_read_offset)
+            self.max_write_offset = max(self.max_write_offset, current_trace.max_write_offset)
+            ###
+
+            self.total_files_stdio += current_trace.total_files_stdio
+            self.total_files_posix += current_trace.total_files_posix
+            self.total_files_mpiio += current_trace.total_files_mpiio
+
+            self.files |= current_trace.files
+
+            self.total_reads += current_trace.total_reads
+            self.total_writes += current_trace.total_writes
+            self.total_operations += current_trace.total_operations
+            self.total_read_size += current_trace.total_read_size
+            self.total_written_size += current_trace.total_written_size
+            self.total_posix_size += current_trace.total_posix_size
+            self.total_reads_small += current_trace.total_reads_small
+            self.total_writes_small += current_trace.total_writes_small
+
+            self.total_mem_not_aligned += current_trace.total_mem_not_aligned
+            self.total_file_not_aligned += current_trace.total_file_not_aligned
+
+            self.read_consecutive += current_trace.read_consecutive
+            self.read_sequential += current_trace.read_sequential
+            self.read_random += current_trace.read_random
+            self.write_consecutive += current_trace.write_consecutive
+            self.write_sequential += current_trace.write_sequential
+            self.write_random += current_trace.write_random
+
+            self.total_shared_reads += current_trace.total_shared_reads
+            self.total_shared_reads_small += current_trace.total_shared_reads_small
+            self.total_shared_writes += current_trace.total_shared_writes
+            self.total_shared_writes_small += current_trace.total_shared_writes_small
+
+            self.count_long_metadata += current_trace.count_long_metadata
+
+            self.posix_shared_data_imbalance_stragglers_count += current_trace.posix_shared_data_imbalance_stragglers_count
+
+            self.has_hdf5_extension = self.has_hdf5_extension or current_trace.has_hdf5_extension
+
+            self.mpiio_nb_reads += current_trace.mpiio_nb_reads
+            self.mpiio_nb_writes += current_trace.mpiio_nb_writes
 
 
 def log_relation_check():
     # TODO: Ensure that all logs are from a single job, generated at the same time, from the same executable and using the same library version
     pass
 
 
 def handler():
-    console = init_console()
+    console = config.init_console()
 
     insights_start_time = time.time()
 
-    log_path = args.log_paths[0]  # TODO: A single file rn
-    log = darshanll.log_open(log_path)
-
-    modules = darshanll.log_get_modules(log)
-
-    information = darshanll.log_get_job(log)
-
-    trace_path = args.log_paths[0]  # TODO: A single file rn
-
     darshan.enable_experimental()
     library_version = darshanll.get_lib_version()
 
-    # TODO: Can this be put in a with block?
-    log = darshanll.log_open(trace_path)
-    information = darshanll.log_get_job(log)
-    darshanll.log_close(log)
+    # trace_path = args.log_paths[0]  # TODO: A single file rn
+    darshan_traces = []
 
-    report = darshan.DarshanReport(trace_path)
-    current_trace = DarshanTrace(trace_path, information, report)
+    for trace_path in args.log_paths:
+        log = darshanll.log_open(trace_path)
+        information = darshanll.log_get_job(log)
+        darshanll.log_close(log)
+
+        report = darshan.DarshanReport(trace_path)
+        current_trace = DarshanTrace(trace_path, information, report)
+        darshan_traces.append(current_trace)
 
     # # Leave this as is for now
     # # Make sure log format is of the same version
-    # filename = args.log_path
-    # # check_log_version(console, args.log_path, log_version, library_version)
+    # filename = args.trace_path
+    # # check_log_version(console, args.trace_path, log_version, library_version)
     #
 
-    current_trace.generate_dxt_posix_rw_df()
-    current_trace.calculate_insights()
-    current_trace.files_stuff()
-    current_trace.check_stdio()
-    current_trace.check_mpiio()
-    current_trace.something()
-    current_trace.small_operation_calculation()
-    current_trace.posix_alignment()
-    current_trace.posix_redundant_reads()
-    current_trace.posix_random_check()
-    current_trace.posix_shared_file()
-    current_trace.posix_long_metadata()
-    current_trace.posix_stragglers()
-    current_trace.posix_stragglers2()
-    current_trace.posix_imbalance()
-    current_trace.hdf5_check()
-    current_trace.mpiio_non_blocking()
-    current_trace.CHECKnumber_of_aggregators()
-    current_trace.something_else()
+    # Compute values for each trace
+    for current_trace in darshan_traces:
+        current_trace.generate_dxt_posix_rw_df()
+        current_trace.calculate_insights()
+        current_trace.files_stuff()
+        # current_trace.check_stdio()
+        # current_trace.check_mpiio()
+        current_trace.something()
+        current_trace.small_operation_calculation()
+        current_trace.posix_alignment()
+        current_trace.posix_redundant_reads()
+        current_trace.posix_random_check()
+        current_trace.posix_shared_file()
+        current_trace.posix_long_metadata()
+        current_trace.posix_stragglers()
+        current_trace.posix_stragglers2()
+        current_trace.posix_imbalance()
+        current_trace.hdf5_check()
+        current_trace.mpiio_non_blocking()
+        current_trace.CHECKnumber_of_aggregators()
+        current_trace.something_else()
+        # current_trace.generate_insights()
+
+    # Create aggregated trace
+    aggregated_trace = AggregatedDarshanTraces(traces=darshan_traces)
+    aggregated_trace.aggregate_traces()
+    aggregated_trace.generate_insights()
 
     insights_end_time = time.time()
 
     # Version 3.4.1 of py-darshan changed the contents on what is reported in 'job'
+    # NOTE: job metadata below still comes from the last trace in the loop
+    job_end, job_start = set_job_time(report)
+
+    print_insights(console, current_trace, insights_end_time, insights_start_time, job_end, job_start, trace_path,
+                   report)
+
+    export_results(console, trace_path, report)
+
+
+def set_job_time(report):
     if 'start_time' in report.metadata['job']:
         job_start = datetime.datetime.fromtimestamp(report.metadata['job']['start_time'], datetime.timezone.utc)
         job_end = datetime.datetime.fromtimestamp(report.metadata['job']['end_time'], datetime.timezone.utc)
     else:
         job_start = datetime.datetime.fromtimestamp(report.metadata['job']['start_time_sec'], datetime.timezone.utc)
         job_end = datetime.datetime.fromtimestamp(report.metadata['job']['end_time_sec'], datetime.timezone.utc)
+    return job_end, job_start
 
-    console.print()
 
+def export_results(console, log_path, report):
+    # Export to HTML, SVG, and CSV
+    trace_name = os.path.basename(log_path).replace('.darshan', '')
+    out_dir = args.export_dir if args.export_dir != "" else os.getcwd()
+
+    module.export_html(console, out_dir, trace_name)
+    module.export_svg(console, out_dir, trace_name)
+    module.export_csv(out_dir, trace_name, report.metadata['job']['jobid'])
+
+
+def print_insights(console, current_trace, insights_end_time, insights_start_time, job_end, job_start, log_path,
+                   report):
+    console.print()
     console.print(
         Panel(
             '\n'.join([
@@ -908,17 +1131,7 @@ def handler():
             padding=1
         )
     )
-    console.print()
-
-    display_content(console)
-    display_thresholds(console)
-    display_footer(console, insights_start_time, insights_end_time)
-
-    # Export to HTML, SVG, and CSV
-    trace_name = os.path.basename(log_path).replace('.darshan', '')
-    out_dir = args.export_dir if args.export_dir != "" else os.getcwd()
-
-    export_html(console, out_dir, trace_name)
-    export_svg(console, out_dir, trace_name)
-    export_csv(out_dir, trace_name, report.metadata['job']['jobid'])
+    console.print()
+
+    module.display_content(console)
+    module.display_thresholds(console)
+    module.display_footer(console, insights_start_time, insights_end_time)
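A note on the merge in aggregate_traces(): `set.union()` returns a new set and
leaves the receiver untouched, which is why the module merge above is written as an
assignment. A quick standalone illustration (plain Python, not part of the patch):

    modules = set()
    modules.union({'POSIX'})            # result discarded; modules is still empty
    assert modules == set()

    modules = modules.union({'POSIX'})  # rebind the merged set
    modules |= {'MPI-IO'}               # or merge in place
    assert modules == {'POSIX', 'MPI-IO'}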