Add Processing Steps Logging #1317

Draft · wants to merge 3 commits into base: main
243 changes: 195 additions & 48 deletions dcpy/lifecycle/ingest/transform.py
@@ -5,6 +5,7 @@
from typing import Callable, Literal

from dcpy.models import file
from dcpy.models.base import SortedSerializedBase
from dcpy.models.lifecycle.ingest import ProcessingStep, Column
from dcpy.utils import data, introspect
from dcpy.utils.geospatial import transform, parquet as geoparquet
@@ -14,6 +15,37 @@
OUTPUT_GEOM_COLUMN = "geom"


class ProcessingSummary(SortedSerializedBase):
"""Summary of the changes from a data processing function."""

description: str
row_modifications: dict = {}
column_modifications: dict = {}
custom: dict = {}


class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True):
df: pd.DataFrame
summary: ProcessingSummary


def make_generic_change_stats(
before: pd.DataFrame, after: pd.DataFrame, *, description: str
) -> ProcessingSummary:
"""Generate a ProcessingSummary by comparing two dataframes before and after processing."""
initial_columns = set(before.columns)
final_columns = set(after.columns)

return ProcessingSummary(
description=description,
row_modifications={"count_before": len(before), "count_after": len(after)},
column_modifications={
"added": list(final_columns - initial_columns),
"removed": list(initial_columns - final_columns),
},
)


def to_parquet(
file_format: file.Format,
local_data_path: Path,
@@ -71,46 +103,94 @@ class ProcessingFunctions:

def __init__(self, dataset_id: str):
self.dataset_id = dataset_id
self._REPROJECTION_DESCRIPTION_PREFIX = "Reprojected geometries"
self._REPROJECTION_NOT_REQUIRED_DESCRIPTION = (
"No reprojection required, as source and target crs are the same."
)
self._SORTED_BY_COLUMNS_DESCRIPTION_PREFIX = "Sorted by columns"

def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> ProcessingResult:
starting_crs = df.crs.to_string()
needs_reproject = starting_crs != target_crs
result = (
transform.reproject_gdf(df, target_crs=target_crs)
if needs_reproject
else df
)
return ProcessingResult(
df=result,
summary=ProcessingSummary(
row_modifications={"modified": len(df)} if needs_reproject else {},
description=f"{self._REPROJECTION_DESCRIPTION_PREFIX} from {starting_crs} to {target_crs}"
if needs_reproject
else self._REPROJECTION_NOT_REQUIRED_DESCRIPTION,
),
)

def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> gpd.GeoDataFrame:
return transform.reproject_gdf(df, target_crs=target_crs)

def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> pd.DataFrame:
sorted = df.sort_values(by=by, ascending=ascending)
return sorted.reset_index(drop=True)
def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> ProcessingResult:
sorted = df.sort_values(by=by, ascending=ascending).reset_index(drop=True)
summary = make_generic_change_stats(
df,
sorted,
description=f"{self._SORTED_BY_COLUMNS_DESCRIPTION_PREFIX}: {', '.join(by)}",
)
summary.row_modifications["modified"] = len(df) if not sorted.equals(df) else 0
return ProcessingResult(df=sorted, summary=summary)

def filter_rows(
self,
df: pd.DataFrame,
type: Literal["equals", "contains"],
column_name: str | int,
val: str | int,
) -> pd.DataFrame:
) -> ProcessingResult:
if type == "contains":
filter = df[column_name].str.contains(str(val))
else:
filter = df[column_name] == val
filtered = df[filter]
return filtered.reset_index(drop=True)
filtered = df[filter].reset_index(drop=True)
return ProcessingResult(
df=filtered,
summary=ProcessingSummary(
description="Filtered Rows",
row_modifications={"removed": len(df) - len(filtered)},
),
)

def rename_columns(
self, df: pd.DataFrame, map: dict[str, str], drop_others=False
) -> pd.DataFrame:
self, df: pd.DataFrame, drop_others=False, **kwargs
) -> ProcessingResult:
assert "map" in kwargs, "map must be supplied to rename_columns"
col_map: dict[str, str] = kwargs[
"map"
] # doing this to avoid shadowing the builtin `map` fn
renamed = df.copy()
if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in map:
renamed.rename_geometry(map.pop(renamed.geometry.name), inplace=True)
renamed = renamed.rename(columns=map, errors="raise")
if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in col_map:
renamed.rename_geometry(col_map.pop(renamed.geometry.name), inplace=True)
renamed = renamed.rename(columns=col_map, errors="raise")
removed_cols = []
if drop_others:
renamed = renamed[list(map.values())]
return renamed
renamed = renamed[list(col_map.values())]
removed_cols = [
col
for col in (set(df.columns) - set(renamed.columns))
if col not in col_map
]
return ProcessingResult(
df=renamed,
summary=ProcessingSummary(
description="Renamed columns",
column_modifications={"renamed": col_map, "removed": removed_cols},
),
)

def clean_column_names(
self,
df: pd.DataFrame,
*,
replace: dict[str, str] | None = None,
lower: bool = False,
) -> pd.DataFrame:
) -> ProcessingResult:
cleaned = df.copy()
replace = replace or {}
columns = list(cleaned.columns)
@@ -119,22 +199,43 @@ def clean_column_names(
if lower:
columns = [c.lower() for c in columns]
cleaned.columns = pd.Index(columns)
return cleaned
renamed_cols = {old: new for old, new in zip(df.columns, columns) if old != new}
return ProcessingResult(
df=cleaned,
summary=ProcessingSummary(
description="Cleaned column names",
column_modifications={"renamed": renamed_cols},
),
)

def update_column(
self,
df: pd.DataFrame,
column_name: str,
val: str | int,
) -> pd.DataFrame:
) -> ProcessingResult:
updated = df.copy()
updated[column_name] = val
return updated
return ProcessingResult(
df=updated,
summary=ProcessingSummary(
description=f"Updated column '{column_name}' with value '{val}'",
row_modifications={"updated": len(df)}, # assume we modified all rows
),
)

def append_prev(self, df: pd.DataFrame, version: str = "latest") -> pd.DataFrame:
def append_prev(
self, df: pd.DataFrame, version: str = "latest"
) -> ProcessingResult:
prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version))
appended = pd.concat((prev_df, df))
return appended.reset_index(drop=True)
appended = appended.reset_index(drop=True)
summary = ProcessingSummary(
description=f"Appended rows from previous version: {version}",
custom={"previous_version": version},
row_modifications={"added": len(prev_df)},
)
return ProcessingResult(df=appended, summary=summary)

def upsert_column_of_previous_version(
self,
@@ -143,44 +244,76 @@ def upsert_column_of_previous_version(
version: str = "latest",
insert_behavior: Literal["allow", "ignore", "error"] = "allow",
missing_key_behavior: Literal["null", "coalesce", "error"] = "error",
) -> pd.DataFrame:
) -> ProcessingResult:
assert key, "Must provide non-empty list of columns to be used as keys"
prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version))
df_initial_cols = set(df.columns)
df = data.upsert_df_columns(
prev_df,
df,
key=key,
insert_behavior=insert_behavior,
missing_key_behavior=missing_key_behavior,
)
return df
summary = ProcessingSummary(
description="Upserted columns",
custom={"previous_version": version},
column_modifications={
"added": sorted(set(prev_df.columns) - df_initial_cols)
},
)
return ProcessingResult(df=df, summary=summary)

def deduplicate(
self,
df: pd.DataFrame,
sort_columns: list[str] | None = None,
sort_ascending: bool = True,
by: list[str] | None = None,
) -> pd.DataFrame:
) -> ProcessingResult:
deduped = df.copy()
if sort_columns:
deduped = deduped.sort_values(by=sort_columns, ascending=sort_ascending)
deduped = deduped.drop_duplicates(by)
return deduped.reset_index(drop=True)
deduped = deduped.drop_duplicates(by).reset_index(drop=True)
summary = ProcessingSummary(
description="Removed duplicates",
row_modifications={"dropped": len(df) - len(deduped)},
)
return ProcessingResult(df=deduped, summary=summary)

def drop_columns(self, df: pd.DataFrame, columns: list[str | int]) -> pd.DataFrame:
def drop_columns(
self, df: pd.DataFrame, columns: list[str | int]
) -> ProcessingResult:
columns = [df.columns[i] if isinstance(i, int) else i for i in columns]
return df.drop(columns, axis=1)
result = df.drop(columns, axis=1)
summary = ProcessingSummary(
description="Dropped columns", column_modifications={"dropped": columns}
)
return ProcessingResult(df=result, summary=summary)

def strip_columns(
self, df: pd.DataFrame, cols: list[str] | None = None
) -> pd.DataFrame:
) -> ProcessingResult:
stripped = df.copy()
if cols:
for col in cols:
df[col] = df[col].str.strip()
stripped[col] = stripped[col].str.strip()
else:
df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
return df
stripped = stripped.apply(
lambda x: x.str.strip() if x.dtype == "object" else x
)
summary = ProcessingSummary(
description="Stripped Whitespace",
row_modifications={
"modified": len(stripped.compare(df))
}, # TODO: evaluate if this is performant
)
return ProcessingResult(df=stripped, summary=summary)

def coerce_column_types(
self,
@@ -189,7 +322,7 @@ def coerce_column_types(
str, Literal["numeric", "integer", "bigint", "string", "date", "datetime"]
],
errors: Literal["raise", "coerce"] = "raise",
):
) -> ProcessingResult:
def to_str(obj):
if isinstance(obj, int):
return str(obj)
@@ -200,31 +333,39 @@ def to_str(obj):
else:
return str(obj)

df = df.copy()
result = df.copy()
for column in column_types:
match column_types[column]:
case "numeric":
df[column] = pd.to_numeric(df[column], errors=errors)
result[column] = pd.to_numeric(result[column], errors=errors)
case "integer" | "bigint" as t:
mapping = {"integer": "Int32", "bigint": "Int64"}
df[column] = pd.array(df[column], dtype=mapping[t]) # type: ignore
result[column] = pd.array(result[column], dtype=mapping[t]) # type: ignore
case "string":
df[column] = df[column].apply(to_str)
result[column] = result[column].apply(to_str)
case "date":
df[column] = pd.to_datetime(df[column], errors=errors).dt.date
df[column] = df[column].replace(pd.NaT, None) # type: ignore
result[column] = pd.to_datetime(
result[column], errors=errors
).dt.date
result[column] = result[column].replace(pd.NaT, None) # type: ignore
case "datetime":
df[column] = pd.to_datetime(df[column], errors=errors)
df[column] = df[column].replace(pd.NaT, None) # type: ignore
return df
result[column] = pd.to_datetime(result[column], errors=errors)
result[column] = result[column].replace(pd.NaT, None) # type: ignore
summary = make_generic_change_stats(
df, result, description="Coerced column types"
)
return ProcessingResult(df=result, summary=summary)

def multi(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
def multi(self, df: gpd.GeoDataFrame) -> ProcessingResult:
multi_gdf = df.copy()
multi_gdf.set_geometry(
gpd.GeoSeries([transform.multi(feature) for feature in multi_gdf.geometry]),
inplace=True,
)
return multi_gdf
summary = make_generic_change_stats(
df, multi_gdf, description="Converted geometries"
)
return ProcessingResult(df=multi_gdf, summary=summary)

def pd_series_func(
self,
@@ -234,7 +375,7 @@ def pd_series_func(
output_column_name: str | None = None,
geo: bool = False, # only used for validation
**kwargs,
) -> pd.DataFrame:
) -> ProcessingResult:
"""
Operates on a given column using a given pandas Series function and supplied kwargs

@@ -265,8 +406,14 @@
func = transformed[column_name]
for part in parts:
func = func.__getattribute__(part)

transformed[output_column_name or column_name] = func(**kwargs) # type: ignore
return transformed
summary = make_generic_change_stats(
df,
transformed,
description=f"Applied {function_name} to column {column_name}",
)
return ProcessingResult(df=transformed, summary=summary)


def validate_pd_series_func(
Expand Down Expand Up @@ -352,7 +499,7 @@ def process(
compiled_steps = validate_processing_steps(dataset_id, processing_steps)

for step in compiled_steps:
df = step(df)
df = step(df).df

validate_columns(df, expected_columns)

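For reference, a minimal sketch (not part of this PR) of how a single processing step now returns a ProcessingResult instead of a bare DataFrame. The dataset id, toy dataframe, and print calls are illustrative only.

import pandas as pd

from dcpy.lifecycle.ingest.transform import ProcessingFunctions

funcs = ProcessingFunctions(dataset_id="example_dataset")  # hypothetical dataset id
df = pd.DataFrame({"boro": ["MN", "BK", "BK"], "value": [1, 2, 2]})

# Each step returns a ProcessingResult: the transformed dataframe plus a summary.
result = funcs.deduplicate(df, by=["boro", "value"])
print(result.summary.description)        # "Removed duplicates"
print(result.summary.row_modifications)  # {"dropped": 1}
df = result.df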
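And a sketch of how the per-step summaries could be surfaced for logging. The process hunk above still discards them (df = step(df).df), so the helper below is an assumption about follow-up wiring, not code in this diff; the standard-library logging module stands in for whatever logging utility dcpy actually uses.

import logging

import pandas as pd

logger = logging.getLogger(__name__)


def run_steps_with_logging(df: pd.DataFrame, compiled_steps: list) -> pd.DataFrame:
    """Illustrative only: apply compiled steps and log each ProcessingSummary."""
    for step in compiled_steps:
        result = step(df)
        df = result.df
        logger.info(
            "%s | rows: %s | columns: %s",
            result.summary.description,
            result.summary.row_modifications,
            result.summary.column_modifications,
        )
    return df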