From c030f0edf9d60bd5774ef1f75f4bf45c12216370 Mon Sep 17 00:00:00 2001
From: Jos Gesenhues
Date: Mon, 18 Nov 2024 12:55:11 +0100
Subject: [PATCH] fix(ingestion/gcs): fix stateful ingestion for GCS source

Remove pipeline name before passing context to equivalent s3 source to
avoid error "Checkpointing provider DatahubIngestionCheckpointingProvider
already registered."
---
 .../src/datahub/ingestion/source/gcs/gcs_source.py | 6 +++++-
 metadata-ingestion/tests/unit/test_gcs_source.py   | 7 ++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py
index 18838af9bdf85f..5151fbc0ad2064 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py
@@ -1,3 +1,4 @@
+import copy
 import logging
 from typing import Dict, Iterable, List, Optional
 from urllib.parse import unquote
@@ -88,6 +89,7 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext):
         super().__init__(config, ctx)
         self.config = config
         self.report = GCSSourceReport()
+        self.platform: str = PLATFORM_GCS
         self.s3_source = self.create_equivalent_s3_source(ctx)
 
     @classmethod
@@ -135,7 +137,9 @@ def create_equivalent_s3_path_specs(self):
 
     def create_equivalent_s3_source(self, ctx: PipelineContext) -> S3Source:
         config = self.create_equivalent_s3_config()
-        return self.s3_source_overrides(S3Source(config, ctx))
+        s3_ctx = copy.deepcopy(ctx)
+        s3_ctx.pipeline_name = None
+        return self.s3_source_overrides(S3Source(config, s3_ctx))
 
     def s3_source_overrides(self, source: S3Source) -> S3Source:
         source.source_config.platform = PLATFORM_GCS
diff --git a/metadata-ingestion/tests/unit/test_gcs_source.py b/metadata-ingestion/tests/unit/test_gcs_source.py
index 9d5f4e915b18cf..5b99862c92a04b 100644
--- a/metadata-ingestion/tests/unit/test_gcs_source.py
+++ b/metadata-ingestion/tests/unit/test_gcs_source.py
@@ -1,13 +1,17 @@
+from unittest import mock
+
 import pytest
 from pydantic import ValidationError
 
 from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.graph.client import DataHubGraph
 from datahub.ingestion.source.data_lake_common.data_lake_utils import PLATFORM_GCS
 from datahub.ingestion.source.gcs.gcs_source import GCSSource
 
 
 def test_gcs_source_setup():
-    ctx = PipelineContext(run_id="test-gcs")
+    graph = mock.MagicMock(spec=DataHubGraph)
+    ctx = PipelineContext(run_id="test-gcs", graph=graph, pipeline_name="test-gcs")
 
     # Baseline: valid config
     source: dict = {
@@ -18,6 +22,7 @@ def test_gcs_source_setup():
             }
         ],
         "credential": {"hmac_access_id": "id", "hmac_access_secret": "secret"},
+        "stateful_ingestion": {"enabled": "true"},
     }
     gcs = GCSSource.create(source, ctx)
     assert gcs.s3_source.source_config.platform == PLATFORM_GCS
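
Note (not part of the patch): a minimal standalone sketch of the idea behind the fix,
under the assumption that stateful-ingestion provider registration is keyed off the
context's pipeline_name. The GCS source wraps an embedded S3 source; if that inner
source received the original PipelineContext with pipeline_name set, it would try to
register a second DatahubIngestionCheckpointingProvider, producing the "already
registered" error quoted above. The PipelineContext dataclass and the helper
make_embedded_context below are simplified, hypothetical stand-ins for illustration,
not the real DataHub classes; only the deep-copy-and-clear pattern mirrors the patch.

import copy
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineContext:
    """Simplified stand-in for datahub.ingestion.api.common.PipelineContext."""

    run_id: str
    pipeline_name: Optional[str] = None


def make_embedded_context(ctx: PipelineContext) -> PipelineContext:
    # Deep-copy so the caller's context is left untouched, then clear the
    # pipeline name so a wrapped source would skip stateful-ingestion
    # checkpointing-provider registration.
    s3_ctx = copy.deepcopy(ctx)
    s3_ctx.pipeline_name = None
    return s3_ctx


if __name__ == "__main__":
    outer = PipelineContext(run_id="test-gcs", pipeline_name="test-gcs")
    inner = make_embedded_context(outer)
    assert outer.pipeline_name == "test-gcs"  # original context is unchanged
    assert inner.pipeline_name is None  # embedded source sees no pipeline name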