feat(ingest): add parse_ts_millis helper #12231

Merged 4 commits on Dec 27, 2024
metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from pydantic import Field
@@ -10,6 +10,7 @@
CircuitBreakerConfig,
)
from datahub.api.graphql import Assertion, Operation
from datahub.emitter.mce_builder import parse_ts_millis

logger: logging.Logger = logging.getLogger(__name__)

@@ -49,7 +50,7 @@
if not operations:
return None
else:
return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000)
return parse_ts_millis(operations[0]["lastUpdatedTimestamp"])

def _check_if_assertion_failed(
self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None
@@ -93,7 +94,7 @@
logger.info(f"Found successful assertion: {assertion_urn}")
result = False
if last_updated is not None:
last_run = datetime.fromtimestamp(last_assertion.time / 1000)
last_run = parse_ts_millis(last_assertion.time)
if last_updated > last_run:
logger.error(
f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}"
@@ -117,7 +118,7 @@
)

if not last_updated:
last_updated = datetime.now() - self.config.time_delta
last_updated = datetime.now(tz=timezone.utc) - self.config.time_delta

Codecov: added line 121 in assertion_circuit_breaker.py is not covered by tests.
logger.info(
f"Dataset {urn} doesn't have last updated or check_last_assertion_time is false, using calculated min assertion date {last_updated}"
)
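Note on the timezone change above: the old datetime.fromtimestamp(ts / 1000) calls returned naive local-time datetimes, while parse_ts_millis returns timezone-aware UTC datetimes. That is presumably also why datetime.now() gained tz=timezone.utc, since Python refuses to compare naive and aware values. A small illustration (the timestamp is just an example, not taken from the PR):

from datetime import datetime, timezone

ts_millis = 1735257600000  # example lastUpdatedTimestamp in epoch millis
naive = datetime.fromtimestamp(ts_millis / 1000)                   # old behavior: local time, tzinfo=None
aware = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)  # new behavior via parse_ts_millis

# Mixing the two fails:
# naive < aware  ->  TypeError: can't compare offset-naive and offset-aware datetimes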
18 changes: 17 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -6,7 +6,7 @@
import os
import re
import time
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from typing import (
TYPE_CHECKING,
@@ -103,6 +103,22 @@
return int(ts.timestamp() * 1000)


@overload
def parse_ts_millis(ts: float) -> datetime:
...

Codecov: added line 108 in mce_builder.py is not covered by tests.


@overload
def parse_ts_millis(ts: None) -> None:
...

Codecov: added line 113 in mce_builder.py is not covered by tests.


def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
if ts is None:
return None
return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)


def make_data_platform_urn(platform: str) -> str:
if platform.startswith("urn:li:dataPlatform:"):
return platform
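For reference, a minimal usage sketch of the new helper (the expected values follow from the definition above; the timestamp itself is just an example):

from datetime import timezone
from datahub.emitter.mce_builder import parse_ts_millis

dt = parse_ts_millis(1735257600000)   # -> datetime(2024, 12, 27, 0, 0, tzinfo=timezone.utc)
assert dt.tzinfo == timezone.utc      # always timezone-aware, unlike bare datetime.fromtimestamp
assert parse_ts_millis(None) is None  # the overloads keep the None-in/None-out case well-typed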
9 changes: 2 additions & 7 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -4,8 +4,8 @@
from pydantic.main import BaseModel

from datahub.cli.env_utils import get_boolean_env_variable
from datahub.emitter.enum_helpers import get_enum_options
from datahub.emitter.mce_builder import (
ALL_ENV_TYPES,
Aspect,
datahub_guid,
make_container_urn,
@@ -25,7 +25,6 @@
ContainerClass,
DomainsClass,
EmbedClass,
FabricTypeClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
@@ -206,11 +205,7 @@ def gen_containers(
# Extra validation on the env field.
# In certain cases (mainly for backwards compatibility), the env field will actually
# have a platform instance name.
env = (
container_key.env
if container_key.env in get_enum_options(FabricTypeClass)
else None
)
env = container_key.env if container_key.env in ALL_ENV_TYPES else None

container_urn = container_key.as_urn()

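The env guard keeps its previous behavior; it now tests membership in ALL_ENV_TYPES (imported from mce_builder above) instead of recomputing get_enum_options(FabricTypeClass) on every gen_containers call. A rough sketch of the assumption being made (the constant's definition is not part of this diff):

# Assumed shape of the constant in mce_builder (hypothetical, not shown here):
# ALL_ENV_TYPES: Set[str] = set(get_enum_options(FabricTypeClass))

# Same backwards-compatibility check as before, now a single set-membership test:
env = container_key.env if container_key.env in ALL_ENV_TYPES else None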
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
@@ -2,7 +2,7 @@
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
from typing import Any, Dict, List, Optional, Sequence, Union

from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
@@ -75,7 +75,7 @@ def _add_patch(
# TODO: Validate that aspectName is a valid aspect for this entityType
self.patches[aspect_name].append(_Patch(op, path, value))

def build(self) -> Iterable[MetadataChangeProposalClass]:
def build(self) -> List[MetadataChangeProposalClass]:
return [
MetadataChangeProposalClass(
entityUrn=self.urn,
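Narrowing build() from Iterable to List is a typing-only change here (the method body already returns a list); callers can now call len() on or index the result without materializing it first. A sketch under that assumption (patch_builder, emitter, and logger are placeholders, not names from this diff):

mcps = patch_builder.build()  # now statically a List[MetadataChangeProposalClass]
logger.debug(f"Emitting {len(mcps)} patch proposals")
for mcp in mcps:
    emitter.emit_mcp(mcp)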
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -3,7 +3,7 @@
import logging
import os
from json.decoder import JSONDecodeError
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union

import requests
from deprecated import deprecated
@@ -288,7 +288,7 @@ def emit_mcp(

def emit_mcps(
self,
mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> int:
logger.debug("Attempting to emit batch mcps")
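Switching the parameter from List to Sequence is most likely about variance: List is invariant, so mypy rejects passing a List[MetadataChangeProposalWrapper] where List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]] is expected, whereas Sequence is covariant and accepts it. A sketch (emitter, urn, and aspect are placeholders, not part of this diff):

from typing import List
from datahub.emitter.mcp import MetadataChangeProposalWrapper

wrappers: List[MetadataChangeProposalWrapper] = [
    MetadataChangeProposalWrapper(entityUrn=urn, aspect=aspect),
]
emitter.emit_mcps(wrappers)  # type-checks against Sequence[Union[...]]; was rejected with List[Union[...]]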
@@ -1,5 +1,4 @@
import logging
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Dict,
@@ -14,7 +13,7 @@
)

from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mce_builder import make_dataplatform_instance_urn, parse_ts_millis
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -479,10 +478,7 @@ def auto_empty_dataset_usage_statistics(
if invalid_timestamps:
logger.warning(
f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n"
", ".join(
str(datetime.fromtimestamp(ts / 1000, tz=timezone.utc))
for ts in invalid_timestamps
)
", ".join(str(parse_ts_millis(ts)) for ts in invalid_timestamps)
)

for bucket in bucket_timestamps:
@@ -1,7 +1,7 @@
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, Optional

@@ -15,6 +15,7 @@
TimePartitioningType,
)

from datahub.emitter.mce_builder import parse_ts_millis
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels
@@ -393,13 +394,7 @@ def _make_bigquery_table(
name=table.table_name,
created=table.created,
table_type=table.table_type,
last_altered=(
datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
)
if table.get("last_altered") is not None
else None
),
last_altered=parse_ts_millis(table.get("last_altered")),
size_in_bytes=table.get("bytes"),
rows_count=table.get("row_count"),
comment=table.comment,
@@ -460,11 +455,7 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
return BigqueryView(
name=view.table_name,
created=view.created,
last_altered=(
datetime.fromtimestamp(view.get("last_altered") / 1000, tz=timezone.utc)
if view.get("last_altered") is not None
else None
),
last_altered=(parse_ts_millis(view.get("last_altered"))),
comment=view.comment,
view_definition=view.view_definition,
materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW,
@@ -705,13 +696,7 @@ def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapsh
return BigqueryTableSnapshot(
name=snapshot.table_name,
created=snapshot.created,
last_altered=(
datetime.fromtimestamp(
snapshot.get("last_altered") / 1000, tz=timezone.utc
)
if snapshot.get("last_altered") is not None
else None
),
last_altered=parse_ts_millis(snapshot.get("last_altered")),
comment=snapshot.comment,
ddl=snapshot.ddl,
snapshot_time=snapshot.snapshot_time,
metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
@@ -12,6 +12,7 @@
from confluent_kafka.schema_registry.avro import AvroDeserializer

from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.emitter.mce_builder import parse_ts_millis

Codecov: added line 15 in datahub_kafka_reader.py is not covered by tests.
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
Expand Down Expand Up @@ -92,7 +93,7 @@
if mcl.created and mcl.created.time > stop_time.timestamp() * 1000:
logger.info(
f"Stopped reading from kafka, reached MCL "
f"with audit stamp {datetime.fromtimestamp(mcl.created.time / 1000)}"
f"with audit stamp {parse_ts_millis(mcl.created.time)}"
)
break

@@ -7,7 +7,10 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
parse_ts_millis,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.ge_data_profiler import (
@@ -245,11 +248,7 @@ def is_dataset_eligible_for_profiling(
# If profiling state exists we have to carry over to the new state
self.state_handler.add_to_state(dataset_urn, last_profiled)

threshold_time: Optional[datetime] = (
datetime.fromtimestamp(last_profiled / 1000, timezone.utc)
if last_profiled
else None
)
threshold_time: Optional[datetime] = parse_ts_millis(last_profiled)
if (
not threshold_time
and self.config.profiling.profile_if_updated_since_days is not None
@@ -12,6 +12,7 @@
import pydantic

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import parse_ts_millis
from datahub.metadata.schema_classes import (
DatahubIngestionCheckpointClass,
IngestionCheckpointStateClass,
@@ -144,7 +145,7 @@ def create_from_checkpoint_aspect(
)
logger.info(
f"Successfully constructed last checkpoint state for job {job_name} "
f"with timestamp {datetime.fromtimestamp(checkpoint_aspect.timestampMillis/1000, tz=timezone.utc)}"
f"with timestamp {parse_ts_millis(checkpoint_aspect.timestampMillis)}"
)
return checkpoint
return None
35 changes: 8 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -4,7 +4,7 @@

import dataclasses
import logging
from datetime import datetime, timezone
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Union, cast
from unittest.mock import patch

@@ -27,6 +27,7 @@
from databricks.sdk.service.workspace import ObjectType

import datahub
from datahub.emitter.mce_builder import parse_ts_millis
from datahub.ingestion.source.unity.hive_metastore_proxy import HiveMetastoreProxy
from datahub.ingestion.source.unity.proxy_profiling import (
UnityCatalogProxyProfilingMixin,
@@ -211,16 +212,8 @@ def workspace_notebooks(self) -> Iterable[Notebook]:
id=obj.object_id,
path=obj.path,
language=obj.language,
created_at=(
datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
if obj.created_at
else None
),
modified_at=(
datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc)
if obj.modified_at
else None
),
created_at=parse_ts_millis(obj.created_at),
modified_at=parse_ts_millis(obj.modified_at),
)

def query_history(
@@ -452,17 +445,9 @@ def _create_table(
properties=obj.properties or {},
owner=obj.owner,
generation=obj.generation,
created_at=(
datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
if obj.created_at
else None
),
created_at=(parse_ts_millis(obj.created_at) if obj.created_at else None),
created_by=obj.created_by,
updated_at=(
datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
if obj.updated_at
else None
),
updated_at=(parse_ts_millis(obj.updated_at) if obj.updated_at else None),
updated_by=obj.updated_by,
table_id=obj.table_id,
comment=obj.comment,
@@ -500,12 +485,8 @@ def _create_query(self, info: QueryInfo) -> Optional[Query]:
query_id=info.query_id,
query_text=info.query_text,
statement_type=info.statement_type,
start_time=datetime.fromtimestamp(
info.query_start_time_ms / 1000, tz=timezone.utc
),
end_time=datetime.fromtimestamp(
info.query_end_time_ms / 1000, tz=timezone.utc
),
start_time=parse_ts_millis(info.query_start_time_ms),
end_time=parse_ts_millis(info.query_end_time_ms),
user_id=info.user_id,
user_name=info.user_name,
executed_as_user_id=info.executed_as_user_id,
11 changes: 8 additions & 3 deletions metadata-ingestion/src/datahub/utilities/time.py
@@ -1,6 +1,8 @@
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from datetime import datetime

from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis


def get_current_time_in_seconds() -> int:
@@ -9,12 +11,15 @@ def get_current_time_in_seconds() -> int:

def ts_millis_to_datetime(ts_millis: int) -> datetime:
"""Converts input timestamp in milliseconds to a datetime object with UTC timezone"""
return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
return parse_ts_millis(ts_millis)


def datetime_to_ts_millis(dt: datetime) -> int:
"""Converts a datetime object to timestamp in milliseconds"""
return int(round(dt.timestamp() * 1000))
# TODO: Deprecate these helpers in favor of make_ts_millis and parse_ts_millis.
# The other ones support None with a typing overload.
# Also possibly move those helpers to this file.
return make_ts_millis(dt)


@dataclass
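A quick round-trip sketch of the delegation above (the values follow from make_ts_millis and parse_ts_millis as defined earlier; this is not a test from the PR):

from datetime import datetime, timezone
from datahub.utilities.time import datetime_to_ts_millis, ts_millis_to_datetime

dt = datetime(2024, 12, 27, tzinfo=timezone.utc)
ts = datetime_to_ts_millis(dt)          # 1735257600000, via make_ts_millis
assert ts_millis_to_datetime(ts) == dt  # back through parse_ts_millis, still UTC-aware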