From ae3f0fd5ee4bcdab6e3e8e03009fcaeb932c3fdc Mon Sep 17 00:00:00 2001 From: Shubham Jagtap <132359390+shubhamjagtap639@users.noreply.github.com> Date: Tue, 7 May 2024 17:36:40 +0530 Subject: [PATCH] feat(ingestion): Copy urns from previous checkpoint state on ingestion failure (#10347) --- ...gestion_job_checkpointing_provider_base.py | 4 +- .../state/stale_entity_removal_handler.py | 11 +- .../state/golden_test_checkpoint_state.json | 4 +- ...n_test_checkpoint_state_after_deleted.json | 4 +- ...heckpoint_state_after_deleted_failure.json | 26 ++ .../golden_test_checkpoint_state_failure.json | 26 ++ ...teful_ingestion_after_deleted_failure.json | 34 +++ ...olden_test_stateful_ingestion_failure.json | 50 +++ .../state/test_stateful_ingestion.py | 288 +++++++++++++----- 9 files changed, 368 insertions(+), 79 deletions(-) create mode 100644 metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted_failure.json create mode 100644 metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_failure.json create mode 100644 metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted_failure.json create mode 100644 metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_failure.json diff --git a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py index 285ad9c0884474..3680546d307d97 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py +++ b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py @@ -26,9 +26,7 @@ class IngestionCheckpointingProviderBase(StatefulCommittable[CheckpointJobStates The base class for all checkpointing state provider implementations. """ - def __init__( - self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS - ): + def __init__(self, name: str, commit_policy: CommitPolicy = CommitPolicy.ALWAYS): # Set the initial state to an empty dict. super().__init__(name, commit_policy, {}) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index b80067aa0892cd..0145c922696e89 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -164,6 +164,9 @@ def set_job_id(self, unique_id): def is_checkpointing_enabled(self) -> bool: return self.checkpointing_enabled + def _get_state_obj(self): + return self.state_type_class() + def create_checkpoint(self) -> Optional[Checkpoint]: if self.is_checkpointing_enabled() and not self._ignore_new_state(): assert self.stateful_ingestion_config is not None @@ -172,7 +175,7 @@ def create_checkpoint(self) -> Optional[Checkpoint]: job_name=self.job_id, pipeline_name=self.pipeline_name, run_id=self.run_id, - state=self.state_type_class(), + state=self._get_state_obj(), ) return None @@ -255,9 +258,13 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: # If the source already had a failure, skip soft-deletion. # TODO: Eventually, switch this to check if anything in the pipeline had a failure so far. 
if self.source.get_report().failures:
+            for urn in last_checkpoint_state.get_urns_not_in(
+                type="*", other_checkpoint_state=cur_checkpoint_state
+            ):
+                self.add_entity_to_state("", urn)
             self.source.get_report().report_warning(
                 "stale-entity-removal",
-                "Skipping stale entity soft-deletion since source already had failures.",
+                "Skipping stale entity soft-deletion and copying urns from the last state since the source already had failures.",
             )
             return
 
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json
index 4e62492918bfb9..fcf73d9614f242 100644
--- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json
@@ -16,8 +16,8 @@
             "config": "",
             "state": {
                 "formatVersion": "1.0",
-                "serde": "base85-bz2-json",
-                "payload": "LRx4!F+o`-Q(1w>5G4QrYoCBnWH=B60MH7jr`{?c0BA?5L)2-AGyu>6y;V<9hz%Mv0Bt1*)lOMzr>a0|Iq-4VtTsYONQsFPLn1EpdQS;HIy|&CvSAlRvAJwmtCEM+Rx(v_)~sVvkx3V@WX4O`=losC6yZWb2OL0@"
+                "serde": "utf-8",
+                "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
             },
             "runId": "dummy-test-stateful-ingestion"
         }
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json
index 6ecd43483d9483..5477af72a1939c 100644
--- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json
@@ -16,8 +16,8 @@
             "config": "",
             "state": {
                 "formatVersion": "1.0",
-                "serde": "base85-bz2-json",
-                "payload": "LRx4!F+o`-Q(317h`0a%NgsevWH1l}0MH7jr`{?c0B9vdZ9%mLfYG4P6;f$2G%+v`9z&~6n|e(JEPC2_Iix~CA_im)jR-zsjEK*yo|HQz#IUUHtf@DYVEme-lUW9{Xmmt~y^2jCdyY95az!{$kf#WUxB"
+                "serde": "utf-8",
+                "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}"
             },
             "runId": "dummy-test-stateful-ingestion"
         }
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted_failure.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted_failure.json
new file mode 100644
index 00000000000000..fcf73d9614f242
--- /dev/null
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted_failure.json
@@ -0,0 +1,26 @@
+[
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
+    "changeType": "UPSERT",
+    "aspectName": "datahubIngestionCheckpoint",
+    "aspect": {
+        "json": {
+            "timestampMillis": 1586847600000,
+            "partitionSpec": {
+                "type": "FULL_TABLE",
+                "partition": "FULL_TABLE_SNAPSHOT"
+            },
+            "pipelineName": "dummy_stateful",
+            "platformInstanceId": "",
+            "config": "",
+            "state": {
+                "formatVersion": "1.0",
+                "serde": "utf-8",
+                "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", 
\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}" + }, + "runId": "dummy-test-stateful-ingestion" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_failure.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_failure.json new file mode 100644 index 00000000000000..fcf73d9614f242 --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_failure.json @@ -0,0 +1,26 @@ +[ +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)", + "changeType": "UPSERT", + "aspectName": "datahubIngestionCheckpoint", + "aspect": { + "json": { + "timestampMillis": 1586847600000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "pipelineName": "dummy_stateful", + "platformInstanceId": "", + "config": "", + "state": { + "formatVersion": "1.0", + "serde": "utf-8", + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}" + }, + "runId": "dummy-test-stateful-ingestion" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted_failure.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted_failure.json new file mode 100644 index 00000000000000..a1f5132cac0a3e --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted_failure.json @@ -0,0 +1,34 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_failure.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_failure.json new file mode 100644 index 00000000000000..4a77651c930667 --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_failure.json @@ -0,0 +1,50 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": 
{ + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py index 2b811d5e5e3a33..783b0fe18b29ab 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py @@ -1,7 +1,9 @@ from dataclasses import dataclass, field as dataclass_field from typing import Any, Dict, Iterable, List, Optional, cast +from unittest import mock import pydantic +import pytest from freezegun import freeze_time from pydantic import Field @@ -56,6 +58,10 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field( default=None, description="Dummy source Ingestion Config." ) + report_failure: bool = Field( + default=False, + description="Should this dummy source report a failure.", + ) class DummySource(StatefulIngestionSourceBase): @@ -103,10 +109,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: aspect=StatusClass(removed=False), ).as_workunit() + if self.source_config.report_failure: + self.reporter.report_failure("Dummy error", "Error") + def get_report(self) -> SourceReport: return self.reporter +@pytest.fixture(scope="module") +def mock_generic_checkpoint_state(): + with mock.patch( + "datahub.ingestion.source.state.entity_removal_state.GenericCheckpointState" + ) as mock_checkpoint_state: + checkpoint_state = mock_checkpoint_state.return_value + checkpoint_state.serde.return_value = "utf-8" + yield mock_checkpoint_state + + @freeze_time(FROZEN_TIME) def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): # test stateful ingestion using dummy source @@ -148,80 +167,209 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): }, } - pipeline_run1 = None - pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore - base_pipeline_config # type: ignore + with mock.patch( + "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj" + ) as mock_state: + mock_state.return_value = GenericCheckpointState(serde="utf-8") + pipeline_run1 = None + pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore + base_pipeline_config # type: ignore + ) + pipeline_run1_config["sink"]["config"][ + "filename" + ] = f"{tmp_path}/{output_file_name}" + pipeline_run1 = Pipeline.create(pipeline_run1_config) + pipeline_run1.run() + pipeline_run1.raise_from_status() + pipeline_run1.pretty_print_summary() + + # validate both dummy source mces and checkpoint state mces files + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / output_file_name, + golden_path=f"{test_resources_dir}/{golden_file_name}", + ) + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / 
state_file_name, + golden_path=f"{test_resources_dir}/{golden_state_file_name}", + ) + checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) + assert checkpoint1 + assert checkpoint1.state + + with mock.patch( + "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj" + ) as mock_state: + mock_state.return_value = GenericCheckpointState(serde="utf-8") + pipeline_run2 = None + pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore + pipeline_run2_config["source"]["config"]["dataset_patterns"] = { + "allow": ["dummy_dataset1", "dummy_dataset2"], + } + pipeline_run2_config["sink"]["config"][ + "filename" + ] = f"{tmp_path}/{output_file_name_after_deleted}" + pipeline_run2 = Pipeline.create(pipeline_run2_config) + pipeline_run2.run() + pipeline_run2.raise_from_status() + pipeline_run2.pretty_print_summary() + + # validate both updated dummy source mces and checkpoint state mces files after deleting dataset + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / output_file_name_after_deleted, + golden_path=f"{test_resources_dir}/{golden_file_name_after_deleted}", + ) + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / state_file_name, + golden_path=f"{test_resources_dir}/{golden_state_file_name_after_deleted}", + ) + checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) + assert checkpoint2 + assert checkpoint2.state + + # Validate that all providers have committed successfully. + validate_all_providers_have_committed_successfully( + pipeline=pipeline_run1, expected_providers=1 + ) + validate_all_providers_have_committed_successfully( + pipeline=pipeline_run2, expected_providers=1 + ) + + # Perform all assertions on the states. 
The deleted table should not be
+    # part of the second state
+    state1 = cast(GenericCheckpointState, checkpoint1.state)
+    state2 = cast(GenericCheckpointState, checkpoint2.state)
+
+    difference_dataset_urns = list(
+        state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
+    )
+    # the difference in dataset urns is the dataset that is no longer allowed to be ingested
+    assert len(difference_dataset_urns) == 1
+    deleted_dataset_urns: List[str] = [
+        "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
+    ]
+    assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns)
+
+
+@freeze_time(FROZEN_TIME)
+def test_stateful_ingestion_failure(pytestconfig, tmp_path, mock_time):
+    # test stateful ingestion using the dummy source, with the pipeline failing in the second run
+    state_file_name: str = "checkpoint_state_mces_failure.json"
+    golden_state_file_name: str = "golden_test_checkpoint_state_failure.json"
+    golden_state_file_name_after_deleted: str = (
+        "golden_test_checkpoint_state_after_deleted_failure.json"
     )
-    pipeline_run1_config["sink"]["config"][
-        "filename"
-    ] = f"{tmp_path}/{output_file_name}"
-    pipeline_run1 = Pipeline.create(pipeline_run1_config)
-    pipeline_run1.run()
-    pipeline_run1.raise_from_status()
-    pipeline_run1.pretty_print_summary()
-
-    # validate both dummy source mces and checkpoint state mces files
-    mce_helpers.check_golden_file(
-        pytestconfig,
-        output_path=tmp_path / output_file_name,
-        golden_path=f"{test_resources_dir}/{golden_file_name}",
+    output_file_name: str = "dummy_mces_failure.json"
+    golden_file_name: str = "golden_test_stateful_ingestion_failure.json"
+    output_file_name_after_deleted: str = (
+        "dummy_mces_stateful_after_deleted_failure.json"
     )
-    mce_helpers.check_golden_file(
-        pytestconfig,
-        output_path=tmp_path / state_file_name,
-        golden_path=f"{test_resources_dir}/{golden_state_file_name}",
+    golden_file_name_after_deleted: str = (
+        "golden_test_stateful_ingestion_after_deleted_failure.json"
     )
-    checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
-    assert checkpoint1
-    assert checkpoint1.state
-
-    pipeline_run2 = None
-    pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config)  # type: ignore
-    pipeline_run2_config["source"]["config"]["dataset_patterns"] = {
-        "allow": ["dummy_dataset1", "dummy_dataset2"],
+
+    test_resources_dir = pytestconfig.rootpath / "tests/unit/stateful_ingestion/state"
+
+    base_pipeline_config = {
+        "run_id": "dummy-test-stateful-ingestion",
+        "pipeline_name": "dummy_stateful",
+        "source": {
+            "type": "tests.unit.stateful_ingestion.state.test_stateful_ingestion.DummySource",
+            "config": {
+                "stateful_ingestion": {
+                    "enabled": True,
+                    "remove_stale_metadata": True,
+                    "state_provider": {
+                        "type": "file",
+                        "config": {
+                            "filename": f"{tmp_path}/{state_file_name}",
+                        },
+                    },
+                },
+            },
+        },
+        "sink": {
+            "type": "file",
+            "config": {},
+        },
     }
-    pipeline_run2_config["sink"]["config"][
-        "filename"
-    ] = f"{tmp_path}/{output_file_name_after_deleted}"
-    pipeline_run2 = Pipeline.create(pipeline_run2_config)
-    pipeline_run2.run()
-    pipeline_run2.raise_from_status()
-    pipeline_run2.pretty_print_summary()
-
-    # validate both updated dummy source mces and checkpoint state mces files after deleting dataset
-    mce_helpers.check_golden_file(
-        pytestconfig,
-        output_path=tmp_path / output_file_name_after_deleted,
-        golden_path=f"{test_resources_dir}/{golden_file_name_after_deleted}",
-    )
-    mce_helpers.check_golden_file(
-        pytestconfig,
-        output_path=tmp_path / state_file_name,
-        golden_path=f"{test_resources_dir}/{golden_state_file_name_after_deleted}",
-    )
-    checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
-    assert checkpoint2
-    assert checkpoint2.state
 
-    # Validate that all providers have committed successfully.
-    validate_all_providers_have_committed_successfully(
-        pipeline=pipeline_run1, expected_providers=1
-    )
-    validate_all_providers_have_committed_successfully(
-        pipeline=pipeline_run2, expected_providers=1
-    )
+    with mock.patch(
+        "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
+    ) as mock_state:
+        mock_state.return_value = GenericCheckpointState(serde="utf-8")
+        pipeline_run1 = None
+        pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(  # type: ignore
+            base_pipeline_config  # type: ignore
+        )
+        pipeline_run1_config["sink"]["config"][
+            "filename"
+        ] = f"{tmp_path}/{output_file_name}"
+        pipeline_run1 = Pipeline.create(pipeline_run1_config)
+        pipeline_run1.run()
+        pipeline_run1.raise_from_status()
+        pipeline_run1.pretty_print_summary()
+
+        # validate both dummy source mces and checkpoint state mces files
+        mce_helpers.check_golden_file(
+            pytestconfig,
+            output_path=tmp_path / output_file_name,
+            golden_path=f"{test_resources_dir}/{golden_file_name}",
+        )
+        mce_helpers.check_golden_file(
+            pytestconfig,
+            output_path=tmp_path / 
state_file_name, - golden_path=f"{test_resources_dir}/{golden_state_file_name_after_deleted}", - ) - checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) - assert checkpoint2 - assert checkpoint2.state - # Validate that all providers have committed successfully. - validate_all_providers_have_committed_successfully( - pipeline=pipeline_run1, expected_providers=1 - ) - validate_all_providers_have_committed_successfully( - pipeline=pipeline_run2, expected_providers=1 - ) + with mock.patch( + "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj" + ) as mock_state: + mock_state.return_value = GenericCheckpointState(serde="utf-8") + pipeline_run1 = None + pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore + base_pipeline_config # type: ignore + ) + pipeline_run1_config["sink"]["config"][ + "filename" + ] = f"{tmp_path}/{output_file_name}" + pipeline_run1 = Pipeline.create(pipeline_run1_config) + pipeline_run1.run() + pipeline_run1.raise_from_status() + pipeline_run1.pretty_print_summary() + + # validate both dummy source mces and checkpoint state mces files + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / output_file_name, + golden_path=f"{test_resources_dir}/{golden_file_name}", + ) + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / state_file_name, + golden_path=f"{test_resources_dir}/{golden_state_file_name}", + ) + checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) + assert checkpoint1 + assert checkpoint1.state - # Perform all assertions on the states. The deleted table should not be - # part of the second state - state1 = cast(GenericCheckpointState, checkpoint1.state) - state2 = cast(GenericCheckpointState, checkpoint2.state) + with mock.patch( + "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj" + ) as mock_state: + mock_state.return_value = GenericCheckpointState(serde="utf-8") + pipeline_run2 = None + pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore + pipeline_run2_config["source"]["config"]["dataset_patterns"] = { + "allow": ["dummy_dataset1", "dummy_dataset2"], + } + pipeline_run2_config["source"]["config"]["report_failure"] = True + pipeline_run2_config["sink"]["config"][ + "filename" + ] = f"{tmp_path}/{output_file_name_after_deleted}" + pipeline_run2 = Pipeline.create(pipeline_run2_config) + pipeline_run2.run() + pipeline_run2.pretty_print_summary() - difference_dataset_urns = list( - state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2) - ) - # the difference in dataset urns is the dataset which is not allowed to ingest - assert len(difference_dataset_urns) == 1 - deleted_dataset_urns: List[str] = [ - "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", - ] - assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns) + # validate both updated dummy source mces and checkpoint state mces files after deleting dataset + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / output_file_name_after_deleted, + golden_path=f"{test_resources_dir}/{golden_file_name_after_deleted}", + ) + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / state_file_name, + golden_path=f"{test_resources_dir}/{golden_state_file_name_after_deleted}", + ) + checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) + assert checkpoint2 + assert checkpoint2.state + + 
# Validate that all providers have committed successfully.
+    validate_all_providers_have_committed_successfully(
+        pipeline=pipeline_run1, expected_providers=1
+    )
+    validate_all_providers_have_committed_successfully(
+        pipeline=pipeline_run2, expected_providers=1
+    )
+
+    # Perform assertions on the states. The deleted table should still be
+    # part of the second state, since the pipeline run failed
+    state1 = cast(GenericCheckpointState, checkpoint1.state)
+    state2 = cast(GenericCheckpointState, checkpoint2.state)
+    assert state1 == state2