From ecde5f53a7610b2891d3e0a4adabaa081584004d Mon Sep 17 00:00:00 2001
From: Diane Napolitano
Date: Fri, 13 Dec 2024 11:38:24 -0500
Subject: [PATCH] Moving preprocessed data handler subclassing to this branch

---
Reviewer notes (free text after the three-dash line; not part of the commit
message or of the diff):

* Summary: adds an abstract BasePreprocessedDataHandler that owns the common
  attribute setup, the local-file path construction, and get_data() (read the
  local CSV if it exists, otherwise fetch it through s3_client).
  MockLiveDataHandler and PreprocessedDataHandler now subclass it and only
  supply load_data().
* The base __init__ calls load_data() / get_data() itself, so a subclass has
  to set any attribute its load_data() reads before calling
  super().__init__(). Both subclasses in this patch assign their own
  attributes first.
* Renames in PreprocessedDataHandler: constructor parameter office ->
  office_id, attribute self.office -> self.office_id, attribute
  self.local_file_path -> self.file_path, and get_preprocessed_data_path()
  is replaced by the inherited get_data_path(). Only the test file is updated
  in this patch; other callers are not visible here -- TODO confirm none of
  them still use the old names.
* MockLiveDataHandler.get_live_data_file_path() is removed in favour of the
  inherited get_data_path(), which builds the same path string.
* The removed MockLiveDataHandler.get_data() passed a read_csv dtype map that
  included "geographic_unit_type": str; the shared get_data() in the base
  class does not include that key. NOTE(review): confirm that dropping it is
  intended for the live-data path.
* get_data() calls self.s3_client.get_file_path(...) whenever the local CSV
  is missing, while s3_client defaults to None in the base signature, so that
  branch needs a client to be supplied.

 .../data/BasePreprocessedDataHandler.py  | 60 ++++++++++++++++
 src/elexmodel/handlers/data/LiveData.py  | 58 +++------------
 .../handlers/data/PreprocessedData.py    | 71 +++++--------------
 tests/handlers/test_preprocessed_data.py |  2 +-
 4 files changed, 90 insertions(+), 101 deletions(-)
 create mode 100644 src/elexmodel/handlers/data/BasePreprocessedDataHandler.py

diff --git a/src/elexmodel/handlers/data/BasePreprocessedDataHandler.py b/src/elexmodel/handlers/data/BasePreprocessedDataHandler.py
new file mode 100644
index 00000000..f99f9273
--- /dev/null
+++ b/src/elexmodel/handlers/data/BasePreprocessedDataHandler.py
@@ -0,0 +1,60 @@
+import abc
+from io import StringIO
+from pathlib import Path
+
+import pandas as pd
+
+from elexmodel.handlers.data.Estimandizer import Estimandizer
+from elexmodel.utils.file_utils import get_directory_path
+
+
+class BasePreprocessedDataHandler(abc.ABC):
+    """
+    Abstract base handler for preprocessed (input) model data
+    """
+
+    def __init__(
+        self, election_id, office_id, geographic_unit_type, estimands, s3_client=None, historical=False, data=None
+    ):
+        self.election_id = election_id
+        self.office_id = office_id
+        self.geographic_unit_type = geographic_unit_type
+        self.estimands = estimands
+        self.s3_client = s3_client
+        self.historical = historical
+        self.estimandizer = Estimandizer()
+        self.file_path = self.get_data_path()
+
+        if data is not None:
+            self.data = self.load_data(data)
+        else:
+            self.data = self.get_data()
+
+    def get_data_path(self):
+        directory_path = get_directory_path()
+        path = f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"
+        return path
+
+    def get_data(self):
+        # If local data file is not available, read data from s3
+        if not Path(self.file_path).is_file():
+            path_info = {
+                "election_id": self.election_id,
+                "office": self.office_id,
+                "geographic_unit_type": self.geographic_unit_type,
+            }
+            file_path = self.s3_client.get_file_path("preprocessed", path_info)
+
+            csv_data = self.s3_client.get(file_path)
+            # read data as a buffer
+            preprocessed_data = StringIO(csv_data)
+        else:
+            # read data as a filepath
+            preprocessed_data = self.file_path
+
+        data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
+        return self.load_data(data)
+
+    @abc.abstractmethod
+    def load_data(self, data):
+        pass
diff --git a/src/elexmodel/handlers/data/LiveData.py b/src/elexmodel/handlers/data/LiveData.py
index 56e6c352..797f61be 100644
--- a/src/elexmodel/handlers/data/LiveData.py
+++ b/src/elexmodel/handlers/data/LiveData.py
@@ -1,15 +1,12 @@
 import math
-from io import StringIO
-from pathlib import Path
 
 import numpy as np
 import pandas as pd
 
-from elexmodel.handlers.data.Estimandizer import Estimandizer
-from elexmodel.utils.file_utils import get_directory_path
+from elexmodel.handlers.data.BasePreprocessedDataHandler import BasePreprocessedDataHandler
 
 
-class MockLiveDataHandler:
+class MockLiveDataHandler(BasePreprocessedDataHandler):
     """
     Handles current data, which we would pull from Dynamo on an election night
     """
@@ -25,14 +22,7 @@ def __init__(
         s3_client=None,
         unexpected_units=0,
     ):
-        self.election_id = election
-        self.office_id = office_id
-        self.geographic_unit_type = geographic_unit_type
-        self.estimands = estimands
-        self.s3_client = s3_client
-        self.historical = historical
         self.unexpected_rows = unexpected_units
-        self.estimandizer = Estimandizer()
 
         self.shuffle_columns = [
             "postal_code",
@@ -41,45 +31,17 @@ def __init__(
         ]  # columns we may want to sample by
         self.shuffle_dataframe = None
 
-        self.data = data
-        if data is not None:
-            # passed in as a df
-            data_for_estimands = self.load_data(data)
-            self.data = data_for_estimands
-        else:
-            self.data = self.get_data()
-
         self.current_reporting_data = None
 
-    def get_data(self):
-        file_path = self.get_live_data_file_path()
-        # If local data file is not available, read data from s3
-        if not Path(file_path).is_file():
-            path_info = {
-                "election_id": self.election_id,
-                "office": self.office_id,
-                "geographic_unit_type": self.geographic_unit_type,
-            }
-            # we're mimicking live data from a file of preprocessed data
-            # but for a real live election, we will pull live data from dynamo
-            file_path = self.s3_client.get_file_path("preprocessed", path_info)
-            csv_data = self.s3_client.get(file_path)
-            # read data as a buffer
-            live_data = StringIO(csv_data)
-        else:
-            # read data as a filepath
-            live_data = file_path
-
-        data = pd.read_csv(
-            live_data,
-            dtype={"geographic_unit_fips": str, "geographic_unit_type": str, "county_fips": str, "district": str},
+        super().__init__(
+            election,
+            office_id,
+            geographic_unit_type,
+            estimands,
+            s3_client=s3_client,
+            historical=historical,
+            data=data,
         )
-        data = self.load_data(data)
-        return data
-
-    def get_live_data_file_path(self):
-        directory_path = get_directory_path()
-        return f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"
 
     def load_data(self, data):
         columns_to_return = ["postal_code", "geographic_unit_fips"]
diff --git a/src/elexmodel/handlers/data/PreprocessedData.py b/src/elexmodel/handlers/data/PreprocessedData.py
index 82ba7335..01170a3a 100644
--- a/src/elexmodel/handlers/data/PreprocessedData.py
+++ b/src/elexmodel/handlers/data/PreprocessedData.py
@@ -1,16 +1,13 @@
-from io import StringIO
 from pathlib import Path
 
-import pandas as pd
-
-from elexmodel.handlers.data.Estimandizer import Estimandizer
+from elexmodel.handlers.data.BasePreprocessedDataHandler import BasePreprocessedDataHandler
 from elexmodel.logger import getModelLogger
-from elexmodel.utils.file_utils import create_directory, get_directory_path
+from elexmodel.utils.file_utils import create_directory
 
 LOG = getModelLogger()
 
 
-class PreprocessedDataHandler:
+class PreprocessedDataHandler(BasePreprocessedDataHandler):
     """
     Handler for preprocessed data for model
     """
@@ -18,7 +15,7 @@ class PreprocessedDataHandler:
     def __init__(
         self,
         election_id,
-        office,
+        office_id,
         geographic_unit_type,
         estimands,
         estimand_baselines,
@@ -30,47 +27,17 @@ def __init__(
         """
         Initialize preprocessed data. If not present, download from s3.
         """
-        self.election_id = election_id
-        self.office = office
-        self.geographic_unit_type = geographic_unit_type
-        self.estimands = estimands
-        self.s3_client = s3_client
         self.estimand_baselines = estimand_baselines
-        self.historical = historical
         self.include_results_estimand = include_results_estimand
-        self.estimandizer = Estimandizer()
-
-        self.local_file_path = self.get_preprocessed_data_path()
-
-        if data is not None:
-            self.data = self.load_data(data)
-        else:
-            self.data = self.get_data()
-
-    def get_data(self):
-        # If local data file is not available, read data from s3
-        if not Path(self.local_file_path).is_file():
-            path_info = {
-                "election_id": self.election_id,
-                "office": self.office,
-                "geographic_unit_type": self.geographic_unit_type,
-            }
-            file_path = self.s3_client.get_file_path("preprocessed", path_info)
-
-            csv_data = self.s3_client.get(file_path)
-            # read data as a buffer
-            preprocessed_data = StringIO(csv_data)
-        else:
-            # read data as a filepath
-            preprocessed_data = self.local_file_path
-
-        data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
-        return self.load_data(data)
-
-    def get_preprocessed_data_path(self):
-        directory_path = get_directory_path()
-        path = f"{directory_path}/data/{self.election_id}/{self.office}/data_{self.geographic_unit_type}.csv"
-        return path
+        super().__init__(
+            election_id,
+            office_id,
+            geographic_unit_type,
+            estimands,
+            s3_client=s3_client,
+            historical=historical,
+            data=data,
+        )
 
     def select_rows_in_states(self, data, states_with_election):
         data = data.query(
@@ -80,13 +47,13 @@ def select_rows_in_states(self, data, states_with_election):
         )
         return data
 
-    def load_data(self, preprocessed_data):
+    def load_data(self, data):
         """
         Load preprocessed csv data as df
         """
-        LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office, self.geographic_unit_type)
+        LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office_id, self.geographic_unit_type)
         data = self.estimandizer.add_estimand_baselines(
-            preprocessed_data,
+            data,
             self.estimand_baselines,
             self.historical,
             include_results_estimand=self.include_results_estimand,
@@ -95,6 +62,6 @@ def load_data(self, preprocessed_data):
         return data
 
     def save_data(self, preprocessed_data):
-        if not Path(self.local_file_path).parent.exists():
-            create_directory(str(Path(self.local_file_path).parent))
-        preprocessed_data.to_csv(self.local_file_path, index=False)
+        if not Path(self.file_path).parent.exists():
+            create_directory(str(Path(self.file_path).parent))
+        preprocessed_data.to_csv(self.file_path, index=False)
diff --git a/tests/handlers/test_preprocessed_data.py b/tests/handlers/test_preprocessed_data.py
index 35adac7b..fac5f7c8 100644
--- a/tests/handlers/test_preprocessed_data.py
+++ b/tests/handlers/test_preprocessed_data.py
@@ -10,7 +10,7 @@ def test_save(va_governor_county_data, test_path):
     local_file_path = f"{test_path}/test_dir/data_county.csv"
     if os.path.exists(local_file_path):
         os.remove(local_file_path)
-    data_handler.local_file_path = local_file_path
+    data_handler.file_path = local_file_path
     data_handler.save_data(va_governor_county_data)
 
     assert os.path.exists(local_file_path)