From f50b73ca4a953677605fe6e640b637763584d625 Mon Sep 17 00:00:00 2001
From: John Cherian
Date: Tue, 22 Oct 2024 21:15:56 -0700
Subject: [PATCH] changes to test epsilon sampling that account for limited
 signal in states with few units reporting + guaranteed overlap in aggregate
 PIs

---
 .../models/BootstrapElectionModel.py | 79 +++++++++++++------
 1 file changed, 55 insertions(+), 24 deletions(-)

diff --git a/src/elexmodel/models/BootstrapElectionModel.py b/src/elexmodel/models/BootstrapElectionModel.py
index 003fe182..217212cf 100644
--- a/src/elexmodel/models/BootstrapElectionModel.py
+++ b/src/elexmodel/models/BootstrapElectionModel.py
@@ -6,6 +6,7 @@ import pandas as pd
 from elexsolver.OLSRegressionSolver import OLSRegressionSolver
 from elexsolver.QuantileRegressionSolver import QuantileRegressionSolver
+from scipy.linalg import block_diag
 from scipy.special import expit
 
 from elexmodel.handlers.data.Featurizer import Featurizer
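The `block_diag` import added above is used in `_sample_test_epsilon` below to tile a single 2x2 correlation matrix into one block per contest. A minimal standalone sketch of that call (the 0.9 correlation and the three contests are illustrative values, not taken from the model):

```python
import numpy as np
from scipy.linalg import block_diag

# hypothetical correlation between a contest's two random effects
# (epsilon_y and epsilon_z in the patch below)
corr = np.array([[1.0, 0.9],
                 [0.9, 1.0]])

# one copy of the 2x2 block per contest; three hypothetical contests here
corr_hat = block_diag(corr, corr, corr)
print(corr_hat.shape)  # (6, 6); entries outside the diagonal blocks are zero
```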
@@ -586,40 +587,66 @@ def _sample_test_epsilon(
         This function generates new test epsilons (contest level random effects)
         NOTE: for now we are sampling from a normal with mean zero and sample variance.
         """
-        # the contests that need a sampled contest level random effect are those that still have outstanding
-        # units (since for the others, the contest level random effect is a known fixed quantity) but no
-        # current estimate for the epsilon in that contest (so ones with NO training examples)
-
-        # this gives us indices of contests for which we have no samples in the training set
-        contests_not_in_reporting_units = np.where(np.all(aggregate_indicator_train == 0, axis=0))[0]
-        # gives us contests for which there is at least one county not reporting
-        contests_in_nonreporting_units = np.where(np.any(aggregate_indicator_test > 0, axis=0))[0]
-        # contests that need a random effect are:
-        #   contests that we want to make predictions on (ie. there are still outstanding units)
-        #   BUT no units in that contest are reporting
-        contests_that_need_random_effect = np.intersect1d(
-            contests_not_in_reporting_units, contests_in_nonreporting_units
-        )
-
-        # \epsilon ~ N(0, \Sigma)
-        mu_hat = [0, 0]
-
-        # the variance is the sample variance of epsilons for which we have an estimate
         non_zero_epsilon_indices = np.nonzero(epsilon_y_hat)[0]
         # if there is only one non zero contest OR if the contest is a state level election only
         # then just sample from nearly zero since no variance
         if non_zero_epsilon_indices.shape[0] == 1:
             return np.zeros((1, self.B)), np.zeros((1, self.B))
 
-        sigma_hat = np.cov(np.concatenate([epsilon_y_hat, epsilon_z_hat], axis=1)[np.nonzero(epsilon_y_hat)[0]].T)
+        aggregate_indicator = np.concatenate([aggregate_indicator_train, aggregate_indicator_test], axis=0)
+
+        # computes the standard error of the epsilon_hat estimate for each contest;
+        # the formula is sqrt((1 - n/N) * (population variance) / n),
+        # where n is the number of observed units, N is the number of units in the contest,
+        # and the population variance is estimated from the residuals in the contest
+        def _get_epsilon_hat_std(residuals, epsilon):
+            var = np.var(aggregate_indicator_train * residuals, axis=0)  # denominator counts all units, not just the contest's
+            var *= aggregate_indicator_train.shape[0] / (aggregate_indicator_train.sum(axis=0) - 1)  # rescale, with Bessel's correction
+            var *= (1 - aggregate_indicator_train.sum(axis=0) / aggregate_indicator.sum(axis=0)) / aggregate_indicator_train.sum(axis=0)
+
+            # where a contest has fewer than 2 reporting units, fall back to the
+            # sample variance of the observed epsilon_hat values
+            var[np.isnan(var) | np.isinf(var)] = np.var(
+                epsilon[np.nonzero(epsilon)[0]].T,
+                ddof=1
+            )
+            return np.sqrt(var)
+
+        std_y = _get_epsilon_hat_std(residuals_y, epsilon_y_hat)
+        std_z = _get_epsilon_hat_std(residuals_z, epsilon_z_hat)
+
+        # interleave so that each contest's (y, z) standard deviations are adjacent
+        std = np.zeros((std_y.shape[0] + std_z.shape[0],))
+        std[0::2] = std_y
+        std[1::2] = std_z
+
+        # epsilon_y_hat and epsilon_z_hat were highly correlated in 2020, so modeling this correlation is important
+        corr = np.corrcoef(
+            np.concatenate([epsilon_y_hat, epsilon_z_hat], axis=1)[np.nonzero(epsilon_y_hat)[0]].T
+        )
+        # tile corr into a block diagonal matrix
+        corr_list = [corr] * aggregate_indicator.shape[1]
+        corr_hat = block_diag(*corr_list)
+
+        # TODO: we may wish to add additional correlations to corr_hat,
+        # e.g., across the rust belt, the sun belt, abortion referenda states, etc.
+
+        # \epsilon ~ N(0, \Sigma)
+        # Sigma is a block diagonal matrix with 2x2 blocks running down the diagonal and 0s elsewhere;
+        # each block is the covariance matrix of epsilon_y and epsilon_z for a particular contest,
+        # e.g., the first block is for the AK contest, the second block is for the AL contest, etc.
+        # we can sample from this distribution to get new epsilons
+        mu_hat = np.zeros(corr_hat.shape[0])
+        sigma_hat = np.diag(std) @ corr_hat @ np.diag(std)
 
-        # sample new test epsilons, but only for states that need tem
         test_epsilon = self.rng.multivariate_normal(
-            mu_hat, sigma_hat, size=(len(contests_that_need_random_effect), self.B)
+            mu_hat, sigma_hat, size=self.B
         )
 
-        test_epsilon_y = aggregate_indicator_test[:, contests_that_need_random_effect] @ test_epsilon[:, :, 0]
-        test_epsilon_z = aggregate_indicator_test[:, contests_that_need_random_effect] @ test_epsilon[:, :, 1]
+        test_epsilon_y = test_epsilon[:, 0::2].T
+        test_epsilon_z = test_epsilon[:, 1::2].T
+
+        test_epsilon_y = aggregate_indicator_test @ test_epsilon_y
+        test_epsilon_z = aggregate_indicator_test @ test_epsilon_z
 
         return test_epsilon_y, test_epsilon_z
 
     def _sample_test_errors(
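For intuition: the hunk above replaces the old approach, which sampled a pooled-covariance epsilon only for contests with no reporting units, with one that samples an epsilon for every contest, scaled by a finite-population-corrected standard error. A self-contained sketch of that scheme under made-up counts and variances (all names and numbers here are illustrative, not model values):

```python
import numpy as np
from scipy.linalg import block_diag

rng = np.random.default_rng(0)

# toy setup: three contests, n reporting units observed out of N total
n = np.array([10, 4, 1])
N = np.array([20, 30, 12])
resid_var = np.array([0.04, 0.09, np.nan])  # per-contest residual variance (undefined when n < 2)

# standard error of the contest-level effect with a finite population correction:
# se = sqrt((1 - n/N) * var / n)
var_y = (1 - n / N) * resid_var / n
var_y[np.isnan(var_y) | np.isinf(var_y)] = 0.05  # fallback for contests with < 2 reporting units
std_y = np.sqrt(var_y)
std_z = 0.5 * std_y  # pretend the z effects are half as variable

# interleave so each contest's (y, z) pair is adjacent: [y0, z0, y1, z1, y2, z2]
std = np.empty(2 * len(n))
std[0::2], std[1::2] = std_y, std_z

corr = np.array([[1.0, 0.8], [0.8, 1.0]])  # assumed y/z correlation within a contest
corr_hat = block_diag(*[corr] * len(n))    # zeros elsewhere: contests are independent
sigma_hat = np.diag(std) @ corr_hat @ np.diag(std)

B = 1000  # number of bootstrap samples
eps = rng.multivariate_normal(np.zeros(2 * len(n)), sigma_hat, size=B)  # shape (B, 6)
eps_y, eps_z = eps[:, 0::2].T, eps[:, 1::2].T  # each (3, B): one row per contest
print(eps_y.shape, eps_z.shape)  # (3, 1000) (3, 1000)
```

Because sigma_hat is built as diag(std) @ corr_hat @ diag(std) with corr_hat block diagonal, contests stay independent of one another while the (y, z) pair within each contest keeps its estimated correlation.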
@@ -1437,6 +1464,10 @@ def get_aggregate_prediction_intervals(
         interval_upper = interval_upper.reshape(-1, 1)
         interval_lower = interval_lower.reshape(-1, 1)
 
+        # guarantee overlap between the prediction interval and the point prediction
+        interval_lower = np.minimum(interval_lower, aggregate_perc_margin_total - 0.001)
+        interval_upper = np.maximum(interval_upper, aggregate_perc_margin_total + 0.001)
+
         return PredictionIntervals(interval_lower, interval_upper)
 
     def get_national_summary_estimates(self, nat_sum_data_dict: dict, base_to_add: int | float, alpha: float) -> list:
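The final hunk clamps the aggregate interval endpoints so the reported interval always straddles the point prediction by at least 0.001. A minimal sketch of the clamping with hypothetical numbers:

```python
import numpy as np

# hypothetical aggregate point predictions and a bootstrap interval that,
# due to sampling noise, fails to cover the second point estimate
point = np.array([[0.12], [0.03]])
lower = np.array([[0.05], [0.04]])  # lower endpoint sits above the second point estimate
upper = np.array([[0.20], [0.09]])

# pull each endpoint to the correct side of the point prediction
lower = np.minimum(lower, point - 0.001)
upper = np.maximum(upper, point + 0.001)
print(lower.ravel(), upper.ravel())  # [0.05  0.029] [0.2   0.09]
```

`np.minimum`/`np.maximum` only move an endpoint that is on the wrong side of the point prediction; intervals that already overlap it are left untouched.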