Skip to content

Commit

Permalink
fix: added sql-aggregator-report
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Dec 26, 2024
1 parent 3814a60 commit dfe821d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
24 changes: 13 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
TagAssociationClass,
ViewPropertiesClass,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.telemetry import telemetry
from datahub.utilities.registries.domain_registry import DomainRegistry
Expand Down Expand Up @@ -338,22 +337,19 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str)
)

self.views_failed_parsing: Set[str] = set()
self.schema_resolver: SchemaResolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)

self.discovered_datasets: Set[str] = set()
self.aggregator = SqlParsingAggregator(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
schema_resolver=self.schema_resolver,
graph=self.ctx.graph,
generate_lineage=self.include_lineage,
generate_usage_statistics=False,
generate_operations=False,
eager_graph_load=False,
)
self.report.sql_aggregator = self.aggregator.report

@classmethod
def test_connection(cls, config_dict: dict) -> TestConnectionReport:
Expand Down Expand Up @@ -1056,6 +1052,7 @@ def _process_view(
self.config.platform_instance,
self.config.env,
)

try:
columns = inspector.get_columns(view, schema)
except KeyError:
Expand Down Expand Up @@ -1083,20 +1080,25 @@ def _process_view(

view_definition = self._get_view_definition(inspector, schema, view)
properties["view_definition"] = view_definition
db_name = self.get_db_name(inspector)
if view_definition and self.config.include_view_lineage:
default_db = None
default_schema = None
try:
default_db, default_schema = self.get_db_schema(dataset_name)
except ValueError:
logger.warning(f"Invalid view identifier: {dataset_name}")
self.aggregator.add_view_definition(
view_urn=dataset_urn,
view_definition=view_definition,
default_db=db_name,
default_schema=schema,
default_db=default_db,
default_schema=default_schema,
)

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)

db_name = self.get_db_name(inspector)
yield from self.add_table_to_schema_container(
dataset_urn=dataset_urn,
db_name=db_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
Expand Down Expand Up @@ -52,6 +53,7 @@ class SQLSourceReport(
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0
view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
sql_aggregator: Optional[SqlAggregatorReport] = None

def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
"""
Expand Down

0 comments on commit dfe821d

Please sign in to comment.