Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preprocessed data handler superclass #147

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/elexmodel/handlers/data/BasePreprocessedDataHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import abc
from io import StringIO
from pathlib import Path

import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import get_directory_path


class BasePreprocessedDataHandler(abc.ABC):
    """
    Abstract base handler for preprocessed (input) model data.

    Resolves the local csv path for an election / office / geographic unit type,
    reads the csv from that path when the file exists (otherwise through the s3
    client) and passes the resulting dataframe to the subclass-specific
    ``load_data``.

    NOTE: ``load_data`` is invoked from ``__init__`` (directly or via
    ``get_data``), so a subclass must set every attribute its ``load_data``
    reads *before* calling ``super().__init__``.
    """

    def __init__(
        self, election_id, office_id, geographic_unit_type, estimands, s3_client=None, historical=False, data=None
    ):
        """
        Store the handler configuration and populate ``self.data``.

        :param election_id: election identifier, used to build the data path
        :param office_id: office identifier, used to build the data path
        :param geographic_unit_type: unit type, used to build the data file name
        :param estimands: stored on the instance for use by subclasses
        :param s3_client: client used to fetch the csv when no local file exists
        :param historical: stored on the instance for use by subclasses
        :param data: optional data that was already read; when given it is passed
            straight to ``load_data`` and nothing is read from disk or s3
        """
        self.election_id = election_id
        self.office_id = office_id
        self.geographic_unit_type = geographic_unit_type
        self.estimands = estimands
        self.s3_client = s3_client
        self.historical = historical
        self.estimandizer = Estimandizer()
        # computed after the identifiers above are set, since the path is built from them
        self.file_path = self.get_data_path()

        if data is not None:
            self.data = self.load_data(data)
        else:
            self.data = self.get_data()

    def get_data_path(self):
        """
        Return the local path of the csv file for this election, office and
        geographic unit type.
        """
        directory_path = get_directory_path()
        path = f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"
        return path

    def get_data(self):
        """
        Read the csv and return the result of ``load_data`` on it.

        The local file at ``self.file_path`` is used when it exists; otherwise
        the csv content is fetched through ``self.s3_client``.

        :raises FileNotFoundError: if the local file does not exist and no
            ``s3_client`` was provided
        """
        if Path(self.file_path).is_file():
            # read data as a filepath
            preprocessed_data = self.file_path
        else:
            # If local data file is not available, read data from s3
            if self.s3_client is None:
                # fail with an actionable message instead of an AttributeError on None
                raise FileNotFoundError(
                    f"Preprocessed data file {self.file_path} does not exist locally "
                    "and no s3_client was provided to download it"
                )
            path_info = {
                "election_id": self.election_id,
                "office": self.office_id,
                "geographic_unit_type": self.geographic_unit_type,
            }
            file_path = self.s3_client.get_file_path("preprocessed", path_info)

            csv_data = self.s3_client.get(file_path)
            # read data as a buffer
            preprocessed_data = StringIO(csv_data)

        # identifier columns are read as strings rather than letting pandas infer a dtype
        data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
        return self.load_data(data)

    @abc.abstractmethod
    def load_data(self, data):
        """
        Transform the raw dataframe into the form the handler exposes as
        ``self.data``. Must be implemented by subclasses.

        :param data: dataframe read from the csv, or the ``data`` argument given
            to ``__init__``
        """
        pass
58 changes: 10 additions & 48 deletions src/elexmodel/handlers/data/LiveData.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import math
from io import StringIO
from pathlib import Path

import numpy as np
import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.utils.file_utils import get_directory_path
from elexmodel.handlers.data.BasePreprocessedDataHandler import BasePreprocessedDataHandler


class MockLiveDataHandler:
class MockLiveDataHandler(BasePreprocessedDataHandler):
"""
Handles current data, which we would pull from Dynamo on an election night
"""
Expand All @@ -25,14 +22,7 @@ def __init__(
s3_client=None,
unexpected_units=0,
):
self.election_id = election
self.office_id = office_id
self.geographic_unit_type = geographic_unit_type
self.estimands = estimands
self.s3_client = s3_client
self.historical = historical
self.unexpected_rows = unexpected_units
self.estimandizer = Estimandizer()

self.shuffle_columns = [
"postal_code",
Expand All @@ -41,45 +31,17 @@ def __init__(
] # columns we may want to sample by
self.shuffle_dataframe = None

self.data = data
if data is not None:
# passed in as a df
data_for_estimands = self.load_data(data)
self.data = data_for_estimands
else:
self.data = self.get_data()

self.current_reporting_data = None

def get_data(self):
file_path = self.get_live_data_file_path()
# If local data file is not available, read data from s3
if not Path(file_path).is_file():
path_info = {
"election_id": self.election_id,
"office": self.office_id,
"geographic_unit_type": self.geographic_unit_type,
}
# we're mimicking live data from a file of preprocessed data
# but for a real live election, we will pull live data from dynamo
file_path = self.s3_client.get_file_path("preprocessed", path_info)
csv_data = self.s3_client.get(file_path)
# read data as a buffer
live_data = StringIO(csv_data)
else:
# read data as a filepath
live_data = file_path

data = pd.read_csv(
live_data,
dtype={"geographic_unit_fips": str, "geographic_unit_type": str, "county_fips": str, "district": str},
super().__init__(
election,
office_id,
geographic_unit_type,
estimands,
s3_client=s3_client,
historical=historical,
data=data,
)
data = self.load_data(data)
return data

def get_live_data_file_path(self):
directory_path = get_directory_path()
return f"{directory_path}/data/{self.election_id}/{self.office_id}/data_{self.geographic_unit_type}.csv"

def load_data(self, data):
columns_to_return = ["postal_code", "geographic_unit_fips"]
Expand Down
71 changes: 19 additions & 52 deletions src/elexmodel/handlers/data/PreprocessedData.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
from io import StringIO
from pathlib import Path

import pandas as pd

from elexmodel.handlers.data.Estimandizer import Estimandizer
from elexmodel.handlers.data.BasePreprocessedDataHandler import BasePreprocessedDataHandler
from elexmodel.logger import getModelLogger
from elexmodel.utils.file_utils import create_directory, get_directory_path
from elexmodel.utils.file_utils import create_directory

LOG = getModelLogger()


class PreprocessedDataHandler:
class PreprocessedDataHandler(BasePreprocessedDataHandler):
"""
Handler for preprocessed data for model
"""

def __init__(
self,
election_id,
office,
office_id,
geographic_unit_type,
estimands,
estimand_baselines,
Expand All @@ -30,47 +27,17 @@ def __init__(
"""
Initialize preprocessed data. If not present, download from s3.
"""
self.election_id = election_id
self.office = office
self.geographic_unit_type = geographic_unit_type
self.estimands = estimands
self.s3_client = s3_client
self.estimand_baselines = estimand_baselines
self.historical = historical
self.include_results_estimand = include_results_estimand
self.estimandizer = Estimandizer()

self.local_file_path = self.get_preprocessed_data_path()

if data is not None:
self.data = self.load_data(data)
else:
self.data = self.get_data()

def get_data(self):
# If local data file is not available, read data from s3
if not Path(self.local_file_path).is_file():
path_info = {
"election_id": self.election_id,
"office": self.office,
"geographic_unit_type": self.geographic_unit_type,
}
file_path = self.s3_client.get_file_path("preprocessed", path_info)

csv_data = self.s3_client.get(file_path)
# read data as a buffer
preprocessed_data = StringIO(csv_data)
else:
# read data as a filepath
preprocessed_data = self.local_file_path

data = pd.read_csv(preprocessed_data, dtype={"geographic_unit_fips": str, "county_fips": str, "district": str})
return self.load_data(data)

def get_preprocessed_data_path(self):
directory_path = get_directory_path()
path = f"{directory_path}/data/{self.election_id}/{self.office}/data_{self.geographic_unit_type}.csv"
return path
super().__init__(
election_id,
office_id,
geographic_unit_type,
estimands,
s3_client=s3_client,
historical=historical,
data=data,
)

def select_rows_in_states(self, data, states_with_election):
data = data.query(
Expand All @@ -80,13 +47,13 @@ def select_rows_in_states(self, data, states_with_election):
)
return data

def load_data(self, preprocessed_data):
def load_data(self, data):
"""
Load preprocessed csv data as df
"""
LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office, self.geographic_unit_type)
LOG.info("Loading preprocessed data: %s, %s, %s", self.election_id, self.office_id, self.geographic_unit_type)
data = self.estimandizer.add_estimand_baselines(
preprocessed_data,
data,
self.estimand_baselines,
self.historical,
include_results_estimand=self.include_results_estimand,
Expand All @@ -95,6 +62,6 @@ def load_data(self, preprocessed_data):
return data

def save_data(self, preprocessed_data):
if not Path(self.local_file_path).parent.exists():
create_directory(str(Path(self.local_file_path).parent))
preprocessed_data.to_csv(self.local_file_path, index=False)
if not Path(self.file_path).parent.exists():
create_directory(str(Path(self.file_path).parent))
preprocessed_data.to_csv(self.file_path, index=False)
2 changes: 1 addition & 1 deletion tests/handlers/test_preprocessed_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def test_save(va_governor_county_data, test_path):
local_file_path = f"{test_path}/test_dir/data_county.csv"
if os.path.exists(local_file_path):
os.remove(local_file_path)
data_handler.local_file_path = local_file_path
data_handler.file_path = local_file_path
data_handler.save_data(va_governor_county_data)

assert os.path.exists(local_file_path)
Expand Down
Loading