From 3b00fd76469bc971311a215f63f07c35b6b894f8 Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 26 Nov 2024 21:02:28 -0600
Subject: [PATCH 1/3] fix(shadowJar): fix shadowJar (#11968)

---
 .github/workflows/check-datahub-jars.yml                      | 1 +
 .../java/datahub-protobuf/scripts/check_jar.sh                | 4 +++-
 metadata-integration/java/datahub-schematron/lib/build.gradle | 4 ----
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/check-datahub-jars.yml b/.github/workflows/check-datahub-jars.yml
index becf8126dc45ba..7a49f32729ec1f 100644
--- a/.github/workflows/check-datahub-jars.yml
+++ b/.github/workflows/check-datahub-jars.yml
@@ -40,4 +40,5 @@ jobs:
       - name: check ${{ matrix.command }} jar
         run: |
           ./gradlew :metadata-integration:java:${{ matrix.command }}:build --info
+          ./gradlew :metadata-integration:java:${{ matrix.command }}:checkShadowJar
           ./gradlew :metadata-integration:java:${{ matrix.command }}:javadoc
diff --git a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh
index bd0c28f0f86988..66c70f0b857692 100755
--- a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh
+++ b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh
@@ -44,7 +44,9 @@ jar -tvf $jarFile |\
   grep -v "mime.types" |\
   grep -v "com/ibm/.*" |\
   grep -v "org/glassfish/" |\
-  grep -v "LICENSE"
+  grep -v "LICENSE" |\
+  grep -v "org/apache/avro" |\
+  grep -v "org/apache"
 
 if [ $? -ne 0 ]; then
   echo "✅ No unexpected class paths found in ${jarFile}"
diff --git a/metadata-integration/java/datahub-schematron/lib/build.gradle b/metadata-integration/java/datahub-schematron/lib/build.gradle
index 83dec1039f7be0..3ba22ff4cb7b5d 100644
--- a/metadata-integration/java/datahub-schematron/lib/build.gradle
+++ b/metadata-integration/java/datahub-schematron/lib/build.gradle
@@ -45,10 +45,6 @@ jacocoTestReport {
 
 test.finalizedBy jacocoTestReport
 
-task checkShadowJar(type: Exec) {
-    commandLine 'sh', '-c', 'scripts/check_jar.sh'
-}
-
 configurations {
     provided
     implementation.extendsFrom provided
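
A note on the jar check invoked above: check_jar.sh lists every entry in the
shadow jar and pipes it through a grep -v chain of known-good prefixes, so any
line that survives is a class path that escaped relocation; grep exits
non-zero when nothing survives the filters, which is why the script treats a
non-zero status as the passing case. The same assertion can be sketched as a
standalone check. This is a minimal Python sketch, not part of the patch; the
relocated prefixes in the allow-list are assumptions, while the later entries
mirror exclusions visible in the script's grep chain:

    import sys
    import zipfile

    # Paths the shadow jar is allowed to contain. The first few prefixes are
    # assumed relocation targets; the rest mirror the script's grep -v chain.
    ALLOWED_PREFIXES = (
        "META-INF/", "com/linkedin/", "datahub/", "io/datahubproject/",
        "mime.types", "com/ibm/", "org/glassfish/", "LICENSE",
        "org/apache/",
    )

    def check_jar(jar_path: str) -> int:
        with zipfile.ZipFile(jar_path) as jar:
            unexpected = [
                name for name in jar.namelist()
                if not name.endswith("/")  # ignore directory entries
                and not name.startswith(ALLOWED_PREFIXES)
            ]
        for name in unexpected:
            print(f"unexpected entry: {name}")
        return 1 if unexpected else 0

    if __name__ == "__main__":
        sys.exit(check_jar(sys.argv[1]))
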
From b6ccb8c9bf72608fa902e7ee694f3a19cb20f72f Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 26 Nov 2024 22:20:50 -0500
Subject: [PATCH 2/3] fix(ingest): ensure sentry is initialized with graph tags (#11949)

---
 .../src/datahub/ingestion/graph/client.py |  2 ++
 .../src/datahub/ingestion/run/pipeline.py |  9 +++---
 .../src/datahub/telemetry/telemetry.py    | 32 +++++++++++++------
 3 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 759aebcfd46b0a..4aa937639e9590 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -67,6 +67,7 @@
     SystemMetadataClass,
     TelemetryClientIdClass,
 )
+from datahub.telemetry.telemetry import telemetry_instance
 from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.str_enum import StrEnum
 from datahub.utilities.urns.urn import Urn, guess_entity_type
@@ -1819,4 +1820,5 @@ def get_default_graph() -> DataHubGraph:
     graph_config = config_utils.load_client_config()
     graph = DataHubGraph(graph_config)
     graph.test_connection()
+    telemetry_instance.set_context(server=graph)
     return graph
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 7c3a42c3e08931..667129ff83584a 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -44,7 +44,8 @@
 )
 from datahub.ingestion.transformer.transform_registry import transform_registry
 from datahub.metadata.schema_classes import MetadataChangeProposalClass
-from datahub.telemetry import stats, telemetry
+from datahub.telemetry import stats
+from datahub.telemetry.telemetry import telemetry_instance
 from datahub.utilities._custom_package_loader import model_version_name
 from datahub.utilities.global_warning_util import (
     clear_global_warnings,
@@ -273,8 +274,9 @@ def __init__(
         if self.graph is None and isinstance(self.sink, DatahubRestSink):
             with _add_init_error_context("setup default datahub client"):
                 self.graph = self.sink.emitter.to_graph()
+                self.graph.test_connection()
             self.ctx.graph = self.graph
-            telemetry.telemetry_instance.update_capture_exception_context(server=self.graph)
+            telemetry_instance.set_context(server=self.graph)
 
         with set_graph_context(self.graph):
             with _add_init_error_context("configure reporters"):
@@ -615,7 +617,7 @@ def log_ingestion_stats(self) -> None:
         sink_warnings = len(self.sink.get_report().warnings)
         global_warnings = len(get_global_warnings())
 
-        telemetry.telemetry_instance.ping(
+        telemetry_instance.ping(
             "ingest_stats",
             {
                 "source_type": self.source_type,
@@ -637,7 +639,6 @@ def log_ingestion_stats(self) -> None:
                 ),
                 "has_pipeline_name": bool(self.config.pipeline_name),
             },
-            self.ctx.graph,
         )
 
     def _approx_all_vals(self, d: LossyList[Any]) -> int:
diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py
index 4faf04ee2d2c76..22b2cb6a101af9 100644
--- a/metadata-ingestion/src/datahub/telemetry/telemetry.py
+++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py
@@ -7,7 +7,7 @@
 import uuid
 from functools import wraps
 from pathlib import Path
-from typing import Any, Callable, Dict, List, Optional, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar
 
 from mixpanel import Consumer, Mixpanel
 from typing_extensions import ParamSpec
@@ -16,10 +16,12 @@
 from datahub.cli.config_utils import DATAHUB_ROOT_FOLDER
 from datahub.cli.env_utils import get_boolean_env_variable
 from datahub.configuration.common import ExceptionWithProps
-from datahub.ingestion.graph.client import DataHubGraph
 from datahub.metadata.schema_classes import _custom_package_path
 from datahub.utilities.perf_timer import PerfTimer
 
+if TYPE_CHECKING:
+    from datahub.ingestion.graph.client import DataHubGraph
+
 logger = logging.getLogger(__name__)
 
 DATAHUB_FOLDER = Path(DATAHUB_ROOT_FOLDER)
@@ -117,7 +119,11 @@ class Telemetry:
     tracking_init: bool = False
     sentry_enabled: bool = False
 
+    context_properties: Dict[str, Any] = {}
+
     def __init__(self):
+        self.context_properties = {}
+
         if SENTRY_DSN:
             self.sentry_enabled = True
             try:
@@ -157,6 +163,9 @@ def __init__(self):
             except Exception as e:
                 logger.debug(f"Error connecting to mixpanel: {e}")
 
+        # Initialize the default properties for all events.
+        self.set_context()
+
     def update_config(self) -> bool:
         """
         Update the config file with the current client ID and enabled status.
@@ -238,18 +247,22 @@ def load_config(self) -> bool:
 
             return False
 
-    def update_capture_exception_context(
+    def set_context(
         self,
-        server: Optional[DataHubGraph] = None,
+        server: Optional["DataHubGraph"] = None,
         properties: Optional[Dict[str, Any]] = None,
     ) -> None:
+        self.context_properties = {
+            **self._server_props(server),
+            **(properties or {}),
+        }
+
         if self.sentry_enabled:
             from sentry_sdk import set_tag
 
             properties = {
                 **_default_telemetry_properties(),
-                **self._server_props(server),
-                **(properties or {}),
+                **self.context_properties,
             }
 
             for key in properties:
@@ -297,7 +310,6 @@ def ping(
         self,
         event_name: str,
         properties: Optional[Dict[str, Any]] = None,
-        server: Optional[DataHubGraph] = None,
     ) -> None:
         """
         Send a single telemetry event.
@@ -323,14 +335,15 @@ def ping(
             properties = {
                 **_default_telemetry_properties(),
-                **self._server_props(server),
+                **self.context_properties,
                 **properties,
             }
 
             self.mp.track(self.client_id, event_name, properties)
         except Exception as e:
             logger.debug(f"Error reporting telemetry: {e}")
 
-    def _server_props(self, server: Optional[DataHubGraph]) -> Dict[str, str]:
+    @classmethod
+    def _server_props(cls, server: Optional["DataHubGraph"]) -> Dict[str, str]:
         if not server:
             return {
                 "server_type": "n/a",
@@ -435,6 +448,7 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
                         **call_props,
                         "status": "error",
                         **_error_props(e),
+                        "code": getattr(e, "code", None),  # not every Exception has .code
                     },
                 )
                 telemetry_instance.capture_exception(e)
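
The net effect of this commit: server properties are captured once via
Telemetry.set_context(), called from get_default_graph() and from
Pipeline.__init__(), and merged into every subsequent ping(), so callers no
longer pass a server per event. A minimal sketch of the resulting usage,
assuming a reachable DataHub instance; the endpoint below is illustrative,
since in the patch the graph comes from load_client_config() or from the
pipeline's sink emitter:

    from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
    from datahub.telemetry.telemetry import telemetry_instance

    # Illustrative endpoint, not from the patch.
    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
    graph.test_connection()

    # Captures server_type/server_version/server_id once; when SENTRY_DSN is
    # set, the same properties are also applied as Sentry tags.
    telemetry_instance.set_context(server=graph)

    # ping() now merges context_properties itself -- no server= argument.
    telemetry_instance.ping("ingest_stats", {"source_type": "demo"})
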
From 48d711b19873794a423d2bd63034e959a23615d0 Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Wed, 27 Nov 2024 16:49:01 +0530
Subject: [PATCH 3/3] fix(ingest): more error handling (#11969)

---
 .../datahub/ingestion/source/gc/datahub_gc.py | 27 +++++++++++++++----
 .../source/gc/dataprocess_cleanup.py          |  4 ++-
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index c4b4186f45fc38..52807ca2a3f026 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -144,15 +144,32 @@ def get_workunits_internal(
         self,
     ) -> Iterable[MetadataWorkUnit]:
         if self.config.cleanup_expired_tokens:
-            self.revoke_expired_tokens()
+            try:
+                self.revoke_expired_tokens()
+            except Exception as e:
+                self.report.failure("While trying to cleanup expired token ", exc=e)
         if self.config.truncate_indices:
-            self.truncate_indices()
+            try:
+                self.truncate_indices()
+            except Exception as e:
+                self.report.failure("While trying to truncate indices ", exc=e)
         if self.dataprocess_cleanup:
-            yield from self.dataprocess_cleanup.get_workunits_internal()
+            try:
+                yield from self.dataprocess_cleanup.get_workunits_internal()
+            except Exception as e:
+                self.report.failure("While trying to cleanup data process ", exc=e)
         if self.soft_deleted_entities_cleanup:
-            self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
+            try:
+                self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
+            except Exception as e:
+                self.report.failure(
+                    "While trying to cleanup soft deleted entities ", exc=e
+                )
         if self.execution_request_cleanup:
-            self.execution_request_cleanup.run()
+            try:
+                self.execution_request_cleanup.run()
+            except Exception as e:
+                self.report.failure("While trying to cleanup execution request ", exc=e)
         yield from []
 
     def truncate_indices(self) -> None:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 130f2c9c2e12fc..0f35e1a67fede7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -404,7 +404,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             try:
                 self.delete_dpi_from_datajobs(datajob_entity)
             except Exception as e:
-                logger.error(f"While trying to delete {datajob_entity} got {e}")
+                self.report.failure(
+                    f"While trying to delete {datajob_entity} ", exc=e
+                )
             if (
                 datajob_entity.total_runs == 0
                 and self.config.delete_empty_data_jobs
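
Both files in this commit apply the same shape: each independent cleanup step
runs inside its own try/except, and a failure is recorded on the source report
via self.report.failure() so the remaining steps still execute instead of the
whole GC run aborting. A self-contained sketch of that shape; the Report class
and step names below are stand-ins for illustration, not DataHub APIs:

    from typing import Callable, List, Tuple

    class Report:
        """Stand-in for the ingestion report behind self.report.failure()."""

        def __init__(self) -> None:
            self.failures: List[Tuple[str, str]] = []

        def failure(self, message: str, exc: Exception) -> None:
            self.failures.append((message, repr(exc)))

    def run_cleanup_steps(
        steps: List[Tuple[str, Callable[[], None]]], report: Report
    ) -> None:
        # Every step runs even if an earlier one raised; errors become
        # report entries instead of aborting the remaining steps.
        for label, step in steps:
            try:
                step()
            except Exception as e:
                report.failure(f"While trying to {label} ", exc=e)

    def broken_step() -> None:
        raise RuntimeError("boom")

    report = Report()
    run_cleanup_steps(
        [("cleanup expired tokens", lambda: None), ("truncate indices", broken_step)],
        report,
    )
    assert len(report.failures) == 1  # failure recorded, later steps unaffected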