Skip to content

Commit

Permalink
Merge pull request #147 from washingtonpost/preprocessed-data-handler…
Browse files Browse the repository at this point in the history
…-superclass

Preprocessed data handler superclass
  • Loading branch information
dmnapolitano authored Dec 19, 2024
2 parents 76019d0 + cc14cd0 commit f36517b
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 101 deletions.
60 changes: 60 additions & 0 deletions src/elexmodel/handlers/data/BasePreprocessedDataHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import abc
from io import StringIO
from pathlib import Path

import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import get_directory_path


class BasePreprocessedDataHandler(abc.ABC):
    """
    Abstract base handler for preprocessed (input) model data.

    On construction the handler resolves the local CSV path for the given
    election / office / geographic unit type and populates ``self.data``,
    either from a caller-supplied ``data`` object or by reading the CSV
    (local file first, s3 client as a fallback). Subclasses implement
    ``load_data`` to turn the raw data into whatever they expose.

    NOTE: ``load_data`` is invoked from within ``__init__``, so a subclass
    must set every attribute its ``load_data`` reads *before* calling
    ``super().__init__``.
    """

    def __init__(
        self, election_id, office_id, geographic_unit_type, estimands, s3_client=None, historical=False, data=None
    ):
        """
        Store the handler configuration and populate ``self.data``.

        :param election_id: election identifier, used to build the data path
        :param office_id: office identifier, used to build the data path
        :param geographic_unit_type: unit type, used to build the data file name
        :param estimands: stored on the instance for use by subclasses
        :param s3_client: optional client used to fetch the CSV when no local
            file exists; must provide ``get_file_path`` and ``get``
        :param historical: stored on the instance for use by subclasses
        :param data: optional already-loaded data; when given it is passed
            straight to ``load_data`` and nothing is read from disk or s3
        """
        self.election_id = election_id
        self.office_id = office_id
        self.geographic_unit_type = geographic_unit_type
        self.estimands = estimands
        self.s3_client = s3_client
        self.historical = historical
        self.estimandizer = Estimandizer()
        self.file_path = self.get_data_path()

        if data is not None:
            self.data = self.load_data(data)
        else:
            self.data = self.get_data()

    def get_data_path(self):
        """
        Return the local path of the preprocessed CSV for this handler.
        """
        directory_path = get_directory_path()
        path = f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"
        return path

    def get_data(self):
        """
        Read the preprocessed CSV and return the result of ``load_data`` on it.

        The local file at ``self.file_path`` is used when it exists; otherwise
        the CSV is fetched through ``self.s3_client``.

        :raises FileNotFoundError: if the local file does not exist and no
            s3 client was provided to fetch it from
        """
        if Path(self.file_path).is_file():
            # read data as a filepath
            preprocessed_data = self.file_path
        else:
            # If local data file is not available, read data from s3
            if self.s3_client is None:
                # Fail with an actionable message rather than an AttributeError
                # on None when there is nowhere left to read the data from.
                raise FileNotFoundError(
                    f"Preprocessed data file not found at {self.file_path} "
                    "and no s3_client was provided to download it"
                )
            path_info = {
                "election_id": self.election_id,
                "office": self.office_id,
                "geographic_unit_type": self.geographic_unit_type,
            }
            file_path = self.s3_client.get_file_path("preprocessed", path_info)

            csv_data = self.s3_client.get(file_path)
            # read data as a buffer
            preprocessed_data = StringIO(csv_data)

        # identifier columns are read as strings so values such as "01001"
        # are not parsed as numbers (which would drop the leading zeros)
        data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
        return self.load_data(data)

    @abc.abstractmethod
    def load_data(self, data):
        """
        Transform ``data`` into the form stored on ``self.data``.

        Called with either the ``data`` argument given to ``__init__`` or the
        dataframe read by ``get_data``; the return value becomes ``self.data``.
        """
        pass
58 changes: 10 additions & 48 deletions src/elexmodel/handlers/data/LiveData.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import math
from io import StringIO
from pathlib import Path

import numpy as np
import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import get_directory_path
from elexmodel.handlers.data.BasePreprocessedDataHandler import BasePreprocessedDataHandler


class MockLiveDataHandler:
class MockLiveDataHandler(BasePreprocessedDataHandler):
"""
Handles current data, which we would pull from Dynamo on an election night
"""
Expand All @@ -25,14 +22,7 @@ def __init__(
s3_client=None,
unexpected_units=0,
):
self.election_id = election
self.office_id = office_id
self.geographic_unit_type = geographic_unit_type
self.estimands = estimands
self.s3_client = s3_client
self.historical = historical
self.unexpected_rows = unexpected_units
self.estimandizer = Estimandizer()

self.shuffle_columns = [
"postal_code",
Expand All @@ -41,45 +31,17 @@ def __init__(
] # columns we may want to sample by
self.shuffle_dataframe = None

self.data = data
if data is not None:
# passed in as a df
data_for_estimands = self.load_data(data)
self.data = data_for_estimands
else:
self.data = self.get_data()

self.current_reporting_data = None

def get_data(self):
file_path = self.get_live_data_file_path()
# If local data file is not available, read data from s3
if not Path(file_path).is_file():
path_info = {
"election_id": self.election_id,
"office": self.office_id,
"geographic_unit_type": self.geographic_unit_type,
}
# we're mimicking live data from a file of preprocessed data
# but for a real live election, we will pull live data from dynamo
file_path = self.s3_client.get_file_path("preprocessed", path_info)
csv_data = self.s3_client.get(file_path)
# read data as a buffer
live_data = StringIO(csv_data)
else:
# read data as a filepath
live_data = file_path

data = pd.read_csv(
live_data,
dtype={"geographic_unit_fips": str, "geographic_unit_type": str, "county_fips": str, "district": str},
super().__init__(
election,
office_id,
geographic_unit_type,
estimands,
s3_client=s3_client,
historical=historical,
data=data,
)
data = self.load_data(data)
return data

def get_live_data_file_path(self):
directory_path = get_directory_path()
return f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"

def load_data(self, data):
columns_to_return = ["postal_code", "geographic_unit_fips"]
Expand Down
71 changes: 19 additions & 52 deletions src/elexmodel/handlers/data/PreprocessedData.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
from io import StringIO
from pathlib import Path

import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.handlers.data.BasePreprocessedDataHandler import BasePreprocessedDataHandler
from elexmodel.logger import getModelLogger
from elexmodel.utils.file_utils import create_directory, get_directory_path
from elexmodel.utils.file_utils import create_directory

LOG = getModelLogger()


class PreprocessedDataHandler:
class PreprocessedDataHandler(BasePreprocessedDataHandler):
"""
Handler for preprocessed data for model
"""

def __init__(
self,
election_id,
office,
office_id,
geographic_unit_type,
estimands,
estimand_baselines,
Expand All @@ -30,47 +27,17 @@ def __init__(
"""
Initialize preprocessed data. If not present, download from s3.
"""
self.election_id = election_id
self.office = office
self.geographic_unit_type = geographic_unit_type
self.estimands = estimands
self.s3_client = s3_client
self.estimand_baselines = estimand_baselines
self.historical = historical
self.include_results_estimand = include_results_estimand
self.estimandizer = Estimandizer()

self.local_file_path = self.get_preprocessed_data_path()

if data is not None:
self.data = self.load_data(data)
else:
self.data = self.get_data()

def get_data(self):
# If local data file is not available, read data from s3
if not Path(self.local_file_path).is_file():
path_info = {
"election_id": self.election_id,
"office": self.office,
"geographic_unit_type": self.geographic_unit_type,
}
file_path = self.s3_client.get_file_path("preprocessed", path_info)

csv_data = self.s3_client.get(file_path)
# read data as a buffer
preprocessed_data = StringIO(csv_data)
else:
# read data as a filepath
preprocessed_data = self.local_file_path

data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
return self.load_data(data)

def get_preprocessed_data_path(self):
directory_path = get_directory_path()
path = f"{directory_path}/data/{self.election_id}/{self.office}/data_{self.geographic_unit_type}.csv"
return path
super().__init__(
election_id,
office_id,
geographic_unit_type,
estimands,
s3_client=s3_client,
historical=historical,
data=data,
)

def select_rows_in_states(self, data, states_with_election):
data = data.query(
Expand All @@ -80,13 +47,13 @@ def select_rows_in_states(self, data, states_with_election):
)
return data

def load_data(self, preprocessed_data):
def load_data(self, data):
"""
Load preprocessed csv data as df
"""
LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office, self.geographic_unit_type)
LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office_id, self.geographic_unit_type)
data = self.estimandizer.add_estimand_baselines(
preprocessed_data,
data,
self.estimand_baselines,
self.historical,
include_results_estimand=self.include_results_estimand,
Expand All @@ -95,6 +62,6 @@ def load_data(self, preprocessed_data):
return data

def save_data(self, preprocessed_data):
if not Path(self.local_file_path).parent.exists():
create_directory(str(Path(self.local_file_path).parent))
preprocessed_data.to_csv(self.local_file_path, index=False)
if not Path(self.file_path).parent.exists():
create_directory(str(Path(self.file_path).parent))
preprocessed_data.to_csv(self.file_path, index=False)
2 changes: 1 addition & 1 deletion tests/handlers/test_preprocessed_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def test_save(va_governor_county_data, test_path):
local_file_path = f"{test_path}/test_dir/data_county.csv"
if os.path.exists(local_file_path):
os.remove(local_file_path)
data_handler.local_file_path = local_file_path
data_handler.file_path = local_file_path
data_handler.save_data(va_governor_county_data)

assert os.path.exists(local_file_path)
Expand Down

0 comments on commit f36517b

Please sign in to comment.