Rework "update freshness" to not error with library -> ingest #1339

Open · wants to merge 6 commits into main
10 changes: 9 additions & 1 deletion dcpy/connectors/edm/recipes.py
@@ -65,7 +65,7 @@ def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> No
)
with TemporaryDirectory() as tmp_dir:
tmp_dir_path = Path(tmp_dir)
shutil.copy(file_path, tmp_dir_path)
shutil.copy(file_path, tmp_dir_path / config.filename)
with open(tmp_dir_path / "config.json", "w") as f:
f.write(
json.dumps(config.model_dump(exclude_none=True, mode="json"), indent=4)
@@ -138,6 +138,14 @@ def get_config(name: str, version="latest") -> library.Config | ingest.Config:
return ingest.Config(**config)


def try_get_config(dataset: Dataset) -> library.Config | ingest.Config | None:
"""Retrieve a recipe config object, if exists"""
if not exists(dataset):
return None
else:
return get_config(dataset.id, dataset.version)


def get_parquet_metadata(id: str, version="latest") -> parquet.FileMetaData:
s3_fs = s3.pyarrow_fs()
ds = parquet.ParquetDataset(
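The new `try_get_config` helper returns `None` when nothing has been archived under a given id/version, so callers can branch without error handling. A minimal usage sketch, assuming the recipes bucket is reachable; the dataset id and version below are placeholders:

```python
from dcpy.connectors.edm import recipes
from dcpy.models.connectors.edm.recipes import Dataset

# Placeholder id/version; a real lookup requires access to the recipes bucket.
ds = Dataset(id="example_dataset", version="20240101")

config = recipes.try_get_config(ds)
if config is None:
    print(f"'{ds.id}' version '{ds.version}' has not been archived yet")
else:
    # config is a library.Config or ingest.Config, depending on which tool archived it
    print(f"found archived config of type {type(config).__name__}")
```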
60 changes: 19 additions & 41 deletions dcpy/lifecycle/ingest/run.py
@@ -1,46 +1,14 @@
import json
import pandas as pd
from pathlib import Path
import typer

from dcpy.models.lifecycle.ingest import Config
from dcpy.connectors.edm import recipes
from . import configure, extract, transform
from . import configure, extract, transform, validate

TMP_DIR = Path("tmp")


def update_freshness(
config: Config,
staging_dir: Path,
*,
latest: bool,
) -> Config:
"""
This function is called after a dataset has been preprocessed, just before archival
It's called in the case that the version of the dataset in the config (either provided or calculated)
already exists

The last archived dataset with the same version is pulled in by pandas and compared to what was just processed
If they are identical, the last archived dataset has its config updated to reflect that it was checked but not re-archived
If they differ, the version is "patched" and a new patched version is archived
"""
new = pd.read_parquet(staging_dir / config.filename)
comparison = recipes.read_df(config.dataset)
if new.equals(comparison):
original_archival_timestamp = recipes.update_freshness(
config.dataset_key, config.archival.archival_timestamp
)
config.archival.archival_timestamp = original_archival_timestamp
if latest:
recipes.set_latest(config.dataset_key, config.archival.acl)
return config
else:
raise FileExistsError(
f"Archived dataset '{config.dataset_key}' already exists and has different data."
)


def run(
dataset_id: str,
version: str | None = None,
@@ -95,16 +63,26 @@
output_csv=output_csv,
)

if not skip_archival:
if recipes.exists(config.dataset):
config = update_freshness(config, staging_dir, latest=latest)
else:
recipes.archive_dataset(
config, staging_dir / config.filename, latest=latest
)

with open(staging_dir / "config.json", "w") as f:
json.dump(config.model_dump(mode="json"), f, indent=4)

action = validate.validate_against_existing_versions(
config.dataset, staging_dir / config.filename
)
if not skip_archival:
match action:
case validate.ArchiveAction.push:
recipes.archive_dataset(
config, staging_dir / config.filename, latest=latest
)
case validate.ArchiveAction.update_freshness:
recipes.update_freshness(
config.dataset_key, config.archival.archival_timestamp
)
if latest:
recipes.set_latest(config.dataset_key, config.archival.acl)
case _:
pass

Codecov / codecov/patch check warning on dcpy/lifecycle/ingest/run.py#L84-L85: added lines #L84 - L85 were not covered by tests.
return config


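A small note on the rewritten flow above: `ArchiveAction` is a `StrEnum`, so the `match` in `run` uses plain value patterns against the enum members, and actions log and serialize as readable strings. A standalone illustration (not part of the PR):

```python
from enum import StrEnum  # Python 3.11+


class ArchiveAction(StrEnum):
    push = "push"
    update_freshness = "update_freshness"
    do_nothing = "do_nothing"


def describe(action: ArchiveAction) -> str:
    # same shape as the match in run.py: dotted enum members act as value patterns
    match action:
        case ArchiveAction.push:
            return "archive the newly processed data"
        case ArchiveAction.update_freshness:
            return "bump the archival timestamp of the existing version"
        case _:
            return "leave the existing archive untouched"


print(describe(ArchiveAction.update_freshness))
print(ArchiveAction.do_nothing == "do_nothing")  # True: StrEnum members compare equal to their values
```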
52 changes: 52 additions & 0 deletions dcpy/lifecycle/ingest/validate.py
@@ -0,0 +1,52 @@
from enum import StrEnum
import pandas as pd
from pathlib import Path

from dcpy.models.lifecycle.ingest import Config
from dcpy.connectors.edm import recipes
from dcpy.utils.logging import logger


class ArchiveAction(StrEnum):
push = "push"
update_freshness = "update_freshness"
do_nothing = "do_nothing"


def validate_against_existing_versions(
ds: recipes.Dataset, filepath: Path
) -> ArchiveAction:
"""
Called after a dataset has been preprocessed, just before archival, to determine what should happen
given any previously archived dataset with the same version.

If no dataset with this version exists in the recipes bucket, the new data should simply be pushed.
If one exists and was archived by ingest, it is read back with pandas and compared to what was just processed:
identical data means only the existing archive's "freshness" should be updated, while differing data raises an error.
If the existing dataset was archived by library, slight differences are expected and its freshness cannot be
updated, so nothing is done.
"""
existing_config = recipes.try_get_config(ds)
if not existing_config:
logger.info(f"Dataset '{ds.key}' does not exist in recipes bucket")
return ArchiveAction.push
else:
if isinstance(existing_config, Config):
new = pd.read_parquet(filepath)
comparison = recipes.read_df(ds)
if new.equals(comparison):
logger.info(
f"Dataset '{ds.key}' already exists and matches newly processed data"
)
return ArchiveAction.update_freshness
else:
raise FileExistsError(
f"Archived dataset '{ds.key}' already exists and has different data."
)

# if the previous version was archived with library, slight differences are expected
# and its "freshness" cannot be updated
else:
logger.warning(
f"previous version of '{ds.key}' archived is from library. cannot update freshness"
)
return ArchiveAction.do_nothing
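To show how the new contract might be unit-tested without touching S3, here is a rough sketch (not part of this PR) that stubs the `recipes` connector inside `validate`; the dataset id/version and test names are placeholders, and only the "never archived" and "archived by library" branches are covered:

```python
from unittest import mock

from dcpy.lifecycle.ingest import validate
from dcpy.models import library
from dcpy.models.connectors.edm.recipes import Dataset

DS = Dataset(id="example_dataset", version="20240101")  # placeholder dataset reference


@mock.patch("dcpy.lifecycle.ingest.validate.recipes")
def test_push_when_nothing_archived(mock_recipes, tmp_path):
    # no archived config -> the freshly processed data should just be pushed
    mock_recipes.try_get_config.return_value = None
    action = validate.validate_against_existing_versions(DS, tmp_path / "out.parquet")
    assert action == validate.ArchiveAction.push


@mock.patch("dcpy.lifecycle.ingest.validate.recipes")
def test_do_nothing_when_archived_by_library(mock_recipes, tmp_path):
    # an existing library.Config -> freshness cannot be updated, so nothing happens
    mock_recipes.try_get_config.return_value = mock.MagicMock(spec=library.Config)
    action = validate.validate_against_existing_versions(DS, tmp_path / "out.parquet")
    assert action == validate.ArchiveAction.do_nothing
```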
50 changes: 0 additions & 50 deletions dcpy/test/lifecycle/ingest/__init__.py
@@ -1,50 +0,0 @@
from pathlib import Path


from dcpy.models.connectors.edm.publishing import GisDataset
from dcpy.models.connectors import socrata, web
from dcpy.models.lifecycle.ingest import (
LocalFileSource,
ScriptSource,
S3Source,
DEPublished,
)
from dcpy.test.conftest import RECIPES_BUCKET

RESOURCES = Path(__file__).parent / "resources"
TEMPLATE_DIR = RESOURCES / "templates"
TEST_DATA_DIR = "test_data"
TEST_DATASET_NAME = "test_dataset"
FAKE_VERSION = "20240101"


class Sources:
local_file = LocalFileSource(type="local_file", path=Path("subfolder/dummy.txt"))
gis = GisDataset(type="edm_publishing_gis_dataset", name=TEST_DATASET_NAME)
script = ScriptSource(type="script", connector="web", function="get_df")
file_download = web.FileDownloadSource(
type="file_download",
url="https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/pad_24a.zip",
)
api = web.GenericApiSource(
type="api",
endpoint="https://www.bklynlibrary.org/locations/json",
format="json",
)
socrata = socrata.Source(
type="socrata", org=socrata.Org.nyc, uid="w7w3-xahh", format="csv"
)
s3 = S3Source(type="s3", bucket=RECIPES_BUCKET, key="inbox/test/test.txt")
de_publish = DEPublished(
type="de-published", product=TEST_DATASET_NAME, filename="file.csv"
)


SOURCE_FILENAMES = [
(Sources.local_file, "dummy.txt"),
(Sources.gis, f"{TEST_DATASET_NAME}.zip"),
(Sources.file_download, "pad_24a.zip"),
(Sources.api, f"{TEST_DATASET_NAME}.json"),
(Sources.socrata, f"{TEST_DATASET_NAME}.csv"),
(Sources.s3, "test.txt"),
]
86 changes: 86 additions & 0 deletions dcpy/test/lifecycle/ingest/shared.py
@@ -0,0 +1,86 @@
from datetime import datetime
from pathlib import Path

from dcpy.models.connectors.edm.publishing import GisDataset
from dcpy.models.connectors.edm.recipes import Dataset
from dcpy.models import file, library
from dcpy.models.connectors import socrata, web
from dcpy.models.lifecycle.ingest import (
LocalFileSource,
ScriptSource,
S3Source,
DEPublished,
DatasetAttributes,
ArchivalMetadata,
Ingestion,
Config,
)
from dcpy.utils.metadata import get_run_details
from dcpy.test.conftest import RECIPES_BUCKET

RESOURCES = Path(__file__).parent / "resources"
TEMPLATE_DIR = RESOURCES / "templates"
TEST_DATA_DIR = "test_data"
TEST_OUTPUT = RESOURCES / TEST_DATA_DIR / "output.parquet"
TEST_DATASET_NAME = "test_dataset"
FAKE_VERSION = "20240101"
TEST_DATASET = Dataset(id=TEST_DATASET_NAME, version=FAKE_VERSION)


class Sources:
local_file = LocalFileSource(type="local_file", path=Path("subfolder/dummy.txt"))
gis = GisDataset(type="edm_publishing_gis_dataset", name=TEST_DATASET_NAME)
script = ScriptSource(type="script", connector="web", function="get_df")
file_download = web.FileDownloadSource(
type="file_download",
url="https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/pad_24a.zip",
)
api = web.GenericApiSource(
type="api",
endpoint="https://www.bklynlibrary.org/locations/json",
format="json",
)
socrata = socrata.Source(
type="socrata", org=socrata.Org.nyc, uid="w7w3-xahh", format="csv"
)
s3 = S3Source(type="s3", bucket=RECIPES_BUCKET, key="inbox/test/test.txt")
de_publish = DEPublished(
type="de-published", product=TEST_DATASET_NAME, filename="file.csv"
)


BASIC_CONFIG = Config(
id=TEST_DATASET_NAME,
version=FAKE_VERSION,
attributes=DatasetAttributes(name=TEST_DATASET_NAME),
archival=ArchivalMetadata(
archival_timestamp=datetime(2024, 1, 1),
raw_filename="dummy.txt",
acl="public-read",
),
ingestion=Ingestion(source=Sources.local_file, file_format=file.Csv(type="csv")),
run_details=get_run_details(),
)

BASIC_LIBRARY_CONFIG = library.Config(
dataset=library.DatasetDefinition(
name=TEST_DATASET_NAME,
version=FAKE_VERSION,
acl="public-read",
source=library.DatasetDefinition.SourceSection(),
destination=library.DatasetDefinition.DestinationSection(
geometry=library.GeometryType(SRS="NONE", type="NONE")
),
),
execution_details=get_run_details(),
)


SOURCE_FILENAMES = [
(Sources.local_file, "dummy.txt"),
(Sources.gis, f"{TEST_DATASET_NAME}.zip"),
(Sources.file_download, "pad_24a.zip"),
(Sources.api, f"{TEST_DATASET_NAME}.json"),
(Sources.socrata, f"{TEST_DATASET_NAME}.csv"),
(Sources.s3, "test.txt"),
]
8 changes: 7 additions & 1 deletion dcpy/test/lifecycle/ingest/test_configure.py
@@ -12,7 +12,13 @@
from dcpy.lifecycle.ingest import configure

from dcpy.test.conftest import mock_request_get
from . import RESOURCES, TEST_DATASET_NAME, Sources, SOURCE_FILENAMES, TEMPLATE_DIR
from .shared import (
RESOURCES,
TEST_DATASET_NAME,
Sources,
SOURCE_FILENAMES,
TEMPLATE_DIR,
)


def test_jinja_vars():
2 changes: 1 addition & 1 deletion dcpy/test/lifecycle/ingest/test_extract.py
@@ -9,8 +9,8 @@
from dcpy.connectors import web
from dcpy.lifecycle.ingest import extract

from . import TEST_DATASET_NAME, FAKE_VERSION, SOURCE_FILENAMES, Sources
from dcpy.test.conftest import mock_request_get, PUBLISHING_BUCKET
from .shared import TEST_DATASET_NAME, FAKE_VERSION, SOURCE_FILENAMES, Sources

web.get_df = mock.MagicMock(return_value=pd.DataFrame()) # type: ignore

16 changes: 0 additions & 16 deletions dcpy/test/lifecycle/ingest/test_ingest.py

This file was deleted.

2 changes: 1 addition & 1 deletion dcpy/test/lifecycle/ingest/test_run.py
@@ -9,7 +9,7 @@
from dcpy.lifecycle.ingest.run import run, TMP_DIR

from dcpy.test.conftest import mock_request_get
from . import FAKE_VERSION, TEMPLATE_DIR
from .shared import FAKE_VERSION, TEMPLATE_DIR

DATASET = "bpl_libraries"
S3_PATH = f"datasets/{DATASET}/{FAKE_VERSION}/{DATASET}.parquet"
2 changes: 1 addition & 1 deletion dcpy/test/lifecycle/ingest/test_transform.py
@@ -16,7 +16,7 @@
from dcpy.utils.geospatial import parquet as geoparquet
from dcpy.lifecycle.ingest import transform

from . import RESOURCES, TEST_DATA_DIR, TEST_DATASET_NAME
from .shared import RESOURCES, TEST_DATA_DIR, TEST_DATASET_NAME


class FakeConfig(BaseModel):