From 023adddeb85ea78be617bb9c9c6a246fe4f79711 Mon Sep 17 00:00:00 2001 From: iguinn Date: Sun, 25 Aug 2024 10:02:27 -0700 Subject: [PATCH 1/5] Updated pargen.utils.load_data to use LH5Iterator and field_mask to be more memory efficient --- src/pygama/pargen/utils.py | 143 +++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 69 deletions(-) diff --git a/src/pygama/pargen/utils.py b/src/pygama/pargen/utils.py index 91002a007..179c2147e 100644 --- a/src/pygama/pargen/utils.py +++ b/src/pygama/pargen/utils.py @@ -9,7 +9,6 @@ from lgdo import lh5 log = logging.getLogger(__name__) -sto = lh5.LH5Store() def convert_to_minuit(pars, func): @@ -35,101 +34,107 @@ def return_nans(input): return m.values, m.errors, np.full((len(m.values), len(m.values)), np.nan) -def get_params(file_params, param_list): - out_params = [] - if isinstance(file_params, dict): - possible_keys = file_params.keys() - elif isinstance(file_params, list): - possible_keys = file_params - for param in param_list: - for key in possible_keys: - if key in param: - out_params.append(key) - return np.unique(out_params).tolist() - - def load_data( - files: list, + files: str | list | dict, lh5_path: str, cal_dict: dict, - params: list, + params: set, cal_energy_param: str = "cuspEmax_ctc_cal", threshold=None, return_selection_mask=False, -) -> tuple(np.array, np.array, np.array, np.array): +) -> pd.DataFrame | tuple(pd.DataFrame, np.array): """ - Loads in the A/E parameters needed and applies calibration constants to energy + Loads parameters from data files. Applies calibration to cal_energy_param + and uses this to apply a lower energy threshold. + + files + file or list of files or dict pointing from timestamps to lists of files + lh5_path + path to table in files + cal_dict + dictionary with operations used to apply calibration constants + params + list of parameters to load from file + cal_energy_param + name of uncalibrated energy parameter + threshold + lower energy threshold for events to load + return_selection_map + if True, return selection mask for threshold along with data """ + params = set(params) if isinstance(files, str): files = [files] if isinstance(files, dict): - keys = lh5.ls( - files[list(files)[0]][0], - lh5_path if lh5_path[-1] == "/" else lh5_path + "/", - ) - keys = [key.split("/")[-1] for key in keys] - if list(files)[0] in cal_dict: - params = get_params(keys + list(cal_dict[list(files)[0]].keys()), params) - else: - params = get_params(keys + list(cal_dict.keys()), params) - + # Go through each tstamp and recursively load_data on file lists df = [] - all_files = [] - masks = np.array([], dtype=bool) + masks = [] for tstamp, tfiles in files.items(): - table = sto.read(lh5_path, tfiles)[0] - - file_df = pd.DataFrame(columns=params) - if tstamp in cal_dict: - cal_dict_ts = cal_dict[tstamp] - else: - cal_dict_ts = cal_dict - - for outname, info in cal_dict_ts.items(): - outcol = table.eval(info["expression"], info.get("parameters", None)) - table.add_column(outname, outcol) - - for param in params: - file_df[param] = table[param] - + file_df = load_data( + tfiles, + lh5_path, + cal_dict.get(tstamp, cal_dict), + params, + cal_energy_param, + threshold, + return_selection_mask, + ) file_df["run_timestamp"] = np.full(len(file_df), tstamp, dtype=object) - if threshold is not None: - mask = file_df[cal_energy_param] > threshold - file_df.drop(np.where(~mask)[0], inplace=True) + if return_selection_mask: + df.append(file_df[0]) + masks.append(file_df[1]) else: - mask = np.ones(len(file_df), dtype=bool) - masks = np.append(masks, mask) - df.append(file_df) - all_files += tfiles + df.append(file_df) - params.append("run_timestamp") df = pd.concat(df) + if return_selection_mask: + masks = np.concat(masks) elif isinstance(files, list): - keys = lh5.ls(files[0], lh5_path if lh5_path[-1] == "/" else lh5_path + "/") - keys = [key.split("/")[-1] for key in keys] - params = get_params(keys + list(cal_dict.keys()), params) - - table = sto.read(lh5_path, files)[0] - df = pd.DataFrame(columns=params) - for outname, info in cal_dict.items(): - outcol = table.eval(info["expression"], info.get("parameters", None)) - table.add_column(outname, outcol) - for param in params: - df[param] = table[param] + # Get set of available fields between input table and cal_dict + file_keys = lh5.ls( + files[0], lh5_path if lh5_path[-1] == "/" else lh5_path + "/" + ) + file_keys = {key.split("/")[-1] for key in file_keys} + + # Get set of keys in calibration expressions that show up in file + cal_keys = { + name + for expr in cal_dict.values() + for name in compile(expr, "0vbb is real!", "eval").co_names + } & file_keys + + # Get set of fields to read from files + fields = cal_keys | (file_keys & params) + + lh5_it = lh5.iterator.LH5Iterator( + files, lh5_path, field_mask=fields, buffer_len=100000 + ) + df = pd.DataFrame(columns=list(params)) + for table, entry, n_rows in lh5_it: + # Evaluate all provided expressions and add to table + for outname, info in cal_dict.items(): + table[outname] = table.eval( + info["expression"], local_dict=info.get("parameters", None) + ) + + # Copy params in table into dataframe + for par in params: + # First set of entries: allocate enough memory for all entries + if entry == 0: + df[par] = np.resize(table[par], len(lh5_it)) + else: + df.loc[entry : entry + n_rows - 1, par] = table[par][:n_rows] + + # Evaluate threshold mask and drop events below threshold if threshold is not None: masks = df[cal_energy_param] > threshold df.drop(np.where(~masks)[0], inplace=True) else: masks = np.ones(len(df), dtype=bool) - all_files = files - - for col in list(df.keys()): - if col not in params: - df.drop(col, inplace=True, axis=1) log.debug("data loaded") if return_selection_mask: From c28dfedaf75f6ec1a383efb80414cd12a5404f85 Mon Sep 17 00:00:00 2001 From: iguinn Date: Sun, 25 Aug 2024 14:07:58 -0700 Subject: [PATCH 2/5] Whoops pushed the wrong version of this file --- src/pygama/pargen/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pygama/pargen/utils.py b/src/pygama/pargen/utils.py index 179c2147e..13608a5b2 100644 --- a/src/pygama/pargen/utils.py +++ b/src/pygama/pargen/utils.py @@ -103,8 +103,8 @@ def load_data( # Get set of keys in calibration expressions that show up in file cal_keys = { name - for expr in cal_dict.values() - for name in compile(expr, "0vbb is real!", "eval").co_names + for info in cal_dict.values() + for name in compile(info["expression"], "0vbb is real!", "eval").co_names } & file_keys # Get set of fields to read from files @@ -118,7 +118,7 @@ def load_data( # Evaluate all provided expressions and add to table for outname, info in cal_dict.items(): table[outname] = table.eval( - info["expression"], local_dict=info.get("parameters", None) + info["expression"], info.get("parameters", None) ) # Copy params in table into dataframe From 087b028f6cbd0fcfa5b41cd3db1739b022e0006f Mon Sep 17 00:00:00 2001 From: iguinn Date: Sun, 25 Aug 2024 14:30:19 -0700 Subject: [PATCH 3/5] Bug fixes --- src/pygama/pargen/utils.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/pygama/pargen/utils.py b/src/pygama/pargen/utils.py index 13608a5b2..6020c4c0f 100644 --- a/src/pygama/pargen/utils.py +++ b/src/pygama/pargen/utils.py @@ -81,17 +81,18 @@ def load_data( threshold, return_selection_mask, ) - file_df["run_timestamp"] = np.full(len(file_df), tstamp, dtype=object) if return_selection_mask: + file_df[0]["run_timestamp"] = np.full(len(file_df[0]), tstamp, dtype=object) df.append(file_df[0]) masks.append(file_df[1]) else: + file_df["run_timestamp"] = np.full(len(file_df), tstamp, dtype=object) df.append(file_df) df = pd.concat(df) if return_selection_mask: - masks = np.concat(masks) + masks = np.concatenate(masks) elif isinstance(files, list): # Get set of available fields between input table and cal_dict @@ -113,7 +114,11 @@ def load_data( lh5_it = lh5.iterator.LH5Iterator( files, lh5_path, field_mask=fields, buffer_len=100000 ) - df = pd.DataFrame(columns=list(params)) + df_fields = params & (fields | set(cal_dict)) + if df_fields != params: + log.debug(f"load_data(): params not found in data files or cal_dict: {params-df_fields}") + df = pd.DataFrame(columns=list(df_fields)) + for table, entry, n_rows in lh5_it: # Evaluate all provided expressions and add to table for outname, info in cal_dict.items(): @@ -122,7 +127,7 @@ def load_data( ) # Copy params in table into dataframe - for par in params: + for par in df: # First set of entries: allocate enough memory for all entries if entry == 0: df[par] = np.resize(table[par], len(lh5_it)) From ee73d2aa88d316543cbc152acde8a46e179c25d1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 25 Aug 2024 22:04:40 +0000 Subject: [PATCH 4/5] style: pre-commit fixes --- src/pygama/pargen/utils.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/pygama/pargen/utils.py b/src/pygama/pargen/utils.py index 6020c4c0f..ec0cab06f 100644 --- a/src/pygama/pargen/utils.py +++ b/src/pygama/pargen/utils.py @@ -83,7 +83,9 @@ def load_data( ) if return_selection_mask: - file_df[0]["run_timestamp"] = np.full(len(file_df[0]), tstamp, dtype=object) + file_df[0]["run_timestamp"] = np.full( + len(file_df[0]), tstamp, dtype=object + ) df.append(file_df[0]) masks.append(file_df[1]) else: @@ -116,9 +118,11 @@ def load_data( ) df_fields = params & (fields | set(cal_dict)) if df_fields != params: - log.debug(f"load_data(): params not found in data files or cal_dict: {params-df_fields}") + log.debug( + f"load_data(): params not found in data files or cal_dict: {params-df_fields}" + ) df = pd.DataFrame(columns=list(df_fields)) - + for table, entry, n_rows in lh5_it: # Evaluate all provided expressions and add to table for outname, info in cal_dict.items(): From fa52dec41301102d5a6f19a01f7ed8e2d22f6a1c Mon Sep 17 00:00:00 2001 From: iguinn Date: Fri, 1 Nov 2024 07:29:21 -0700 Subject: [PATCH 5/5] Change default parallel to false --- src/pygama/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pygama/utils.py b/src/pygama/utils.py index 888ca396c..b6b73d26c 100644 --- a/src/pygama/utils.py +++ b/src/pygama/utils.py @@ -51,7 +51,7 @@ class NumbaPygamaDefaults(MutableMapping): """ def __init__(self) -> None: - self.parallel: bool = getenv_bool("PYGAMA_PARALLEL", default=True) + self.parallel: bool = getenv_bool("PYGAMA_PARALLEL", default=False) self.fastmath: bool = getenv_bool("PYGAMA_FASTMATH", default=True) def __getitem__(self, item: str) -> Any: