From 7915abce6af43d1521c7aec14d5fd76aca440a82 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 26 Dec 2024 21:28:45 -0500 Subject: [PATCH] fix(ingest): use typing_extensions.Self This was initially added in https://github.com/datahub-project/datahub/pull/7313 and then reverted in https://github.com/datahub-project/datahub/pull/7490. Now that we've dropped support for old Airflow versions in https://github.com/datahub-project/datahub/pull/12056, we can actually do the upgrade. --- metadata-ingestion/scripts/avro_codegen.py | 7 +++---- metadata-ingestion/setup.py | 2 +- .../src/datahub/configuration/common.py | 7 ++----- .../src/datahub/ingestion/api/closeable.py | 6 +++--- .../api/ingestion_job_checkpointing_provider_base.py | 11 ++++------- .../src/datahub/ingestion/api/report.py | 5 ++++- metadata-ingestion/src/datahub/ingestion/api/sink.py | 7 ++++--- .../src/datahub/utilities/urns/_urn_base.py | 12 +++++------- 8 files changed, 26 insertions(+), 31 deletions(-) diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index e5792da32fb5d..2841985ad0780 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -154,7 +154,6 @@ def merge_schemas(schemas_obj: List[dict]) -> str: # Patch add_name method to NOT complain about duplicate names. class NamesWithDups(avro.schema.Names): def add_name(self, name_attr, space_attr, new_schema): - to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace) assert to_add.name assert to_add.space @@ -626,7 +625,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: class {class_name}(_SpecificUrn): ENTITY_TYPE: ClassVar[str] = "{entity_type}" - URN_PARTS: ClassVar[int] = {arg_count} + _URN_PARTS: ClassVar[int] = {arg_count} def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None: if _allow_coercion: @@ -640,8 +639,8 @@ def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None: @classmethod def _parse_ids(cls, entity_ids: List[str]) -> "{class_name}": - if len(entity_ids) != cls.URN_PARTS: - raise InvalidUrnError(f"{class_name} should have {{cls.URN_PARTS}} parts, got {{len(entity_ids)}}: {{entity_ids}}") + if len(entity_ids) != cls._URN_PARTS: + raise InvalidUrnError(f"{class_name} should have {{cls._URN_PARTS}} parts, got {{len(entity_ids)}}: {{entity_ids}}") return cls({parse_ids_mapping}, _allow_coercion=False) @classmethod diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index c6994dd6d5aa6..a8268c2f6eaf5 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -15,7 +15,7 @@ base_requirements = { # Our min version of typing_extensions is somewhat constrained by Airflow. - "typing_extensions>=3.10.0.2", + "typing_extensions>=4.2.0", # Actual dependencies. "typing-inspect", # pydantic 1.8.2 is incompatible with mypy 0.910. diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index 7df007e087979..08817d9d5fdb9 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -10,7 +10,6 @@ List, Optional, Type, - TypeVar, Union, runtime_checkable, ) @@ -19,14 +18,12 @@ from cached_property import cached_property from pydantic import BaseModel, Extra, ValidationError from pydantic.fields import Field -from typing_extensions import Protocol +from typing_extensions import Protocol, Self from datahub.configuration._config_enum import ConfigEnum as ConfigEnum # noqa: I250 from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2 from datahub.utilities.dedup_list import deduplicate_list -_ConfigSelf = TypeVar("_ConfigSelf", bound="ConfigModel") - REDACT_KEYS = { "password", "token", @@ -109,7 +106,7 @@ def _schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None: schema_extra = _schema_extra @classmethod - def parse_obj_allow_extras(cls: Type[_ConfigSelf], obj: Any) -> _ConfigSelf: + def parse_obj_allow_extras(cls, obj: Any) -> Self: if PYDANTIC_VERSION_2: try: with unittest.mock.patch.dict( diff --git a/metadata-ingestion/src/datahub/ingestion/api/closeable.py b/metadata-ingestion/src/datahub/ingestion/api/closeable.py index 80a5008ed6368..7b8e1a36162c9 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/closeable.py +++ b/metadata-ingestion/src/datahub/ingestion/api/closeable.py @@ -1,9 +1,9 @@ from abc import abstractmethod from contextlib import AbstractContextManager from types import TracebackType -from typing import Optional, Type, TypeVar +from typing import Optional, Type -_Self = TypeVar("_Self", bound="Closeable") +from typing_extensions import Self class Closeable(AbstractContextManager): @@ -11,7 +11,7 @@ class Closeable(AbstractContextManager): def close(self) -> None: pass - def __enter__(self: _Self) -> _Self: + def __enter__(self) -> Self: # This method is mainly required for type checking. return self diff --git a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py index 3680546d307d9..c1a49ce82e6e0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py +++ b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py @@ -1,6 +1,8 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Any, Dict, NewType, Optional, Type, TypeVar +from typing import Any, Dict, NewType, Optional + +from typing_extensions import Self import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel @@ -17,9 +19,6 @@ class IngestionCheckpointingProviderConfig(ConfigModel): pass -_Self = TypeVar("_Self", bound="IngestionCheckpointingProviderBase") - - @dataclass() class IngestionCheckpointingProviderBase(StatefulCommittable[CheckpointJobStatesMap]): """ @@ -32,9 +31,7 @@ def __init__(self, name: str, commit_policy: CommitPolicy = CommitPolicy.ALWAYS) @classmethod @abstractmethod - def create( - cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext - ) -> "_Self": + def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> Self: pass @abstractmethod diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py index ade2832f1b669..32810189acd00 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/report.py +++ b/metadata-ingestion/src/datahub/ingestion/api/report.py @@ -42,7 +42,10 @@ def to_pure_python_obj(some_val: Any) -> Any: return some_val.as_obj() elif isinstance(some_val, pydantic.BaseModel): return Report.to_pure_python_obj(some_val.dict()) - elif dataclasses.is_dataclass(some_val): + elif dataclasses.is_dataclass(some_val) and not isinstance(some_val, type): + # The `is_dataclass` function returns `True` for both instances and classes. + # We need an extra check to ensure an instance was passed in. + # https://docs.python.org/3/library/dataclasses.html#dataclasses.is_dataclass return dataclasses.asdict(some_val) elif isinstance(some_val, list): return [Report.to_pure_python_obj(v) for v in some_val if v is not None] diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index 62feb7b5a02e6..655e6bb22fa8d 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -3,6 +3,8 @@ from dataclasses import dataclass, field from typing import Any, Generic, Optional, Type, TypeVar, cast +from typing_extensions import Self + from datahub.configuration.common import ConfigModel from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit @@ -79,7 +81,6 @@ def on_failure( SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True) SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True) -Self = TypeVar("Self", bound="Sink") class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta): @@ -90,7 +91,7 @@ class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta): report: SinkReportType @classmethod - def get_config_class(cls: Type[Self]) -> Type[SinkConfig]: + def get_config_class(cls) -> Type[SinkConfig]: config_class = get_class_from_annotation(cls, Sink, ConfigModel) assert config_class, "Sink subclasses must define a config class" return cast(Type[SinkConfig], config_class) @@ -112,7 +113,7 @@ def __post_init__(self) -> None: pass @classmethod - def create(cls: Type[Self], config_dict: dict, ctx: PipelineContext) -> "Self": + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Self": return cls(ctx, cls.get_config_class().parse_obj(config_dict)) def handle_work_unit_start(self, workunit: WorkUnit) -> None: diff --git a/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py b/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py index 7dadd16fb7f1c..7996fe0d7b89b 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py +++ b/metadata-ingestion/src/datahub/utilities/urns/_urn_base.py @@ -1,9 +1,10 @@ import functools import urllib.parse from abc import abstractmethod -from typing import ClassVar, Dict, List, Optional, Type, TypeVar +from typing import ClassVar, Dict, List, Optional, Type from deprecated import deprecated +from typing_extensions import Self from datahub.utilities.urns.error import InvalidUrnError @@ -42,9 +43,6 @@ def _split_entity_id(entity_id: str) -> List[str]: return parts -_UrnSelf = TypeVar("_UrnSelf", bound="Urn") - - @functools.total_ordering class Urn: """ @@ -88,7 +86,7 @@ def entity_ids(self) -> List[str]: return self._entity_ids @classmethod - def from_string(cls: Type[_UrnSelf], urn_str: str) -> "_UrnSelf": + def from_string(cls, urn_str: str) -> Self: """ Creates an Urn from its string representation. @@ -174,7 +172,7 @@ def __hash__(self) -> int: @classmethod @deprecated(reason="prefer .from_string") - def create_from_string(cls: Type[_UrnSelf], urn_str: str) -> "_UrnSelf": + def create_from_string(cls, urn_str: str) -> Self: return cls.from_string(urn_str) @deprecated(reason="prefer .entity_ids") @@ -270,5 +268,5 @@ def underlying_key_aspect_type(cls) -> Type: @classmethod @abstractmethod - def _parse_ids(cls: Type[_UrnSelf], entity_ids: List[str]) -> _UrnSelf: + def _parse_ids(cls, entity_ids: List[str]) -> Self: raise NotImplementedError()