-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Code migration texas #102
base: main
Are you sure you want to change the base?
Code migration texas #102
Changes from 2 commits
866e465
7c88945
31f908a
31d01cd
5be4a63
92efae2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import pandas as pd | ||
import os | ||
|
||
# Locations used when the script is run directly; callers may override them
# through the keyword arguments of the functions below.
DEFAULT_INPUT_DIR = '/Users/yuexu/Downloads/TEC_CF_CSV'
DEFAULT_OUTPUT_DIR = '/Users/yuexu/Desktop/practicum project/climate-cabinet-campaign-finance-tracker/data/raw/TX/sample'
DEFAULT_SAMPLE_SIZE = 50


def list_csv_files(prefix, input_dir=DEFAULT_INPUT_DIR):
    """Return the sorted names of the ``<prefix>*.csv`` files in a directory.

    Args:
        prefix: Leading part of the file name, e.g. ``"contribs_"``.
        input_dir: Directory to scan.

    Returns:
        File names (not full paths) that start with ``prefix`` and end
        with ``.csv``, in sorted order.
    """
    return sorted(
        name
        for name in os.listdir(input_dir)
        if name.startswith(prefix) and name.endswith('.csv')
    )


def mergeFiles(
    files,
    mergedFileName,
    input_dir=DEFAULT_INPUT_DIR,
    output_dir=DEFAULT_OUTPUT_DIR,
    sample_size=DEFAULT_SAMPLE_SIZE,
    random_state=None,
):
    """Sample rows from each CSV file and write them to one merged CSV.

    Args:
        files: File names, relative to ``input_dir``, to sample from.
        mergedFileName: Output file name without the ``.csv`` extension.
        input_dir: Directory that contains ``files``.
        output_dir: Directory the merged CSV is written to; created if it
            does not exist.
        sample_size: Maximum number of rows drawn from each file. Files
            with fewer rows contribute all of their rows.
        random_state: Forwarded to ``DataFrame.sample`` so a run can be
            made reproducible.

    Returns:
        The merged DataFrame that was written to disk.
    """
    samples = []

    for file in files:
        print(f"Processing File {file}...")
        try:
            df = pd.read_csv(os.path.join(input_dir, file), low_memory=False)
        except FileNotFoundError:
            print(f"File {file} not found. Skipping...")
            continue
        # Cap the request at the file's length: sample() raises ValueError
        # when asked for more rows than exist (without replacement).
        samples.append(df.sample(min(sample_size, len(df)), random_state=random_state))

    # Concatenate once at the end instead of growing a frame per iteration.
    merged_df = pd.concat(samples, ignore_index=True) if samples else pd.DataFrame()

    os.makedirs(output_dir, exist_ok=True)
    merged_df.to_csv(os.path.join(output_dir, f'{mergedFileName}.csv'), index=False)
    return merged_df


if __name__ == "__main__":
    contrib_files = list_csv_files('contribs_')
    expend_files = list_csv_files('expend_')

    mergeFiles(contrib_files, "contribs")
    # mergeFiles(expend_files, "merged_expend")
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
import abc | ||
|
||
import pandas as pd | ||
|
||
from utils.transform import constants as const | ||
|
||
|
||
class Form(abc.ABC):
    """Abstract base for a table that is reduced to a fixed set of columns.

    Subclasses implement ``read_table`` to load the raw data into
    ``self.table``; ``map_columns`` then renames the raw columns and keeps
    only the required ones.

    Args:
        required_columns: Column names the table must contain after
            mapping; also the order in which they are kept.
        column_mapper: Mapping from raw column name to standardized column
            name. May be ``None`` or empty when no renaming is needed.
    """

    def __init__(self, required_columns: list[str], column_mapper: dict):
        self.required_columns = required_columns
        self.column_mapper = column_mapper
        # Populated by read_table(); None until a table has been loaded.
        self.table = None

    @abc.abstractmethod
    def read_table(self, paths: list[str]) -> pd.DataFrame:
        """Read table(s) into a DataFrame.

        Args:
            paths: Locations of the files that make up the table.

        Returns:
            The loaded table.
        """
        pass

    def map_columns(self) -> None:
        """Rename columns via ``column_mapper`` and keep ``required_columns``.

        The table is first renamed using ``self.column_mapper`` (raw name ->
        standardized name); columns not named in the mapper keep their
        names. The result is then restricted to ``self.required_columns``,
        in that order, and stored back on ``self.table`` as an independent
        copy so later column assignments do not act on a view.

        Raises:
            KeyError: If a required column is absent after renaming.
        """
        # rename() raises TypeError when given no mapper at all, so skip it
        # when the subclass did not supply one.
        if self.column_mapper:
            renamed = self.table.rename(columns=self.column_mapper)
        else:
            renamed = self.table
        self.table = renamed[self.required_columns].copy()

    def get_table(self) -> pd.DataFrame:
        """Return the current table (``None`` if nothing has been read)."""
        return self.table
|
||
|
||
class ContributionForm(Form):
    """Contribution table reduced to recipient id, donor, amount, year, purpose.

    Args:
        column_mapper: Mapping from raw column name to standardized column
            name, forwarded to ``Form``.
    """

    def __init__(self, column_mapper=None):
        required_columns = ["RECIPIENT_ID", "DONOR", "AMOUNT", "YEAR", "PURPOSE"]
        super().__init__(required_columns, column_mapper)

    def read_table(self, paths: list[str]) -> pd.DataFrame:
        """Read and concatenate the contribution CSV files.

        Args:
            paths: Paths of the CSV files to load, e.g. one file per
                reporting period. All files are stacked row-wise.

        Returns:
            The concatenated table, also stored on ``self.table``.

        Raises:
            ValueError: If ``paths`` is empty.
        """
        if not paths:
            raise ValueError("No contribution files were given to read_table().")
        print("Reading contribution table...")
        tables = [pd.read_csv(path) for path in paths]
        self.table = pd.concat(tables)
        return self.table

    def map_columns(self) -> None:
        """Map and filter the columns, then store RECIPIENT_ID as strings."""
        # The parent handles the renaming and filtering based on
        # 'column_mapper' and 'required_columns'.
        super().map_columns()
        # assign() builds a new frame, so this is safe even when the parent
        # left self.table as a selection of a larger frame.
        self.table = self.table.assign(
            RECIPIENT_ID=self.table["RECIPIENT_ID"].astype("str")
        )
|
||
|
||
class ExpenseForm(Form):
    """The most common/standard expense form.

    Reads one or more CSV files and reduces them to donor id, recipient,
    purpose, amount and year. States whose expense data cannot be loaded
    this way should get a state-specific subclass that overrides
    ``read_table``.

    Args:
        column_mapper: Mapping from raw column name to standardized column
            name, forwarded to ``Form``.
    """

    def __init__(self, column_mapper=None):
        required_columns = [
            "DONOR_ID",
            # NOTE(review): a reviewer suggested RECIPIENT_ID here; left as
            # RECIPIENT because TexasExpenseForm populates RECIPIENT - confirm.
            "RECIPIENT",
            "PURPOSE",
            "AMOUNT",
            "YEAR",
        ]
        super().__init__(required_columns, column_mapper)

    def read_table(self, paths: list[str]) -> pd.DataFrame:
        """Read and concatenate the expense CSV files.

        Args:
            paths: Paths of the CSV files to load (e.g. expense_01.csv,
                expense_02.csv). All files are stacked row-wise.

        Returns:
            The concatenated table, also stored on ``self.table``.

        Raises:
            ValueError: If ``paths`` is empty.
        """
        if not paths:
            raise ValueError("No expense files were given to read_table().")
        tables = [pd.read_csv(path) for path in paths]
        self.table = pd.concat(tables)
        return self.table

    def map_columns(self) -> None:
        """Map and filter the columns, then store DONOR_ID as strings."""
        # The parent handles the renaming and filtering based on
        # 'column_mapper' and 'required_columns'.
        super().map_columns()
        # assign() builds a new frame, so this is safe even when the parent
        # left self.table as a selection of a larger frame.
        self.table = self.table.assign(DONOR_ID=self.table["DONOR_ID"].astype("str"))
|
||
|
||
class FilerForm(Form):
    """Filer table reduced to one row per recipient id.

    Args:
        column_mapper: Mapping from raw column name to standardized column
            name, forwarded to ``Form``.
    """

    def __init__(self, column_mapper=None):
        required_columns = [
            "RECIPIENT_ID",
            "RECIPIENT_TYPE",
            "RECIPIENT",
            "RECIPIENT_OFFICE",
            "RECIPIENT_PARTY",
        ]
        super().__init__(required_columns, column_mapper)

    def read_table(self, paths: list[str]) -> pd.DataFrame:
        """Read and concatenate the filer CSV files (best effort).

        Args:
            paths: Paths of the CSV files to load (e.g. filer_01.csv,
                filer_02.csv). All files are stacked row-wise.

        Returns:
            The concatenated table. If reading fails, the error is printed
            and the previously stored ``self.table`` is returned unchanged
            (``None`` when nothing was loaded before).
        """
        try:
            tables = [pd.read_csv(path) for path in paths]
            self.table = pd.concat(tables)
        except (OSError, ValueError) as e:
            # OSError covers missing/unreadable files; ValueError covers
            # pandas parser errors and concatenating an empty list.
            print(f"Error reading table: {e}")
        return self.table

    def map_columns(self) -> None:
        """Map and filter the columns, then keep one row per RECIPIENT_ID."""
        super().map_columns()
        # Both steps return new frames rather than modifying in place, so
        # they are safe even when the parent left self.table as a selection
        # of a larger frame.
        self.table = self.table.assign(
            RECIPIENT_ID=self.table["RECIPIENT_ID"].astype("str")
        ).drop_duplicates(subset=["RECIPIENT_ID"])
|
||
|
||
class TexasContributionForm(ContributionForm):
    """Contribution form for the Texas contribution CSV files."""

    def __init__(self):
        column_mapper = {"filerIdent": "RECIPIENT_ID", "contributionAmount": "AMOUNT"}
        super().__init__(column_mapper)

    def type_classifier(self, PersentTypeCd: str) -> str:
        """Classify a person-type code as "Individual" or "Organization".

        Args:
            PersentTypeCd: Raw person-type code. The comparison is
                case-insensitive.

        Returns:
            "Individual" when the code equals "individual" ignoring case,
            otherwise "Organization". Non-string values (such as a missing
            value read as NaN) are treated as "Organization".
        """
        is_individual = (
            isinstance(PersentTypeCd, str) and PersentTypeCd.lower() == "individual"
        )
        return "Individual" if is_individual else "Organization"

    def _donor_name(self, row) -> str:
        """Build the donor display name for one row of the raw table."""
        if self.type_classifier(row["contributorPersentTypeCd"]) == "Individual":
            # "Last, First"; missing parts are skipped instead of being
            # rendered as the text "nan".
            parts = (row["contributorNameLast"], row["contributorNameFirst"])
            return ", ".join(str(part) for part in parts if pd.notna(part))
        return row.get("contributorNameOrganization", "")

    def get_additional_columns(self) -> None:
        """Derive RECIPIENT_TYPE, DONOR, YEAR and PURPOSE from the raw columns."""
        # NOTE(review): this is derived from the *contributor* type code and
        # is not one of the required columns, so map_columns() drops it;
        # confirm whether DONOR_TYPE was intended.
        self.table["RECIPIENT_TYPE"] = self.table["contributorPersentTypeCd"].apply(
            self.type_classifier
        )
        self.table["DONOR"] = self.table.apply(self._donor_name, axis=1)
        # NOTE(review): to_datetime treats integer input as nanoseconds since
        # the epoch; if contributionDt is stored as YYYYMMDD integers an
        # explicit format is needed - confirm against the raw data.
        self.table["YEAR"] = pd.to_datetime(self.table["contributionDt"]).dt.year
        self.table["PURPOSE"] = pd.NA

    def preprocess_data(self) -> None:
        """Add the derived columns, then map and filter to the required ones."""
        self.get_additional_columns()
        self.map_columns()
|
||
|
||
class TexasFilerForm(FilerForm):
    """Filer form for the Texas filer CSV files."""

    def __init__(self):
        # RECIPIENT and RECIPIENT_TYPE are derived in get_additional_columns()
        # rather than renamed, so only the id is mapped here.
        column_mapper = {
            "filerIdent": "RECIPIENT_ID",
        }
        super().__init__(column_mapper=column_mapper)

    def get_additional_columns(self) -> None:
        """Derive the recipient type and name; leave office and party empty."""
        # NOTE(review): the lookup table is named PA_FILER_ABBREV_DICT -
        # confirm it is also the right mapping for Texas filer type codes.
        self.table["RECIPIENT_TYPE"] = self.table["filerTypeCd"].map(
            const.PA_FILER_ABBREV_DICT
        )
        # Title-case the name, but keep missing values missing instead of
        # turning them into the literal string "Nan".
        self.table["RECIPIENT"] = self.table["filerName"].map(
            lambda name: name if pd.isna(name) else str(name).title()
        )
        self.table["RECIPIENT_OFFICE"] = pd.NA
        self.table["RECIPIENT_PARTY"] = pd.NA

    def preprocess_data(self) -> None:
        """Add the derived columns, then map and filter to the required ones."""
        self.get_additional_columns()
        self.map_columns()
|
||
|
||
class TexasExpenseForm(ExpenseForm):
    """Expense form for the Texas expenditure CSV files."""

    def __init__(self):
        column_mapper = {"expendAmount": "AMOUNT", "filerIdent": "DONOR_ID"}
        super().__init__(column_mapper=column_mapper)

    @staticmethod
    def _payee_name(row):
        """Build the payee display name for one row of the raw table."""
        if row["payeePersentTypeCd"] == "INDIVIDUAL":
            # "Last, First", matching the donor format used for
            # contributions. Missing parts are skipped: concatenating a NaN
            # with a string would raise TypeError.
            parts = (row["payeeNameLast"], row["payeeNameFirst"])
            return ", ".join(str(part) for part in parts if pd.notna(part))
        return row["payeeNameOrganization"]

    def get_additional_columns(self) -> None:
        """Derive RECIPIENT, YEAR and PURPOSE from the raw columns."""
        self.table["RECIPIENT"] = self.table.apply(self._payee_name, axis=1)
        # NOTE(review): to_datetime treats integer input as nanoseconds since
        # the epoch; if expendDt is stored as YYYYMMDD integers an explicit
        # format is needed - confirm against the raw data.
        self.table["YEAR"] = pd.to_datetime(self.table["expendDt"]).dt.year
        self.table["PURPOSE"] = pd.NA

    def preprocess_data(self) -> None:
        """Add the derived columns, then map and filter to the required ones."""
        self.get_additional_columns()
        self.map_columns()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
|
||
# `standardized_df` is the list of essential columns that should be present
# for the Penn and Texas tables in the transformer. These columns are the
# standardized data fields of a transformer instance for Penn and Texas.
#
# The columns in `standardized_df` are as follows:
# - DONOR: The name of the donor.
# - DONOR_ID: The unique identifier of the donor.
# - DONOR_PARTY: The political party affiliation of the donor.
# - DONOR_TYPE: The type of the donor (individual, organization, etc.).
# - RECIPIENT: The name of the recipient.
# - RECIPIENT_ID: The unique identifier of the recipient.
# - RECIPIENT_PARTY: The political party affiliation of the recipient.
# - RECIPIENT_TYPE: The type of the recipient (individual, organization, etc.).
# - AMOUNT: The amount of the donation.
# - DONOR_OFFICE: The office held by the donor (if applicable).
# - PURPOSE: The purpose of the donation.
# - RECIPIENT_OFFICE: The office held by the recipient (if applicable).
# - YEAR: The year of the transaction.
# - TRANSACTION_ID: The unique identifier of the transaction.
standardized_df = [
    "DONOR",
    "DONOR_ID",
    "DONOR_PARTY",
    "DONOR_TYPE",
    "RECIPIENT",
    "RECIPIENT_ID",
    "RECIPIENT_PARTY",
    "RECIPIENT_TYPE",
    "AMOUNT",
    "DONOR_OFFICE",
    "PURPOSE",
    "RECIPIENT_OFFICE",
    "YEAR",
    "TRANSACTION_ID",
]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use a Google-style docstring here and describe the `paths` argument.