Skip to content

Commit

Permalink
Merge pull request #59 from washingtonpost/estimandizer which resolve…
Browse files Browse the repository at this point in the history
…s ELEX-2763
  • Loading branch information
dmnapolitano authored Sep 13, 2023
2 parents a17d9ee + 77a14d8 commit 60f1e1f
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 60 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@ Parameters for the CLI tool:
Note: When running the model with multiple fixed effects, make sure they are not linearly dependent. For example, `county_fips` and `county_classification` are linearly dependent when run together. That's because every county is in one county class, so all the fixed effect columns of the counties in the county class sum up to the fixed effect column of that county class.


#### Custom Estimands

It's possible to create a custom estimand based on other data elements. Here's how to create a new estimand called "my_estimand":

1. In `src/elexmodel/handlers/data/Estimandizer.py`, create a function with the signature `def my_estimand(data_df)`.
2. In `my_estimand()`, use the columns in `data_df` to create a new column, either `baseline_my_estimand` or `results_my_estimand` as necessary. See the `party_vote_share_dem` function for an example.
3. Specify `my_estimand` as one of your estimands. For example, via the command line:

```
elexmodel 2017-11-07_VA_G --estimands my_estimand --office_id=G --geographic_unit_type=county --percent_reporting 50
```

Your output should have columns including `baseline_my_estimand`, `results_my_estimand`, and related columns for the prediction intervals, if using them.

Here's an example showing multiple estimands, including `my_estimand`:

```
elexmodel 2017-11-07_VA_G --estimands=turnout --estimands my_estimand --estimands party_vote_share_dem --office_id=G --geographic_unit_type=county --percent_reporting 50
```

#### Python

##### Model Parameters
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ max-line-length = 120
[pylint]
max-line-length = 120
good-names= on, x, df, NonparametricElectionModel, GaussianElectionModel,
BaseElectionModel, qr, X, y, f, LiveData, n, Featurizer, fe, PreprocessedData, CombinedData,
BaseElectionModel, qr, X, y, f, LiveData, n, Featurizer, Estimandizer, fe, PreprocessedData, CombinedData,
ModelResults, GaussianModel, MODEL_THRESHOLD, LOG, w, df_X, df_y, v, n, g, a, b
disable=missing-function-docstring, missing-module-docstring, missing-class-docstring, #missing
too-many-arguments, too-many-locals, too-many-branches, too-many-instance-attributes, too-many-statements, #structure: too-many
Expand Down
4 changes: 2 additions & 2 deletions src/elexmodel/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class PythonLiteralOption(click.Option):
def type_cast_value(self, ctx, value):
try:
return ast.literal_eval(value)
except ValueError:
raise click.BadParameter(value)
except ValueError as e:
raise click.BadParameter(value) from e


@click.command()
Expand Down
22 changes: 16 additions & 6 deletions src/elexmodel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,27 @@ def _check_input_parameters(
offices = config_handler.get_offices()
if office not in offices:
raise ValueError(f"Office '{office}' is not valid. Please check config.")
valid_estimands = config_handler.get_estimands(office)
for estimand in estimands:
if estimand not in valid_estimands:
raise ValueError(f"Estimand: '{estimand}' is not valid. Please check config")

estimands_in_config = config_handler.get_estimands(office)
extra_estimands = set(estimands).difference(set(estimands_in_config))
if len(extra_estimands) > 0:
# this is ok; later they'll all be created by the Estimandizer
LOG.info("Found additional estimands that were not specified in the config file: %s", extra_estimands)

geographic_unit_types = config_handler.get_geographic_unit_types(office)
if geographic_unit_type not in geographic_unit_types:
raise ValueError(f"Geographic unit type: '{geographic_unit_type}' is not valid. Please check config")

model_features = config_handler.get_features(office)
invalid_features = [feature for feature in features if feature not in model_features]
invalid_features = list(set(features).difference(set(model_features)))
if len(invalid_features) > 0:
raise ValueError(f"Feature(s): {invalid_features} not valid. Please check config")

model_aggregates = config_handler.get_aggregates(office)
invalid_aggregates = [aggregate for aggregate in aggregates if aggregate not in model_aggregates]
if len(invalid_aggregates) > 0:
raise ValueError(f"Aggregate(s): {invalid_aggregates} not valid. Please check config")

model_fixed_effects = config_handler.get_fixed_effects(office)
if isinstance(fixed_effects, dict):
invalid_fixed_effects = [
Expand All @@ -82,14 +88,16 @@ def _check_input_parameters(
]
if len(invalid_fixed_effects) > 0:
raise ValueError(f"Fixed effect(s): {invalid_fixed_effects} not valid. Please check config")

if pi_method not in {"gaussian", "nonparametric"}:
raise ValueError(
f"Prediction interval method: {pi_method} is not valid. \
pi_method has to be either `gaussian` or `nonparametric`."
)

if not isinstance(model_parameters, dict):
raise ValueError("model_paramters is not valid. Has to be a dict.")
elif model_parameters != {}:
if model_parameters != {}:
if "lambda_" in model_parameters and (
not isinstance(model_parameters["lambda_"], (float, int)) or model_parameters["lambda_"] < 0
):
Expand All @@ -102,8 +110,10 @@ def _check_input_parameters(
elif pi_method == "nonparametric":
if "robust" in model_parameters and not isinstance(model_parameters["robust"], bool):
raise ValueError("robust is not valid. Has to be a boolean.")

if handle_unreporting not in {"drop", "zero"}:
raise ValueError("handle_unreporting must be either `drop` or `zero`")

return True

def get_aggregate_list(self, office, aggregate):
Expand Down
9 changes: 8 additions & 1 deletion src/elexmodel/handlers/data/CombinedData.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from elexmodel.handlers import s3
from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import S3_FILE_PATH, TARGET_BUCKET, convert_df_to_csv


Expand All @@ -16,6 +17,12 @@ def __init__(
handle_unreporting="drop",
):
self.estimands = estimands

estimandizer = Estimandizer()
(current_data, _) = estimandizer.check_and_create_estimands(
current_data.copy(), self.estimands, False, current_data=True
)

# if we're running this for a past election, drop results columns from preprocessed data
# so we use results_{estimand} numbers from current_data
preprocessed_results_columns = list(filter(lambda col: col.startswith("results_"), preprocessed_data.columns))
Expand All @@ -27,7 +34,7 @@ def __init__(
# if unreporting is 'drop' then drop units that are not reporting (ie. units where results are na)
# this is necessary if units will not be returning results in this election,
# but we didn't know that (ie. townships)
result_cols = [f"results_{estimand}" for estimand in estimands]
result_cols = [f"results_{estimand}" for estimand in self.estimands]
if handle_unreporting == "drop":
# Drop the whole row if an estimand is not reporting
data = data.dropna(axis=0, how="any", subset=result_cols)
Expand Down
88 changes: 88 additions & 0 deletions src/elexmodel/handlers/data/Estimandizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from numpy import nan


class EstimandException(Exception):
pass


RESULTS_PREFIX = "results_"
BASELINE_PREFIX = "baseline_"


class Estimandizer:
"""
Estimandizer. Generate estimands explicitly.
"""

def check_and_create_estimands(self, data_df, estimands, historical, current_data=False):
columns_to_return = []

for estimand in estimands:
results_col = f"{RESULTS_PREFIX}{estimand}"
baseline_col = f"{BASELINE_PREFIX}{estimand}"
target_col = results_col if current_data else baseline_col

if target_col not in data_df.columns:
if estimand in data_df.columns:
data_df[target_col] = data_df[estimand].copy()
else:
# will raise a KeyError if a function with the same name as `estimand` doesn't exist
data_df = globals()[estimand](data_df)
if target_col == baseline_col:
data_df[results_col] = data_df[baseline_col].copy()

if historical:
data_df[results_col] = nan
else:
if results_col not in data_df.columns:
raise EstimandException("This is missing results data for estimand: ", estimand)

columns_to_return.append(results_col)

results_column_names = [x for x in data_df.columns if x.startswith(RESULTS_PREFIX)]
# If this is not a historical run, then this is a live election
# so we are expecting that there will be actual results data
if not historical and len(results_column_names) == 0:
raise EstimandException("This is not a test election, it is missing results data")

return (data_df, columns_to_return)

def add_estimand_baselines(self, data_df, estimand_baselines, historical):
# if we are in a historical election we are only reading preprocessed data to get
# the historical election results of the currently reporting units.
# so we don't care about the total voters or the baseline election.

for estimand, pointer in estimand_baselines.items():
if pointer is None:
# should only happen when we're going to create a new estimand
pointer = estimand

baseline_col = f"{BASELINE_PREFIX}{pointer}"

if baseline_col not in data_df.columns:
# will raise a KeyError if a function with the same name as `pointer` doesn't exist
data_df = globals()[pointer](data_df)
results_col = f"{RESULTS_PREFIX}{estimand}"
data_df[results_col] = data_df[baseline_col].copy()

if not historical:
# Adding one to prevent zero divison
data_df[f"last_election_results_{estimand}"] = data_df[baseline_col].copy() + 1

return data_df


# custom estimands


def party_vote_share_dem(data_df):
# should only happen when we're replaying an election
if f"{BASELINE_PREFIX}dem" not in data_df.columns and f"{BASELINE_PREFIX}turnout" not in data_df.columns:
data_df[f"{RESULTS_PREFIX}party_vote_share_dem"] = (
data_df[f"{RESULTS_PREFIX}dem"] / data_df[f"{RESULTS_PREFIX}turnout"]
)
else:
data_df[f"{BASELINE_PREFIX}party_vote_share_dem"] = (
data_df[f"{BASELINE_PREFIX}dem"] / data_df[f"{BASELINE_PREFIX}turnout"]
)
return data_df
27 changes: 9 additions & 18 deletions src/elexmodel/handlers/data/LiveData.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
import numpy as np
import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import get_directory_path


class MockLiveDataHandlerException(Exception):
pass


class MockLiveDataHandler:
"""
Handles current data, which we would pull from Dynamo on an election night
Expand All @@ -35,6 +32,7 @@ def __init__(
self.s3_client = s3_client
self.historical = historical
self.unexpected_rows = unexpected_units
self.estimandizer = Estimandizer()

self.shuffle_columns = [
"postal_code",
Expand All @@ -46,7 +44,7 @@ def __init__(
self.data = data
if data is not None:
# passed in as a df
data_for_estimands = self.load_data(data, estimands, historical)
data_for_estimands = self.load_data(data)
self.data = data_for_estimands
else:
self.data = self.get_data()
Expand Down Expand Up @@ -76,26 +74,19 @@ def get_data(self):
live_data,
dtype={"geographic_unit_fips": str, "geographic_unit_type": str, "county_fips": str, "district": str},
)
data = self.load_data(data, self.estimands, self.historical)
data = self.load_data(data)
return data

def get_live_data_file_path(self):
directory_path = get_directory_path()
return f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"

def load_data(self, data, estimands, historical):
def load_data(self, data):
columns_to_return = ["postal_code", "geographic_unit_fips"]
for estimand in estimands:
if historical:
data[f"results_{estimand}"] = np.nan
results_column_names = [x for x in data.columns if x.startswith("results")]
# If this is not a historical run, then this is a live election
# so we are expecting that there will be actual results data
if not self.historical and len(results_column_names) == 0:
raise MockLiveDataHandlerException("This is not a test election, it is missing results data")
if f"results_{estimand}" not in results_column_names:
raise MockLiveDataHandlerException("This is missing results data for estimand: ", estimand)
columns_to_return.append(f"results_{estimand}")

(data, more_columns) = self.estimandizer.check_and_create_estimands(data, self.estimands, self.historical)
columns_to_return += more_columns

self.shuffle_dataframe = data[self.shuffle_columns].copy()
return data[columns_to_return].copy()

Expand Down
21 changes: 6 additions & 15 deletions src/elexmodel/handlers/data/PreprocessedData.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import create_directory, get_directory_path

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,11 +36,12 @@ def __init__(
self.s3_client = s3_client
self.estimand_baselines = estimand_baselines
self.historical = historical
self.estimandizer = Estimandizer()

self.local_file_path = self.get_preprocessed_data_path()

if data is not None:
self.data = self.load_data(data, estimand_baselines)
self.data = self.load_data(data)
else:
self.data = self.get_data()

Expand All @@ -61,7 +63,7 @@ def get_data(self):
preprocessed_data = self.local_file_path

data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
return self.load_data(data, self.estimand_baselines)
return self.load_data(data)

def get_preprocessed_data_path(self):
directory_path = get_directory_path()
Expand All @@ -76,24 +78,13 @@ def select_rows_in_states(self, data, states_with_election):
)
return data

def load_data(self, preprocessed_data, estimand_baselines):
def load_data(self, preprocessed_data):
"""
Load preprocessed csv data as df
"""
LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office, self.geographic_unit_type)

if self.historical:
# if we are in a historical election we are only reading preprocessed data to get
# the historical election results of the currently reporting units.
# so we don't care about the total voters or the baseline election.
return preprocessed_data

for estimand, pointer in estimand_baselines.items():
baseline_name = f"baseline_{pointer}"
# Adding one to prevent zero divison
preprocessed_data[f"last_election_results_{estimand}"] = preprocessed_data[baseline_name].copy() + 1

return preprocessed_data
return self.estimandizer.add_estimand_baselines(preprocessed_data, self.estimand_baselines, self.historical)

def save_data(self, preprocessed_data):
if not Path(self.local_file_path).parent.exists():
Expand Down
2 changes: 1 addition & 1 deletion src/elexmodel/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"elexmodel": {
"handlers": ["default"],
"level": "INFO",
"propagate": False,
"propagate": True,
}
},
}
Expand Down
Loading

0 comments on commit 60f1e1f

Please sign in to comment.