From 96c817c0ac49bc28815496ba055fafad836c54a4 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Thu, 12 Dec 2024 13:31:05 -0500 Subject: [PATCH 1/3] Add very basic wrapper around proc func outputs --- dcpy/lifecycle/ingest/transform.py | 166 ++++++++++++++----- dcpy/test/lifecycle/ingest/test_transform.py | 50 +++--- 2 files changed, 149 insertions(+), 67 deletions(-) diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index aea3f89d3..7e7e53bee 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -5,6 +5,7 @@ from typing import Callable, Literal from dcpy.models import file +from dcpy.models.base import SortedSerializedBase from dcpy.models.lifecycle.ingest import ProcessingStep, Column from dcpy.utils import data, introspect from dcpy.utils.geospatial import transform, parquet as geoparquet @@ -14,6 +15,36 @@ OUTPUT_GEOM_COLUMN = "geom" +class ProcessingSummary(SortedSerializedBase): + """Summary of the changes from a data processing function.""" + + description: str | None = None + row_modifications: dict + column_modifications: dict + + +class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True): + df: pd.DataFrame + summary: ProcessingSummary + + +def make_generic_change_stats( + before: pd.DataFrame, after: pd.DataFrame, *, description: str | None +) -> ProcessingSummary: + """Generate a ProcessingSummary by comparing two dataframes before and after processing.""" + initial_columns = set(before.columns) + final_columns = set(after.columns) + + return ProcessingSummary( + description=description, + row_modifications={"count_before": len(before), "count_after": len(after)}, + column_modifications={ + "added": list(final_columns - initial_columns), + "removed": list(initial_columns - final_columns), + }, + ) + + def to_parquet( file_format: file.Format, local_data_path: Path, @@ -72,12 +103,19 @@ class ProcessingFunctions: def __init__(self, dataset_id: str): self.dataset_id = dataset_id - def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> gpd.GeoDataFrame: - return transform.reproject_gdf(df, target_crs=target_crs) + def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> ProcessingResult: + result = transform.reproject_gdf(df, target_crs=target_crs) + summary = make_generic_change_stats( + df, result, description=f"Reprojected geometries to {target_crs}" + ) + return ProcessingResult(df=result, summary=summary) - def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> pd.DataFrame: - sorted = df.sort_values(by=by, ascending=ascending) - return sorted.reset_index(drop=True) + def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> ProcessingResult: + sorted = df.sort_values(by=by, ascending=ascending).reset_index(drop=True) + summary = make_generic_change_stats( + df, sorted, description=f"Sorted by columns: {', '.join(by)}" + ) + return ProcessingResult(df=sorted, summary=summary) def filter_rows( self, @@ -85,24 +123,30 @@ def filter_rows( type: Literal["equals", "contains"], column_name: str | int, val: str | int, - ) -> pd.DataFrame: + ) -> ProcessingResult: if type == "contains": filter = df[column_name].str.contains(str(val)) else: filter = df[column_name] == val - filtered = df[filter] - return filtered.reset_index(drop=True) + filtered = df[filter].reset_index(drop=True) + summary = make_generic_change_stats(df, filtered, description="Filtered rows") + return ProcessingResult(df=filtered, summary=summary) def rename_columns( self, df: pd.DataFrame, map: 
dict[str, str], drop_others=False - ) -> pd.DataFrame: + ) -> ProcessingResult: renamed = df.copy() if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in map: renamed.rename_geometry(map.pop(renamed.geometry.name), inplace=True) renamed = renamed.rename(columns=map, errors="raise") if drop_others: renamed = renamed[list(map.values())] - return renamed + summary = make_generic_change_stats( + df, + renamed, + description=("Renamed columns"), + ) + return ProcessingResult(df=renamed, summary=summary) def clean_column_names( self, @@ -110,7 +154,7 @@ def clean_column_names( *, replace: dict[str, str] | None = None, lower: bool = False, - ) -> pd.DataFrame: + ) -> ProcessingResult: cleaned = df.copy() replace = replace or {} columns = list(cleaned.columns) @@ -119,22 +163,32 @@ def clean_column_names( if lower: columns = [c.lower() for c in columns] cleaned.columns = pd.Index(columns) - return cleaned + summary = make_generic_change_stats( + df, cleaned, description="Cleaned column names" + ) + return ProcessingResult(df=cleaned, summary=summary) def update_column( self, df: pd.DataFrame, column_name: str, val: str | int, - ) -> pd.DataFrame: + ) -> ProcessingResult: updated = df.copy() updated[column_name] = val - return updated + summary = make_generic_change_stats(df, updated, description="Updated columns") + return ProcessingResult(df=updated, summary=summary) - def append_prev(self, df: pd.DataFrame, version: str = "latest") -> pd.DataFrame: + def append_prev( + self, df: pd.DataFrame, version: str = "latest" + ) -> ProcessingResult: prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version)) appended = pd.concat((prev_df, df)) - return appended.reset_index(drop=True) + appended = appended.reset_index(drop=True) + summary = make_generic_change_stats( + prev_df, appended, description="Appended rows" + ) + return ProcessingResult(df=appended, summary=summary) def upsert_column_of_previous_version( self, @@ -143,7 +197,7 @@ def upsert_column_of_previous_version( version: str = "latest", insert_behavior: Literal["allow", "ignore", "error"] = "allow", missing_key_behavior: Literal["null", "coalesce", "error"] = "error", - ) -> pd.DataFrame: + ) -> ProcessingResult: assert key, "Must provide non-empty list of columns to be used as keys" prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version)) df = data.upsert_df_columns( @@ -153,7 +207,8 @@ def upsert_column_of_previous_version( insert_behavior=insert_behavior, missing_key_behavior=missing_key_behavior, ) - return df + summary = make_generic_change_stats(prev_df, df, description="Upserted columns") + return ProcessingResult(df=df, summary=summary) def deduplicate( self, @@ -161,26 +216,39 @@ def deduplicate( sort_columns: list[str] | None = None, sort_ascending: bool = True, by: list[str] | None = None, - ) -> pd.DataFrame: + ) -> ProcessingResult: deduped = df.copy() if sort_columns: deduped = deduped.sort_values(by=sort_columns, ascending=sort_ascending) - deduped = deduped.drop_duplicates(by) - return deduped.reset_index(drop=True) + deduped = deduped.drop_duplicates(by).reset_index(drop=True) + summary = make_generic_change_stats( + df, deduped, description="Removed duplicates" + ) + return ProcessingResult(df=deduped, summary=summary) - def drop_columns(self, df: pd.DataFrame, columns: list[str | int]) -> pd.DataFrame: + def drop_columns( + self, df: pd.DataFrame, columns: list[str | int] + ) -> ProcessingResult: columns = [df.columns[i] if isinstance(i, int) else i for i in 
columns] - return df.drop(columns, axis=1) + result = df.drop(columns, axis=1) + summary = make_generic_change_stats(df, result, description="Dropped columns") + return ProcessingResult(df=result, summary=summary) def strip_columns( self, df: pd.DataFrame, cols: list[str] | None = None - ) -> pd.DataFrame: + ) -> ProcessingResult: + stripped = df.copy() if cols: for col in cols: - df[col] = df[col].str.strip() + stripped[col] = stripped[col].str.strip() else: - df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x) - return df + stripped = stripped.apply( + lambda x: x.str.strip() if x.dtype == "object" else x + ) + summary = make_generic_change_stats( + df, stripped, description="Stripped whitespace" + ) + return ProcessingResult(df=stripped, summary=summary) def coerce_column_types( self, @@ -189,7 +257,7 @@ def coerce_column_types( str, Literal["numeric", "integer", "bigint", "string", "date", "datetime"] ], errors: Literal["raise", "coerce"] = "raise", - ): + ) -> ProcessingResult: def to_str(obj): if isinstance(obj, int): return str(obj) @@ -200,31 +268,39 @@ def to_str(obj): else: return str(obj) - df = df.copy() + result = df.copy() for column in column_types: match column_types[column]: case "numeric": - df[column] = pd.to_numeric(df[column], errors=errors) + result[column] = pd.to_numeric(result[column], errors=errors) case "integer" | "bigint" as t: mapping = {"integer": "Int32", "bigint": "Int64"} - df[column] = pd.array(df[column], dtype=mapping[t]) # type: ignore + result[column] = pd.array(result[column], dtype=mapping[t]) # type: ignore case "string": - df[column] = df[column].apply(to_str) + result[column] = result[column].apply(to_str) case "date": - df[column] = pd.to_datetime(df[column], errors=errors).dt.date - df[column] = df[column].replace(pd.NaT, None) # type: ignore + result[column] = pd.to_datetime( + result[column], errors=errors + ).dt.date + result[column] = result[column].replace(pd.NaT, None) # type: ignore case "datetime": - df[column] = pd.to_datetime(df[column], errors=errors) - df[column] = df[column].replace(pd.NaT, None) # type: ignore - return df + result[column] = pd.to_datetime(result[column], errors=errors) + result[column] = result[column].replace(pd.NaT, None) # type: ignore + summary = make_generic_change_stats( + df, result, description="Coerced column types" + ) + return ProcessingResult(df=result, summary=summary) - def multi(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + def multi(self, df: gpd.GeoDataFrame) -> ProcessingResult: multi_gdf = df.copy() multi_gdf.set_geometry( gpd.GeoSeries([transform.multi(feature) for feature in multi_gdf.geometry]), inplace=True, ) - return multi_gdf + summary = make_generic_change_stats( + df, multi_gdf, description="Converted geometries" + ) + return ProcessingResult(df=multi_gdf, summary=summary) def pd_series_func( self, @@ -234,7 +310,7 @@ def pd_series_func( output_column_name: str | None = None, geo: bool = False, # only used for validation **kwargs, - ) -> pd.DataFrame: + ) -> ProcessingResult: """ Operates on a given column using a given pandas Series function and supplied kwargs @@ -265,8 +341,14 @@ def pd_series_func( func = transformed[column_name] for part in parts: func = func.__getattribute__(part) + transformed[output_column_name or column_name] = func(**kwargs) # type: ignore - return transformed + summary = make_generic_change_stats( + df, + transformed, + description=f"Applied {function_name} to column {column_name}", + ) + return ProcessingResult(df=transformed, 
summary=summary) def validate_pd_series_func( @@ -352,7 +434,7 @@ def process( compiled_steps = validate_processing_steps(dataset_id, processing_steps) for step in compiled_steps: - df = step(df) + df = step(df).df validate_columns(df, expected_columns) diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py index 48ff2a0bc..e8b125f6b 100644 --- a/dcpy/test/lifecycle/ingest/test_transform.py +++ b/dcpy/test/lifecycle/ingest/test_transform.py @@ -107,7 +107,7 @@ def test_validate_processing_steps(): } ).set_geometry("col3") for step in compiled_steps: - df = step(df) + df = step(df).df expected = gpd.GeoDataFrame( {"col3": gpd.GeoSeries([None, None, None])} ).set_geometry("col3") @@ -202,60 +202,60 @@ def test_reproject(self): assert self.gdf.crs.to_string() == "EPSG:4326" target = "EPSG:2263" reprojected = self.proc.reproject(self.gdf, target_crs=target) - assert reprojected.crs.to_string() == target + assert reprojected.df.crs.to_string() == target def test_sort(self): sorted = self.proc.sort(self.basic_df, by=["a"]) expected = pd.DataFrame({"a": [1, 2, 3], "b": ["c_3", "b_1", "b_2"]}) - assert sorted.equals(expected) + assert sorted.df.equals(expected) def test_filter_rows_equals(self): filtered = self.proc.filter_rows( self.basic_df, type="equals", column_name="a", val=1 ) expected = pd.DataFrame({"a": [1], "b": ["c_3"]}) - assert filtered.equals(expected) + assert filtered.df.equals(expected) def test_filter_rows_contains(self): filtered = self.proc.filter_rows( self.basic_df, type="contains", column_name="b", val="b_" ) expected = pd.DataFrame({"a": [2, 3], "b": ["b_1", "b_2"]}) - assert filtered.equals(expected) + assert filtered.df.equals(expected) def test_rename_columns(self): - renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}) + renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}).df expected = pd.DataFrame({"c": [2, 3, 1], "b": ["b_1", "b_2", "c_3"]}) assert renamed.equals(expected) def test_rename_columns_drop(self): renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}, drop_others=True) expected = pd.DataFrame({"c": [2, 3, 1]}) - assert renamed.equals(expected) + assert renamed.df.equals(expected) def test_clean_column_names(self): cleaned = self.proc.clean_column_names(self.messy_names_df, replace={"_": "-"}) expected = pd.DataFrame({"Column": [1, 2], "Two-Words": [3, 4]}) - assert cleaned.equals(expected) + assert cleaned.df.equals(expected) def test_clean_column_names_lower(self): cleaned = self.proc.clean_column_names( self.messy_names_df, replace={"_": "-"}, lower=True ) expected = pd.DataFrame({"column": [1, 2], "two-words": [3, 4]}) - assert cleaned.equals(expected) + assert cleaned.df.equals(expected) def test_update_column(self): updated = self.proc.update_column(self.basic_df, column_name="a", val=5) expected = pd.DataFrame({"a": [5, 5, 5], "b": ["b_1", "b_2", "c_3"]}) - assert updated.equals(expected) + assert updated.df.equals(expected) @mock.patch("dcpy.connectors.edm.recipes.read_df") def test_append_prev(self, read_df): read_df.return_value = self.prev_df appended = self.proc.append_prev(self.basic_df) expected = pd.DataFrame({"a": [-1, 2, 3, 1], "b": ["z", "b_1", "b_2", "c_3"]}) - assert appended.equals(expected) + assert appended.df.equals(expected) @mock.patch("dcpy.connectors.edm.recipes.read_df") def test_upsert_column_of_previous_version(self, read_df): @@ -264,35 +264,35 @@ def test_upsert_column_of_previous_version(self, read_df): expected = pd.DataFrame( {"a": [3, 2, 1], "b": 
["b_2", "b_1", "c_3"], "c": [True, False, True]} ) - assert upserted.equals(expected) + assert upserted.df.equals(expected) def test_deduplicate(self): deduped = self.proc.deduplicate(self.dupe_df) expected = pd.DataFrame({"a": [1, 1, 2], "b": [3, 1, 2]}) - assert deduped.equals(expected) + assert deduped.df.equals(expected) def test_deduplicate_by(self): deduped = self.proc.deduplicate(self.dupe_df, by="a") expected = pd.DataFrame({"a": [1, 2], "b": [3, 2]}) - assert deduped.equals(expected) + assert deduped.df.equals(expected) def test_deduplicate_by_sort(self): deduped = self.proc.deduplicate(self.dupe_df, by="a", sort_columns="b") expected = pd.DataFrame({"a": [1, 2], "b": [1, 2]}) - assert deduped.equals(expected) + assert deduped.df.equals(expected) def test_drop_columns(self): dropped = self.proc.drop_columns(self.basic_df, columns=["b"]) expected = pd.DataFrame({"a": [2, 3, 1]}) - assert dropped.equals(expected) + assert dropped.df.equals(expected) def test_strip_columns(self): stripped = self.proc.strip_columns(self.whitespace_df, ["b"]) - assert stripped.equals(self.basic_df) + assert stripped.df.equals(self.basic_df) def test_strip_all_columns(self): stripped = self.proc.strip_columns(self.whitespace_df) - assert stripped.equals(self.basic_df) + assert stripped.df.equals(self.basic_df) @pytest.mark.parametrize( "original_column, cast, errors, expected_column", @@ -335,14 +335,14 @@ def test_coerce_column_type(self, original_column, cast, errors, expected_column coerced = self.proc.coerce_column_types( self.coerce_df, {original_column: cast}, errors=errors ) - assert coerced[original_column].equals(self.coerce_df[expected_column]) + assert coerced.df[original_column].equals(self.coerce_df[expected_column]) def test_pd_series_func(self): transformed = self.proc.pd_series_func( self.basic_df, column_name="b", function_name="map", arg={"b_1": "c_1"} ) expected = pd.DataFrame({"a": [2, 3, 1], "b": ["c_1", np.nan, np.nan]}) - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_pd_series_func_str(self): transformed = self.proc.pd_series_func( @@ -353,7 +353,7 @@ def test_pd_series_func_str(self): repl="B-", ) expected = pd.DataFrame({"a": [2, 3, 1], "b": ["B-1", "B-2", "c_3"]}) - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_gpd_series_func(self): gdf = gpd.GeoDataFrame( @@ -365,7 +365,7 @@ def test_gpd_series_func(self): transformed = self.proc.pd_series_func( gdf, column_name="wkt", function_name="force_2d", geo=True ) - assert transformed.equals( + assert transformed.df.equals( gpd.GeoDataFrame( { "a": [1, 2], @@ -386,9 +386,9 @@ def test_rename_geodataframe(self): transformed: gpd.GeoDataFrame = self.proc.rename_columns( self.gdf, map={"wkt": "geom"} ) - assert transformed.active_geometry_name == "geom" + assert transformed.df.active_geometry_name == "geom" expected = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "renamed.parquet") - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_multi(self): gdf = gpd.GeoDataFrame( @@ -426,7 +426,7 @@ def test_multi(self): ), } ) - assert transformed.equals(expected) + assert transformed.df.equals(expected) def test_processing_no_steps(create_temp_filesystem: Path): From 2794900ae7d77d5611b99c5326c4e13881c9446b Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Thu, 12 Dec 2024 15:08:45 -0500 Subject: [PATCH 2/3] WIP: fill out ProcessingSummary for proc funcs --- dcpy/lifecycle/ingest/transform.py | 44 ++++++++++++++------ 
dcpy/test/lifecycle/ingest/test_transform.py | 27 ++++++++++++ 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index 7e7e53bee..795bbb3c0 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -18,9 +18,9 @@ class ProcessingSummary(SortedSerializedBase): """Summary of the changes from a data processing function.""" - description: str | None = None - row_modifications: dict - column_modifications: dict + description: str + row_modifications: dict = {} + column_modifications: dict = {} class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True): @@ -29,7 +29,7 @@ class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True): def make_generic_change_stats( - before: pd.DataFrame, after: pd.DataFrame, *, description: str | None + before: pd.DataFrame, after: pd.DataFrame, *, description: str ) -> ProcessingSummary: """Generate a ProcessingSummary by comparing two dataframes before and after processing.""" initial_columns = set(before.columns) @@ -102,19 +102,39 @@ class ProcessingFunctions: def __init__(self, dataset_id: str): self.dataset_id = dataset_id + self._REPROJECTION_DESCRIPTION_PREFIX = "Reprojected geometries" + self._REPROJECTION_NOT_REQUIRED_DESCRIPTION = ( + "No reprojection required, as source and target crs are the same." + ) + self._SORTED_BY_COLUMNS_DESCRIPTION_PREFIX = "Sorted by columns" + def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> ProcessingResult: - result = transform.reproject_gdf(df, target_crs=target_crs) - summary = make_generic_change_stats( - df, result, description=f"Reprojected geometries to {target_crs}" + starting_crs = df.crs.to_string() + needs_reproject = starting_crs != target_crs + result = ( + transform.reproject_gdf(df, target_crs=target_crs) + if needs_reproject + else df + ) + return ProcessingResult( + df=result, + summary=ProcessingSummary( + row_modifications={"modified": len(df)} if needs_reproject else {}, + description=f"{self._REPROJECTION_DESCRIPTION_PREFIX} from {starting_crs} to {target_crs}" + if needs_reproject + else self._REPROJECTION_NOT_REQUIRED_DESCRIPTION, + ), ) - return ProcessingResult(df=result, summary=summary) def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> ProcessingResult: sorted = df.sort_values(by=by, ascending=ascending).reset_index(drop=True) summary = make_generic_change_stats( - df, sorted, description=f"Sorted by columns: {', '.join(by)}" + df, + sorted, + description=f"{self._SORTED_BY_COLUMNS_DESCRIPTION_PREFIX}: {', '.join(by)}", ) + summary.row_modifications["modified"] = len(df) if not sorted.equals(df) else 0 return ProcessingResult(df=sorted, summary=summary) def filter_rows( @@ -275,17 +295,17 @@ def to_str(obj): result[column] = pd.to_numeric(result[column], errors=errors) case "integer" | "bigint" as t: mapping = {"integer": "Int32", "bigint": "Int64"} - result[column] = pd.array(result[column], dtype=mapping[t]) # type: ignore + result[column] = pd.array(result[column], dtype=mapping[t]) # type: ignore case "string": result[column] = result[column].apply(to_str) case "date": result[column] = pd.to_datetime( result[column], errors=errors ).dt.date - result[column] = result[column].replace(pd.NaT, None) # type: ignore + result[column] = result[column].replace(pd.NaT, None) # type: ignore case "datetime": result[column] = pd.to_datetime(result[column], errors=errors) - result[column] = result[column].replace(pd.NaT, None) # 
type: ignore
+                result[column] = result[column].replace(pd.NaT, None)  # type: ignore
         summary = make_generic_change_stats(
             df, result, description="Coerced column types"
         )
diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py
index e8b125f6b..3c049d043 100644
--- a/dcpy/test/lifecycle/ingest/test_transform.py
+++ b/dcpy/test/lifecycle/ingest/test_transform.py
@@ -203,12 +203,39 @@ def test_reproject(self):
         target = "EPSG:2263"
         reprojected = self.proc.reproject(self.gdf, target_crs=target)
         assert reprojected.df.crs.to_string() == target
+        assert (
+            len(self.gdf) == reprojected.summary.row_modifications["modified"]
+        ), "All the rows should report as modified"
+        assert reprojected.summary.description.startswith(
+            self.proc._REPROJECTION_DESCRIPTION_PREFIX
+        )
+
+    def test_reproject_no_changes(self):
+        starting_crs = self.gdf.crs.to_string()
+        assert starting_crs == "EPSG:4326"
+        reprojected = self.proc.reproject(self.gdf, target_crs=starting_crs)
+        assert reprojected.df.crs.to_string() == starting_crs
+        assert (
+            not reprojected.summary.row_modifications
+        ), "No rows should report as modified, since the crs didn't change."
+        assert reprojected.summary.description.startswith(
+            self.proc._REPROJECTION_NOT_REQUIRED_DESCRIPTION
+        )
 
     def test_sort(self):
         sorted = self.proc.sort(self.basic_df, by=["a"])
         expected = pd.DataFrame({"a": [1, 2, 3], "b": ["c_3", "b_1", "b_2"]})
         assert sorted.df.equals(expected)
 
+        # TODO: do we care how many rows are modified... can we tell? Maybe we just check whether it was sorted to begin with?
+        assert len(sorted.df) == sorted.summary.row_modifications["modified"]
+
+        sorted_again = self.proc.sort(sorted.df, by=["a"])
+        assert sorted.df.equals(
+            sorted_again.df
+        ), "nothing should have changed from re-sorting the df"
+        assert 0 == sorted_again.summary.row_modifications["modified"]
+
     def test_filter_rows_equals(self):
         filtered = self.proc.filter_rows(
             self.basic_df, type="equals", column_name="a", val=1

From e86f03efed7ef946c987adee75c4d7b29f2cc0e0 Mon Sep 17 00:00:00 2001
From: Alex Richey
Date: Tue, 17 Dec 2024 11:08:39 -0500
Subject: [PATCH 3/3] WIP: fill out remaining processor summaries

---
 dcpy/lifecycle/ingest/transform.py           | 95 ++++++++++++++------
 dcpy/test/lifecycle/ingest/test_transform.py | 71 +++++++++++++--
 2 files changed, 134 insertions(+), 32 deletions(-)

diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py
index 795bbb3c0..343903184 100644
--- a/dcpy/lifecycle/ingest/transform.py
+++ b/dcpy/lifecycle/ingest/transform.py
@@ -21,6 +21,7 @@ class ProcessingSummary(SortedSerializedBase):
     description: str
     row_modifications: dict = {}
     column_modifications: dict = {}
+    custom: dict = {}
 
 
 class ProcessingResult(SortedSerializedBase, arbitrary_types_allowed=True):
@@ -108,7 +109,6 @@ def __init__(self, dataset_id: str):
         )
         self._SORTED_BY_COLUMNS_DESCRIPTION_PREFIX = "Sorted by columns"
 
-
     def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> ProcessingResult:
         starting_crs = df.crs.to_string()
         needs_reproject = starting_crs != target_crs
@@ -149,24 +149,40 @@ def filter_rows(
         else:
             filter = df[column_name] == val
         filtered = df[filter].reset_index(drop=True)
-        summary = make_generic_change_stats(df, filtered, description="Filtered rows")
-        return ProcessingResult(df=filtered, summary=summary)
+        return ProcessingResult(
+            df=filtered,
+            summary=ProcessingSummary(
+                description="Filtered rows",
+                row_modifications={"removed": len(df) - len(filtered)},
+            ),
+        )
 
     def rename_columns(
-        self, df: pd.DataFrame, map: dict[str, str], drop_others=False
+        self, df: pd.DataFrame, drop_others=False, **kwargs
     ) -> ProcessingResult:
+        assert "map" in kwargs, "map must be supplied to rename_columns"
+        col_map: dict[str, str] = kwargs[
+            "map"
+        ]  # doing this to avoid shadowing the builtin `map` fn
         renamed = df.copy()
-        if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in map:
-            renamed.rename_geometry(map.pop(renamed.geometry.name), inplace=True)
-        renamed = renamed.rename(columns=map, errors="raise")
+        if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in col_map:
+            renamed.rename_geometry(col_map.pop(renamed.geometry.name), inplace=True)
+        renamed = renamed.rename(columns=col_map, errors="raise")
+        removed_cols = []
         if drop_others:
-            renamed = renamed[list(map.values())]
-        summary = make_generic_change_stats(
-            df,
-            renamed,
-            description=("Renamed columns"),
+            renamed = renamed[list(col_map.values())]
+            removed_cols = [
+                col
+                for col in (set(df.columns) - set(renamed.columns))
+                if col not in col_map
+            ]
+        return ProcessingResult(
+            df=renamed,
+            summary=ProcessingSummary(
+                description="Renamed columns",
+                column_modifications={"renamed": col_map, "removed": removed_cols},
+            ),
         )
-        return ProcessingResult(df=renamed, summary=summary)
 
     def clean_column_names(
         self,
@@ -183,10 +199,14 @@ def clean_column_names(
         if lower:
             columns = [c.lower() for c in columns]
         cleaned.columns = pd.Index(columns)
-        summary = make_generic_change_stats(
-            df, cleaned, description="Cleaned column names"
+        renamed_cols = {old: new for old, new in zip(df.columns, columns) if old != new}
+        return ProcessingResult(
+            df=cleaned,
+            summary=ProcessingSummary(
+                description="Cleaned column names",
+                column_modifications={"renamed": renamed_cols},
+            ),
         )
-        return ProcessingResult(df=cleaned, summary=summary)
 
     def update_column(
         self,
@@ -196,8 +216,13 @@ def update_column(
     ) -> ProcessingResult:
         updated = df.copy()
         updated[column_name] = val
-        summary = make_generic_change_stats(df, updated, description="Updated columns")
-        return ProcessingResult(df=updated, summary=summary)
+        return ProcessingResult(
+            df=updated,
+            summary=ProcessingSummary(
+                description=f"Updated column '{column_name}' with value '{val}'",
+                row_modifications={"updated": len(df)},  # assume we modified all rows
+            ),
+        )
 
     def append_prev(
         self, df: pd.DataFrame, version: str = "latest"
@@ -205,8 +230,10 @@ def append_prev(
         prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version))
         appended = pd.concat((prev_df, df))
         appended = appended.reset_index(drop=True)
-        summary = make_generic_change_stats(
-            prev_df, appended, description="Appended rows"
+        summary = ProcessingSummary(
+            description=f"Appended rows from previous version: {version}",
+            custom={"previous_version": version},
+            row_modifications={"added": len(prev_df)},
         )
         return ProcessingResult(df=appended, summary=summary)
 
@@ -220,6 +247,7 @@ def upsert_column_of_previous_version(
     ) -> ProcessingResult:
         assert key, "Must provide non-empty list of columns to be used as keys"
         prev_df = recipes.read_df(recipes.Dataset(id=self.dataset_id, version=version))
+        df_initial_cols = set(df.columns)
         df = data.upsert_df_columns(
             prev_df,
             df,
@@ -228,6 +256,14 @@ def upsert_column_of_previous_version(
             missing_key_behavior=missing_key_behavior,
         )
-        summary = make_generic_change_stats(prev_df, df, description="Upserted columns")
+        summary = ProcessingSummary(
+            description="Upserted columns",
+            custom={
"previous_version": version, + }, + column_modifications={ + "added": sorted(list(set(prev_df.columns) - df_initial_cols)) + }, + ) return ProcessingResult(df=df, summary=summary) def deduplicate( @@ -241,8 +280,9 @@ def deduplicate( if sort_columns: deduped = deduped.sort_values(by=sort_columns, ascending=sort_ascending) deduped = deduped.drop_duplicates(by).reset_index(drop=True) - summary = make_generic_change_stats( - df, deduped, description="Removed duplicates" + summary = ProcessingSummary( + description="Removed duplicates", + row_modifications={"dropped": len(df) - len(deduped)}, ) return ProcessingResult(df=deduped, summary=summary) @@ -251,7 +291,9 @@ def drop_columns( ) -> ProcessingResult: columns = [df.columns[i] if isinstance(i, int) else i for i in columns] result = df.drop(columns, axis=1) - summary = make_generic_change_stats(df, result, description="Dropped columns") + summary = ProcessingSummary( + description="Dropped columns", column_modifications={"dropped": columns} + ) return ProcessingResult(df=result, summary=summary) def strip_columns( @@ -265,8 +307,11 @@ def strip_columns( stripped = stripped.apply( lambda x: x.str.strip() if x.dtype == "object" else x ) - summary = make_generic_change_stats( - df, stripped, description="Stripped whitespace" + summary = ProcessingSummary( + description="Stripped Whitespace", + row_modifications={ + "modified": len(stripped.compare(df)) + }, # TODO: evaluate if this is performant ) return ProcessingResult(df=stripped, summary=summary) diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py index 3c049d043..edee8c08d 100644 --- a/dcpy/test/lifecycle/ingest/test_transform.py +++ b/dcpy/test/lifecycle/ingest/test_transform.py @@ -179,7 +179,7 @@ class TestProcessors: whitespace_df = pd.DataFrame({"a": [2, 3, 1], "b": [" b_1 ", " b_2", "c_3 "]}) prev_df = pd.DataFrame({"a": [-1], "b": ["z"]}) upsert_df = pd.DataFrame( - {"a": [3, 2, 1], "b": ["d", "d", "d"], "c": [True, False, True]} + {"a": [3, 2, 1], "b": ["d", "d", "d"], "c": [True, False, True], "d": [1, 2, 3]} ) coerce_df = pd.DataFrame( { @@ -242,6 +242,10 @@ def test_filter_rows_equals(self): ) expected = pd.DataFrame({"a": [1], "b": ["c_3"]}) assert filtered.df.equals(expected) + assert ( + len(self.basic_df) - len(expected) + == filtered.summary.row_modifications["removed"] + ), "The correct number of rows should report as filtered out." def test_filter_rows_contains(self): filtered = self.proc.filter_rows( @@ -249,21 +253,38 @@ def test_filter_rows_contains(self): ) expected = pd.DataFrame({"a": [2, 3], "b": ["b_1", "b_2"]}) assert filtered.df.equals(expected) + assert ( + len(self.basic_df) - len(expected) + == filtered.summary.row_modifications["removed"] + ), "The correct number of rows should report as filtered out." 
def test_rename_columns(self): - renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}).df + col_renames = {"a": "c"} + renamed = self.proc.rename_columns(self.basic_df, map=col_renames) expected = pd.DataFrame({"c": [2, 3, 1], "b": ["b_1", "b_2", "c_3"]}) - assert renamed.equals(expected) + assert renamed.df.equals(expected) + assert col_renames == renamed.summary.column_modifications["renamed"] + # TODO: test cols spec'd as rename, that don't exist def test_rename_columns_drop(self): - renamed = self.proc.rename_columns(self.basic_df, {"a": "c"}, drop_others=True) + col_renames = {"a": "c"} + renamed = self.proc.rename_columns( + self.basic_df, map=col_renames, drop_others=True + ) expected = pd.DataFrame({"c": [2, 3, 1]}) assert renamed.df.equals(expected) + assert { + "renamed": col_renames, + "removed": ["b"], + } == renamed.summary.column_modifications def test_clean_column_names(self): cleaned = self.proc.clean_column_names(self.messy_names_df, replace={"_": "-"}) expected = pd.DataFrame({"Column": [1, 2], "Two-Words": [3, 4]}) assert cleaned.df.equals(expected) + assert { + "Two_Words": "Two-Words", + } == cleaned.summary.column_modifications["renamed"] def test_clean_column_names_lower(self): cleaned = self.proc.clean_column_names( @@ -271,55 +292,91 @@ def test_clean_column_names_lower(self): ) expected = pd.DataFrame({"column": [1, 2], "two-words": [3, 4]}) assert cleaned.df.equals(expected) + assert { + "Column": "column", + "Two_Words": "two-words", + } == cleaned.summary.column_modifications["renamed"] def test_update_column(self): updated = self.proc.update_column(self.basic_df, column_name="a", val=5) expected = pd.DataFrame({"a": [5, 5, 5], "b": ["b_1", "b_2", "c_3"]}) assert updated.df.equals(expected) + assert len(self.basic_df) == updated.summary.row_modifications["updated"] @mock.patch("dcpy.connectors.edm.recipes.read_df") def test_append_prev(self, read_df): + PREV_VERSION = "24c" read_df.return_value = self.prev_df - appended = self.proc.append_prev(self.basic_df) + appended = self.proc.append_prev(self.basic_df, version=PREV_VERSION) expected = pd.DataFrame({"a": [-1, 2, 3, 1], "b": ["z", "b_1", "b_2", "c_3"]}) assert appended.df.equals(expected) + assert PREV_VERSION == appended.summary.custom["previous_version"] + assert len(self.prev_df) == appended.summary.row_modifications["added"] @mock.patch("dcpy.connectors.edm.recipes.read_df") def test_upsert_column_of_previous_version(self, read_df): + PREV_VERSION = "24c" read_df.return_value = self.upsert_df - upserted = self.proc.upsert_column_of_previous_version(self.basic_df, key=["a"]) + upserted = self.proc.upsert_column_of_previous_version( + self.basic_df, key=["a"], version=PREV_VERSION + ) expected = pd.DataFrame( - {"a": [3, 2, 1], "b": ["b_2", "b_1", "c_3"], "c": [True, False, True]} + { + "a": [3, 2, 1], + "b": ["b_2", "b_1", "c_3"], + "c": [True, False, True], + "d": [1, 2, 3], + } ) assert upserted.df.equals(expected) + assert PREV_VERSION == upserted.summary.custom["previous_version"] + assert ["c", "d"] == upserted.summary.column_modifications[ + "added" + ], "The extra columns should have been added" def test_deduplicate(self): deduped = self.proc.deduplicate(self.dupe_df) expected = pd.DataFrame({"a": [1, 1, 2], "b": [3, 1, 2]}) assert deduped.df.equals(expected) + assert ( + len(self.dupe_df) - len(deduped.df) + == deduped.summary.row_modifications["dropped"] + ) def test_deduplicate_by(self): deduped = self.proc.deduplicate(self.dupe_df, by="a") expected = pd.DataFrame({"a": [1, 2], "b": [3, 
2]})
         assert deduped.df.equals(expected)
+        assert (
+            len(self.dupe_df) - len(deduped.df)
+            == deduped.summary.row_modifications["dropped"]
+        )
 
     def test_deduplicate_by_sort(self):
         deduped = self.proc.deduplicate(self.dupe_df, by="a", sort_columns="b")
         expected = pd.DataFrame({"a": [1, 2], "b": [1, 2]})
         assert deduped.df.equals(expected)
+        assert (
+            len(self.dupe_df) - len(deduped.df)
+            == deduped.summary.row_modifications["dropped"]
+        )
 
     def test_drop_columns(self):
         dropped = self.proc.drop_columns(self.basic_df, columns=["b"])
         expected = pd.DataFrame({"a": [2, 3, 1]})
         assert dropped.df.equals(expected)
+        assert ["b"] == dropped.summary.column_modifications["dropped"]
 
     def test_strip_columns(self):
         stripped = self.proc.strip_columns(self.whitespace_df, ["b"])
         assert stripped.df.equals(self.basic_df)
+        assert 3 == stripped.summary.row_modifications["modified"]
 
     def test_strip_all_columns(self):
         stripped = self.proc.strip_columns(self.whitespace_df)
         assert stripped.df.equals(self.basic_df)
+        assert 3 == stripped.summary.row_modifications["modified"]
 
     @pytest.mark.parametrize(
         "original_column, cast, errors, expected_column",