diff --git a/dcpy/connectors/edm/recipes.py b/dcpy/connectors/edm/recipes.py
index 211234e83..eb30f8cac 100644
--- a/dcpy/connectors/edm/recipes.py
+++ b/dcpy/connectors/edm/recipes.py
@@ -65,7 +65,7 @@ def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> No
     )
     with TemporaryDirectory() as tmp_dir:
         tmp_dir_path = Path(tmp_dir)
-        shutil.copy(file_path, tmp_dir_path)
+        shutil.copy(file_path, tmp_dir_path / config.filename)
         with open(tmp_dir_path / "config.json", "w") as f:
             f.write(
                 json.dumps(config.model_dump(exclude_none=True, mode="json"), indent=4)
@@ -138,6 +138,14 @@ def get_config(name: str, version="latest") -> library.Config | ingest.Config:
     return ingest.Config(**config)
 
 
+def try_get_config(dataset: Dataset) -> library.Config | ingest.Config | None:
+    """Retrieve a recipe config object, if it exists."""
+    if not exists(dataset):
+        return None
+    else:
+        return get_config(dataset.id, dataset.version)
+
+
 def get_parquet_metadata(id: str, version="latest") -> parquet.FileMetaData:
     s3_fs = s3.pyarrow_fs()
     ds = parquet.ParquetDataset(
diff --git a/dcpy/lifecycle/ingest/run.py b/dcpy/lifecycle/ingest/run.py
index 0ed41c30d..05ce27636 100644
--- a/dcpy/lifecycle/ingest/run.py
+++ b/dcpy/lifecycle/ingest/run.py
@@ -1,46 +1,14 @@
 import json
-import pandas as pd
 from pathlib import Path
 import typer
 
 from dcpy.models.lifecycle.ingest import Config
 from dcpy.connectors.edm import recipes
-from . import configure, extract, transform
+from . import configure, extract, transform, validate
 
 TMP_DIR = Path("tmp")
 
 
-def update_freshness(
-    config: Config,
-    staging_dir: Path,
-    *,
-    latest: bool,
-) -> Config:
-    """
-    This function is called after a dataset has been preprocessed, just before archival
-    It's called in the case that the version of the dataset in the config (either provided or calculated)
-    already exists
-
-    The last archived dataset with the same version is pulled in by pandas and compared to what was just processed
-    If they are identical, the last archived dataset has its config updated to reflect that it was checked but not re-archived
-    If they differ, the version is "patched" and a new patched version is archived
-    """
-    new = pd.read_parquet(staging_dir / config.filename)
-    comparison = recipes.read_df(config.dataset)
-    if new.equals(comparison):
-        original_archival_timestamp = recipes.update_freshness(
-            config.dataset_key, config.archival.archival_timestamp
-        )
-        config.archival.archival_timestamp = original_archival_timestamp
-        if latest:
-            recipes.set_latest(config.dataset_key, config.archival.acl)
-        return config
-    else:
-        raise FileExistsError(
-            f"Archived dataset '{config.dataset_key}' already exists and has different data."
-        )
-
-
 def run(
     dataset_id: str,
     version: str | None = None,
@@ -95,16 +63,26 @@ def run(
         output_csv=output_csv,
     )
 
-    if not skip_archival:
-        if recipes.exists(config.dataset):
-            config = update_freshness(config, staging_dir, latest=latest)
-        else:
-            recipes.archive_dataset(
-                config, staging_dir / config.filename, latest=latest
-            )
-
     with open(staging_dir / "config.json", "w") as f:
         json.dump(config.model_dump(mode="json"), f, indent=4)
+
+    action = validate.validate_against_existing_versions(
+        config.dataset, staging_dir / config.filename
+    )
+    if not skip_archival:
+        match action:
+            case validate.ArchiveAction.push:
+                recipes.archive_dataset(
+                    config, staging_dir / config.filename, latest=latest
+                )
+            case validate.ArchiveAction.update_freshness:
+                recipes.update_freshness(
+                    config.dataset_key, config.archival.archival_timestamp
+                )
+                if latest:
+                    recipes.set_latest(config.dataset_key, config.archival.acl)
+            case _:
+                pass
 
     return config
 
diff --git a/dcpy/lifecycle/ingest/validate.py b/dcpy/lifecycle/ingest/validate.py
new file mode 100644
index 000000000..34e1fa891
--- /dev/null
+++ b/dcpy/lifecycle/ingest/validate.py
@@ -0,0 +1,52 @@
+from enum import StrEnum
+import pandas as pd
+from pathlib import Path
+
+from dcpy.models.lifecycle.ingest import Config
+from dcpy.connectors.edm import recipes
+from dcpy.utils.logging import logger
+
+
+class ArchiveAction(StrEnum):
+    push = "push"
+    update_freshness = "update_freshness"
+    do_nothing = "do_nothing"
+
+
+def validate_against_existing_versions(
+    ds: recipes.Dataset, filepath: Path
+) -> ArchiveAction:
+    """
+    Called after a dataset has been preprocessed, just before archival, to determine
+    whether the version in the config (either provided or calculated) already exists.
+
+    If an archived dataset with the same version exists, it is pulled in by pandas and
+    compared to the newly processed data. If the two are identical, the caller should only
+    update the "freshness" of the existing archive; if they differ, a FileExistsError is raised.
+    If no archived dataset with this version exists, the new data should simply be pushed.
+    """
+    existing_config = recipes.try_get_config(ds)
+    if not existing_config:
+        logger.info(f"Dataset '{ds.key}' does not exist in recipes bucket")
+        return ArchiveAction.push
+    else:
+        if isinstance(existing_config, Config):
+            new = pd.read_parquet(filepath)
+            comparison = recipes.read_df(ds)
+            if new.equals(comparison):
+                logger.info(
+                    f"Dataset '{ds.key}' already exists and matches newly processed data"
+                )
+                return ArchiveAction.update_freshness
+            else:
+                raise FileExistsError(
+                    f"Archived dataset '{ds.key}' already exists and has different data."
+                )
+
+        # if the previous version was archived with library, we expect potential slight
+        # differences and cannot update its "freshness"
+        else:
+            logger.warning(
+                f"Previous version of '{ds.key}' was archived with library; cannot update freshness"
+            )
+            return ArchiveAction.do_nothing
diff --git a/dcpy/test/lifecycle/ingest/__init__.py b/dcpy/test/lifecycle/ingest/__init__.py
index f38712956..e69de29bb 100644
--- a/dcpy/test/lifecycle/ingest/__init__.py
+++ b/dcpy/test/lifecycle/ingest/__init__.py
@@ -1,50 +0,0 @@
-from pathlib import Path
-
-
-from dcpy.models.connectors.edm.publishing import GisDataset
-from dcpy.models.connectors import socrata, web
-from dcpy.models.lifecycle.ingest import (
-    LocalFileSource,
-    ScriptSource,
-    S3Source,
-    DEPublished,
-)
-from dcpy.test.conftest import RECIPES_BUCKET
-
-RESOURCES = Path(__file__).parent / "resources"
-TEMPLATE_DIR = RESOURCES / "templates"
-TEST_DATA_DIR = "test_data"
-TEST_DATASET_NAME = "test_dataset"
-FAKE_VERSION = "20240101"
-
-
-class Sources:
-    local_file = LocalFileSource(type="local_file", path=Path("subfolder/dummy.txt"))
-    gis = GisDataset(type="edm_publishing_gis_dataset", name=TEST_DATASET_NAME)
-    script = ScriptSource(type="script", connector="web", function="get_df")
-    file_download = web.FileDownloadSource(
-        type="file_download",
-        url="https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/pad_24a.zip",
-    )
-    api = web.GenericApiSource(
-        type="api",
-        endpoint="https://www.bklynlibrary.org/locations/json",
-        format="json",
-    )
-    socrata = socrata.Source(
-        type="socrata", org=socrata.Org.nyc, uid="w7w3-xahh", format="csv"
-    )
-    s3 = S3Source(type="s3", bucket=RECIPES_BUCKET, key="inbox/test/test.txt")
-    de_publish = DEPublished(
-        type="de-published", product=TEST_DATASET_NAME, filename="file.csv"
-    )
-
-
-SOURCE_FILENAMES = [
-    (Sources.local_file, "dummy.txt"),
-    (Sources.gis, f"{TEST_DATASET_NAME}.zip"),
-    (Sources.file_download, "pad_24a.zip"),
-    (Sources.api, f"{TEST_DATASET_NAME}.json"),
-    (Sources.socrata, f"{TEST_DATASET_NAME}.csv"),
-    (Sources.s3, "test.txt"),
-]
diff --git a/dcpy/test/lifecycle/ingest/shared.py b/dcpy/test/lifecycle/ingest/shared.py
new file mode 100644
index 000000000..98aef2cbf
--- /dev/null
+++ b/dcpy/test/lifecycle/ingest/shared.py
@@ -0,0 +1,86 @@
+from datetime import datetime
+from pathlib import Path
+
+from dcpy.models.connectors.edm.publishing import GisDataset
+from dcpy.models.connectors.edm.recipes import Dataset
+from dcpy.models import file, library
+from dcpy.models.connectors import socrata, web
+from dcpy.models.lifecycle.ingest import (
+    LocalFileSource,
+    ScriptSource,
+    S3Source,
+    DEPublished,
+    DatasetAttributes,
+    ArchivalMetadata,
+    Ingestion,
+    Config,
+)
+from dcpy.utils.metadata import get_run_details
+from dcpy.test.conftest import RECIPES_BUCKET
+
+RESOURCES = Path(__file__).parent / "resources"
+TEMPLATE_DIR = RESOURCES / "templates"
+TEST_DATA_DIR = "test_data"
+TEST_OUTPUT = RESOURCES / TEST_DATA_DIR / "output.parquet"
+TEST_DATASET_NAME = "test_dataset"
+FAKE_VERSION = "20240101"
+TEST_DATASET = Dataset(id=TEST_DATASET_NAME, version=FAKE_VERSION)
+
+
+class Sources:
+    local_file = LocalFileSource(type="local_file", path=Path("subfolder/dummy.txt"))
+    gis = GisDataset(type="edm_publishing_gis_dataset", name=TEST_DATASET_NAME)
+    script = ScriptSource(type="script", connector="web", function="get_df")
+    file_download = web.FileDownloadSource(
+        type="file_download",
+        url="https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/pad_24a.zip",
+    )
+    api = web.GenericApiSource(
+        type="api",
+        endpoint="https://www.bklynlibrary.org/locations/json",
+        format="json",
+    )
+    socrata = socrata.Source(
+        type="socrata", org=socrata.Org.nyc, uid="w7w3-xahh", format="csv"
uid="w7w3-xahh", format="csv" + ) + s3 = S3Source(type="s3", bucket=RECIPES_BUCKET, key="inbox/test/test.txt") + de_publish = DEPublished( + type="de-published", product=TEST_DATASET_NAME, filename="file.csv" + ) + + +BASIC_CONFIG = Config( + id=TEST_DATASET_NAME, + version=FAKE_VERSION, + attributes=DatasetAttributes(name=TEST_DATASET_NAME), + archival=ArchivalMetadata( + archival_timestamp=datetime(2024, 1, 1), + raw_filename="dummy.txt", + acl="public-read", + ), + ingestion=Ingestion(source=Sources.local_file, file_format=file.Csv(type="csv")), + run_details=get_run_details(), +) + +BASIC_LIBRARY_CONFIG = library.Config( + dataset=library.DatasetDefinition( + name=TEST_DATASET_NAME, + version=FAKE_VERSION, + acl="public-read", + source=library.DatasetDefinition.SourceSection(), + destination=library.DatasetDefinition.DestinationSection( + geometry=library.GeometryType(SRS="NONE", type="NONE") + ), + ), + execution_details=get_run_details(), +) + + +SOURCE_FILENAMES = [ + (Sources.local_file, "dummy.txt"), + (Sources.gis, f"{TEST_DATASET_NAME}.zip"), + (Sources.file_download, "pad_24a.zip"), + (Sources.api, f"{TEST_DATASET_NAME}.json"), + (Sources.socrata, f"{TEST_DATASET_NAME}.csv"), + (Sources.s3, "test.txt"), +] diff --git a/dcpy/test/lifecycle/ingest/test_configure.py b/dcpy/test/lifecycle/ingest/test_configure.py index a2d6e759c..d3a1cd3fd 100644 --- a/dcpy/test/lifecycle/ingest/test_configure.py +++ b/dcpy/test/lifecycle/ingest/test_configure.py @@ -12,7 +12,13 @@ from dcpy.lifecycle.ingest import configure from dcpy.test.conftest import mock_request_get -from . import RESOURCES, TEST_DATASET_NAME, Sources, SOURCE_FILENAMES, TEMPLATE_DIR +from .shared import ( + RESOURCES, + TEST_DATASET_NAME, + Sources, + SOURCE_FILENAMES, + TEMPLATE_DIR, +) def test_jinja_vars(): diff --git a/dcpy/test/lifecycle/ingest/test_extract.py b/dcpy/test/lifecycle/ingest/test_extract.py index 7303208f9..9e60949c5 100644 --- a/dcpy/test/lifecycle/ingest/test_extract.py +++ b/dcpy/test/lifecycle/ingest/test_extract.py @@ -9,8 +9,8 @@ from dcpy.connectors import web from dcpy.lifecycle.ingest import extract -from . import TEST_DATASET_NAME, FAKE_VERSION, SOURCE_FILENAMES, Sources from dcpy.test.conftest import mock_request_get, PUBLISHING_BUCKET +from .shared import TEST_DATASET_NAME, FAKE_VERSION, SOURCE_FILENAMES, Sources web.get_df = mock.MagicMock(return_value=pd.DataFrame()) # type: ignore diff --git a/dcpy/test/lifecycle/ingest/test_ingest.py b/dcpy/test/lifecycle/ingest/test_ingest.py deleted file mode 100644 index 5af6cbc6b..000000000 --- a/dcpy/test/lifecycle/ingest/test_ingest.py +++ /dev/null @@ -1,16 +0,0 @@ -import yaml -from dcpy.models.lifecycle.ingest import Template - -from dcpy.lifecycle.ingest import configure, transform - - -def test_validate_all_datasets(): - templates = [t for t in configure.TEMPLATE_DIR.glob("*")] - assert len(templates) > 0 - for file in templates: - with open(file, "r") as f: - s = yaml.safe_load(f) - template = Template(**s) - transform.validate_processing_steps( - template.id, template.ingestion.processing_steps - ) diff --git a/dcpy/test/lifecycle/ingest/test_run.py b/dcpy/test/lifecycle/ingest/test_run.py index ed469525e..77d5807d9 100644 --- a/dcpy/test/lifecycle/ingest/test_run.py +++ b/dcpy/test/lifecycle/ingest/test_run.py @@ -9,7 +9,7 @@ from dcpy.lifecycle.ingest.run import run, TMP_DIR from dcpy.test.conftest import mock_request_get -from . 
+from .shared import FAKE_VERSION, TEMPLATE_DIR
 
 DATASET = "bpl_libraries"
 S3_PATH = f"datasets/{DATASET}/{FAKE_VERSION}/{DATASET}.parquet"
diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py
index 48ff2a0bc..d89afb6b8 100644
--- a/dcpy/test/lifecycle/ingest/test_transform.py
+++ b/dcpy/test/lifecycle/ingest/test_transform.py
@@ -16,7 +16,7 @@
 from dcpy.utils.geospatial import parquet as geoparquet
 from dcpy.lifecycle.ingest import transform
 
-from . import RESOURCES, TEST_DATA_DIR, TEST_DATASET_NAME
+from .shared import RESOURCES, TEST_DATA_DIR, TEST_DATASET_NAME
 
 
 class FakeConfig(BaseModel):
diff --git a/dcpy/test/lifecycle/ingest/test_validate.py b/dcpy/test/lifecycle/ingest/test_validate.py
new file mode 100644
index 000000000..13e74eb4a
--- /dev/null
+++ b/dcpy/test/lifecycle/ingest/test_validate.py
@@ -0,0 +1,68 @@
+from io import BytesIO
+import json
+import pytest
+import yaml
+
+from dcpy.test.conftest import RECIPES_BUCKET
+from dcpy.models.lifecycle.ingest import Template
+from dcpy.utils import s3
+from dcpy.connectors.edm import recipes
+from dcpy.lifecycle.ingest import configure, transform, validate
+
+from .shared import (
+    TEST_DATASET,
+    TEST_OUTPUT,
+    BASIC_CONFIG,
+    BASIC_LIBRARY_CONFIG,
+)
+
+
+@pytest.mark.parametrize("dataset", [t.name for t in configure.TEMPLATE_DIR.glob("*")])
+def test_validate_all_templates(dataset):
+    with open(configure.TEMPLATE_DIR / dataset, "r") as f:
+        s = yaml.safe_load(f)
+    template = Template(**s)
+    transform.validate_processing_steps(
+        template.id, template.ingestion.processing_steps
+    )
+
+
+class TestValidateAgainstExistingVersions:
+    def test_new(self, create_buckets):
+        assert (
+            validate.validate_against_existing_versions(TEST_DATASET, TEST_OUTPUT)
+            == validate.ArchiveAction.push
+        )
+
+    def test_existing_library(self, create_buckets):
+        ds = BASIC_LIBRARY_CONFIG.sparse_dataset
+        config_str = json.dumps(BASIC_LIBRARY_CONFIG.model_dump(mode="json"))
+        s3.upload_file_obj(
+            BytesIO(config_str.encode()),
+            RECIPES_BUCKET,
+            f"{recipes.s3_folder_path(ds)}/config.json",
+            BASIC_LIBRARY_CONFIG.dataset.acl,
+        )
+        assert recipes.exists(ds)
+        assert (
+            validate.validate_against_existing_versions(ds, TEST_OUTPUT)
+            == validate.ArchiveAction.do_nothing
+        )
+
+    def test_existing(self, create_buckets):
+        ds = BASIC_CONFIG.dataset
+        recipes.archive_dataset(BASIC_CONFIG, TEST_OUTPUT)
+        assert recipes.exists(ds)
+        assert (
+            validate.validate_against_existing_versions(ds, TEST_OUTPUT)
+            == validate.ArchiveAction.update_freshness
+        )
+
+    def test_existing_data_diffs(self, create_buckets):
+        ds = BASIC_CONFIG.dataset
+        recipes.archive_dataset(BASIC_CONFIG, TEST_OUTPUT)
+        assert recipes.exists(ds)
+        with pytest.raises(FileExistsError):
+            validate.validate_against_existing_versions(
+                ds, TEST_OUTPUT.parent / "test.parquet"
+            )
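
A minimal usage sketch (not part of the patch) of how the new validation helper drives archival decisions, mirroring the match statement added to run.py above. The dataset id, version, and staging path are illustrative placeholders only; the calls to recipes.Dataset, validate.validate_against_existing_versions, and ArchiveAction are taken from the diff.

from pathlib import Path

from dcpy.connectors.edm import recipes
from dcpy.lifecycle.ingest import validate

# illustrative placeholders, not values from the patch
ds = recipes.Dataset(id="bpl_libraries", version="20240101")
staged_file = Path("tmp/bpl_libraries/bpl_libraries.parquet")

# raises FileExistsError if an archive with this version exists but the data differs
match validate.validate_against_existing_versions(ds, staged_file):
    case validate.ArchiveAction.push:
        # no archive exists for this version yet; recipes.archive_dataset would run here
        ...
    case validate.ArchiveAction.update_freshness:
        # identical data already archived; only its freshness timestamp gets bumped
        ...
    case validate.ArchiveAction.do_nothing:
        # prior archive came from library; it is left untouched
        ...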