
Commit

Merge branch 'master' into platform-events-business-attribute
deepgarg-visa authored Dec 27, 2024
2 parents e8a054c + 4e3103e commit fe5f2cc
Showing 44 changed files with 196 additions and 217 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/dagster-plugin.yml
@@ -30,11 +30,11 @@ jobs:
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.9", "3.10"]
python-version: ["3.9", "3.11"]
include:
- python-version: "3.9"
extraPythonRequirement: "dagster>=1.3.3"
- python-version: "3.10"
- python-version: "3.11"
extraPythonRequirement: "dagster>=1.3.3"
fail-fast: false
steps:
@@ -57,7 +57,7 @@ jobs:
if: always()
run: source metadata-ingestion-modules/dagster-plugin/venv/bin/activate && uv pip freeze
- uses: actions/upload-artifact@v4
- if: ${{ always() && matrix.python-version == '3.10' && matrix.extraPythonRequirement == 'dagster>=1.3.3' }}
+ if: ${{ always() && matrix.python-version == '3.11' && matrix.extraPythonRequirement == 'dagster>=1.3.3' }}
with:
name: Test Results (dagster Plugin ${{ matrix.python-version}})
path: |
4 changes: 2 additions & 2 deletions .github/workflows/metadata-ingestion.yml
@@ -33,7 +33,7 @@ jobs:
# DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
strategy:
matrix:
python-version: ["3.8", "3.10"]
python-version: ["3.8", "3.11"]
command:
[
"testQuick",
@@ -43,7 +43,7 @@
]
include:
- python-version: "3.8"
- python-version: "3.10"
- python-version: "3.11"
fail-fast: false
steps:
- name: Free up disk space
4 changes: 2 additions & 2 deletions .github/workflows/prefect-plugin.yml
@@ -30,7 +30,7 @@ jobs:
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.11"]
fail-fast: false
steps:
- name: Set up JDK 17
@@ -52,7 +52,7 @@ jobs:
if: always()
run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && uv pip freeze
- uses: actions/upload-artifact@v4
- if: ${{ always() && matrix.python-version == '3.10'}}
+ if: ${{ always() && matrix.python-version == '3.11'}}
with:
name: Test Results (Prefect Plugin ${{ matrix.python-version}})
path: |
4 changes: 0 additions & 4 deletions metadata-ingestion-modules/airflow-plugin/setup.py
@@ -148,10 +148,6 @@ def get_long_description():
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
- "Programming Language :: Python :: 3.11",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
3 changes: 1 addition & 2 deletions metadata-ingestion-modules/dagster-plugin/README.md
@@ -1,4 +1,3 @@
# Datahub Dagster Plugin

- See the DataHub Dagster docs for details.
-
+ See the [DataHub Dagster docs](https://datahubproject.io/docs/lineage/dagster/) for details.
3 changes: 0 additions & 3 deletions metadata-ingestion-modules/dagster-plugin/setup.py
@@ -107,9 +107,6 @@ def get_long_description():
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
3 changes: 1 addition & 2 deletions metadata-ingestion-modules/gx-plugin/README.md
@@ -1,4 +1,3 @@
# Datahub GX Plugin

- See the DataHub GX docs for details.
-
+ See the [DataHub GX docs](https://datahubproject.io/docs/metadata-ingestion/integration_docs/great-expectations) for details.
3 changes: 0 additions & 3 deletions metadata-ingestion-modules/gx-plugin/setup.py
@@ -118,9 +118,6 @@ def get_long_description():
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/prefect-plugin/README.md
@@ -28,7 +28,7 @@ The `prefect-datahub` collection allows you to easily integrate DataHub's metada

## Prerequisites

- - Python 3.7+
+ - Python 3.8+
- Prefect 2.0.0+ and < 3.0.0+
- A running instance of DataHub

6 changes: 1 addition & 5 deletions metadata-ingestion-modules/prefect-plugin/setup.py
@@ -103,10 +103,6 @@ def get_long_description():
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
- "Programming Language :: Python :: 3.7",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
@@ -120,7 +116,7 @@ def get_long_description():
],
# Package info.
zip_safe=False,
python_requires=">=3.7",
python_requires=">=3.8",
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="./src"),
entry_points=entry_points,
7 changes: 3 additions & 4 deletions metadata-ingestion/scripts/avro_codegen.py
@@ -154,7 +154,6 @@ def merge_schemas(schemas_obj: List[dict]) -> str:
# Patch add_name method to NOT complain about duplicate names.
class NamesWithDups(avro.schema.Names):
def add_name(self, name_attr, space_attr, new_schema):

to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
assert to_add.name
assert to_add.space
@@ -626,7 +625,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str:
class {class_name}(_SpecificUrn):
ENTITY_TYPE: ClassVar[str] = "{entity_type}"
- URN_PARTS: ClassVar[int] = {arg_count}
+ _URN_PARTS: ClassVar[int] = {arg_count}
def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None:
if _allow_coercion:
@@ -640,8 +639,8 @@ def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None:
@classmethod
def _parse_ids(cls, entity_ids: List[str]) -> "{class_name}":
- if len(entity_ids) != cls.URN_PARTS:
-     raise InvalidUrnError(f"{class_name} should have {{cls.URN_PARTS}} parts, got {{len(entity_ids)}}: {{entity_ids}}")
+ if len(entity_ids) != cls._URN_PARTS:
+     raise InvalidUrnError(f"{class_name} should have {{cls._URN_PARTS}} parts, got {{len(entity_ids)}}: {{entity_ids}}")
return cls({parse_ids_mapping}, _allow_coercion=False)
@classmethod
12 changes: 5 additions & 7 deletions metadata-ingestion/setup.py
@@ -15,7 +15,7 @@

base_requirements = {
# Our min version of typing_extensions is somewhat constrained by Airflow.
- "typing_extensions>=3.10.0.2",
+ "typing_extensions>=4.2.0",
# Actual dependencies.
"typing-inspect",
# pydantic 1.8.2 is incompatible with mypy 0.910.
@@ -298,8 +298,8 @@
}

data_lake_profiling = {
- "pydeequ~=1.1.0",
- "pyspark~=3.3.0",
+ "pydeequ>=1.1.0",
+ "pyspark~=3.5.0",
}

delta_lake = {
@@ -318,7 +318,7 @@
# 0.1.11 appears to have authentication issues with azure databricks
# 0.22.0 has support for `include_browse` in metadata list apis
"databricks-sdk>=0.30.0",
- "pyspark~=3.3.0",
+ "pyspark~=3.5.0",
"requests",
# Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes
# Version 3.0.0 required SQLAlchemy > 2.0.21
@@ -874,9 +874,6 @@
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
@@ -917,6 +914,7 @@
"sync-file-emitter",
"sql-parser",
"iceberg",
+ "feast",
}
else set()
)
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
- from datetime import datetime, timedelta
+ from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from pydantic import Field
@@ -10,6 +10,7 @@
CircuitBreakerConfig,
)
from datahub.api.graphql import Assertion, Operation
+ from datahub.emitter.mce_builder import parse_ts_millis

logger: logging.Logger = logging.getLogger(__name__)

@@ -49,7 +50,7 @@ def get_last_updated(self, urn: str) -> Optional[datetime]:
if not operations:
return None
else:
- return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000)
+ return parse_ts_millis(operations[0]["lastUpdatedTimestamp"])

def _check_if_assertion_failed(
self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None
@@ -93,7 +94,7 @@ class AssertionResult:
logger.info(f"Found successful assertion: {assertion_urn}")
result = False
if last_updated is not None:
- last_run = datetime.fromtimestamp(last_assertion.time / 1000)
+ last_run = parse_ts_millis(last_assertion.time)
if last_updated > last_run:
logger.error(
f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}"
@@ -117,7 +118,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool:
)

if not last_updated:
- last_updated = datetime.now() - self.config.time_delta
+ last_updated = datetime.now(tz=timezone.utc) - self.config.time_delta
logger.info(
f"Dataset {urn} doesn't have last updated or check_last_assertion_time is false, using calculated min assertion date {last_updated}"
)
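The circuit-breaker changes above replace naive local-time conversions (`datetime.fromtimestamp(ts / 1000)`, `datetime.now()`) with timezone-aware UTC values. The practical reason: Python refuses to order-compare naive and aware datetimes, so once `parse_ts_millis` returns aware timestamps, the fallback `last_updated` must be aware too. A minimal sketch of the failure mode (standalone, not the DataHub code):

```python
from datetime import datetime, timedelta, timezone

# What a millisecond timestamp looks like once parsed as UTC-aware.
last_run = datetime.fromtimestamp(1_700_000_000_000 / 1000, tz=timezone.utc)

# Mixing in a naive "now" breaks the comparison at runtime.
naive_cutoff = datetime.now() - timedelta(days=1)
try:
    print(naive_cutoff > last_run)
except TypeError as err:
    print(f"naive vs aware comparison fails: {err}")

# An aware "now" keeps the comparison valid and unambiguous.
aware_cutoff = datetime.now(tz=timezone.utc) - timedelta(days=1)
print(aware_cutoff > last_run)
```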
7 changes: 2 additions & 5 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -10,7 +10,6 @@
List,
Optional,
Type,
- TypeVar,
Union,
runtime_checkable,
)
@@ -19,14 +18,12 @@
from cached_property import cached_property
from pydantic import BaseModel, Extra, ValidationError
from pydantic.fields import Field
- from typing_extensions import Protocol
+ from typing_extensions import Protocol, Self

from datahub.configuration._config_enum import ConfigEnum as ConfigEnum # noqa: I250
from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2
from datahub.utilities.dedup_list import deduplicate_list

- _ConfigSelf = TypeVar("_ConfigSelf", bound="ConfigModel")
-
REDACT_KEYS = {
"password",
"token",
@@ -109,7 +106,7 @@ def _schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None:
schema_extra = _schema_extra

@classmethod
- def parse_obj_allow_extras(cls: Type[_ConfigSelf], obj: Any) -> _ConfigSelf:
+ def parse_obj_allow_extras(cls, obj: Any) -> Self:
if PYDANTIC_VERSION_2:
try:
with unittest.mock.patch.dict(
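Dropping the `_ConfigSelf` TypeVar in favour of `typing_extensions.Self` keeps the same behaviour with less machinery: a classmethod annotated `-> Self` is inferred to return whichever subclass it is called on. A rough before/after sketch with toy classes (not the actual ConfigModel):

```python
from typing import Any, Type, TypeVar

from typing_extensions import Self

_TModel = TypeVar("_TModel", bound="OldStyle")


class OldStyle:
    # Pre-change pattern: thread a bound TypeVar through the signature.
    @classmethod
    def parse_obj_allow_extras(cls: Type[_TModel], obj: Any) -> _TModel:
        return cls()


class NewStyle:
    # Post-change pattern: Self tracks the calling subclass automatically.
    @classmethod
    def parse_obj_allow_extras(cls, obj: Any) -> Self:
        return cls()


class ChildConfig(NewStyle):
    pass


# Type checkers infer ChildConfig here, not NewStyle.
child = ChildConfig.parse_obj_allow_extras({})
```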
18 changes: 17 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -6,7 +6,7 @@
import os
import re
import time
- from datetime import datetime
+ from datetime import datetime, timezone
from enum import Enum
from typing import (
TYPE_CHECKING,
@@ -103,6 +103,22 @@ def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:
return int(ts.timestamp() * 1000)


+ @overload
+ def parse_ts_millis(ts: float) -> datetime:
+     ...
+
+
+ @overload
+ def parse_ts_millis(ts: None) -> None:
+     ...
+
+
+ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
+     if ts is None:
+         return None
+     return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+

def make_data_platform_urn(platform: str) -> str:
if platform.startswith("urn:li:dataPlatform:"):
return platform
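Per the hunk above, `parse_ts_millis` is the inverse of the existing `make_ts_millis`: it converts an epoch-milliseconds value into a timezone-aware UTC `datetime`, with overloads so `None` passes through cleanly for type checkers. A small usage sketch (values illustrative):

```python
from datetime import datetime, timezone

from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis

ts = datetime(2024, 12, 27, 12, 0, 0, tzinfo=timezone.utc)

millis = make_ts_millis(ts)          # 1735300800000
roundtrip = parse_ts_millis(millis)  # back to the same aware UTC datetime

assert roundtrip == ts
assert parse_ts_millis(None) is None  # the None overload keeps Optional chains clean
```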
9 changes: 2 additions & 7 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -4,8 +4,8 @@
from pydantic.main import BaseModel

from datahub.cli.env_utils import get_boolean_env_variable
- from datahub.emitter.enum_helpers import get_enum_options
from datahub.emitter.mce_builder import (
+ ALL_ENV_TYPES,
Aspect,
datahub_guid,
make_container_urn,
@@ -25,7 +25,6 @@
ContainerClass,
DomainsClass,
EmbedClass,
- FabricTypeClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
@@ -206,11 +205,7 @@ def gen_containers(
# Extra validation on the env field.
# In certain cases (mainly for backwards compatibility), the env field will actually
# have a platform instance name.
- env = (
-     container_key.env
-     if container_key.env in get_enum_options(FabricTypeClass)
-     else None
- )
+ env = container_key.env if container_key.env in ALL_ENV_TYPES else None

container_urn = container_key.as_urn()

4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
@@ -2,7 +2,7 @@
import time
from collections import defaultdict
from dataclasses import dataclass
- from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
+ from typing import Any, Dict, List, Optional, Sequence, Union

from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
@@ -75,7 +75,7 @@ def _add_patch(
# TODO: Validate that aspectName is a valid aspect for this entityType
self.patches[aspect_name].append(_Patch(op, path, value))

- def build(self) -> Iterable[MetadataChangeProposalClass]:
+ def build(self) -> List[MetadataChangeProposalClass]:
return [
MetadataChangeProposalClass(
entityUrn=self.urn,
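Tightening `build()` from `Iterable[MetadataChangeProposalClass]` to `List[...]` simply declares what the method already returns, so callers can take `len()`, index, and iterate repeatedly without materialising anything first. A generic sketch of the difference (toy functions, not the patch builder itself):

```python
from typing import Iterable, List


def build_loose() -> Iterable[int]:
    # Loose annotation: callers must treat this as a one-shot, unsized stream.
    return [1, 2, 3]


def build_precise() -> List[int]:
    # Precise annotation: the concrete list the function already constructs.
    return [1, 2, 3]


items = build_precise()
print(len(items), items[0])  # fine: List exposes len() and indexing

stream = build_loose()
# len(stream) or stream[0] would be flagged by type checkers, even though the
# runtime object happens to be a list, because Iterable promises neither.
print(sum(stream))
```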
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -3,7 +3,7 @@
import logging
import os
from json.decoder import JSONDecodeError
- from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
+ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union

import requests
from deprecated import deprecated
@@ -288,7 +288,7 @@ def emit_mcp(

def emit_mcps(
self,
- mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
+ mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> int:
logger.debug("Attempting to emit batch mcps")
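The `emit_mcps` change from `List[Union[...]]` to `Sequence[Union[...]]` is a variance fix: `List` is invariant, so a `List[MetadataChangeProposalWrapper]` is rejected where a `List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]` is expected, whereas the read-only `Sequence` is covariant and accepts it. A minimal sketch with stand-in types (not the real MCP classes):

```python
from typing import List, Sequence, Union


class Proposal:
    pass


class ProposalWrapper:
    pass


def emit_with_list(mcps: List[Union[Proposal, ProposalWrapper]]) -> int:
    return len(mcps)


def emit_with_sequence(mcps: Sequence[Union[Proposal, ProposalWrapper]]) -> int:
    return len(mcps)


wrappers: List[ProposalWrapper] = [ProposalWrapper(), ProposalWrapper()]

emit_with_sequence(wrappers)  # OK: Sequence is covariant and read-only
emit_with_list(wrappers)      # rejected by mypy: List is invariant
```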
