diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 8c5a22e56771a0..4c313c4f2e3149 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -105,7 +105,6 @@
     TagAssociationClass,
     ViewPropertiesClass,
 )
-from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
 from datahub.telemetry import telemetry
 from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -338,22 +337,19 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str)
         )
         self.views_failed_parsing: Set[str] = set()
 
-        self.schema_resolver: SchemaResolver = SchemaResolver(
-            platform=self.platform,
-            platform_instance=self.config.platform_instance,
-            env=self.config.env,
-        )
+        self.discovered_datasets: Set[str] = set()
         self.aggregator = SqlParsingAggregator(
             platform=self.platform,
             platform_instance=self.config.platform_instance,
             env=self.config.env,
-            schema_resolver=self.schema_resolver,
             graph=self.ctx.graph,
             generate_lineage=self.include_lineage,
             generate_usage_statistics=False,
             generate_operations=False,
+            eager_graph_load=False,
         )
+        self.report.sql_aggregator = self.aggregator.report
 
     @classmethod
     def test_connection(cls, config_dict: dict) -> TestConnectionReport:
@@ -1056,6 +1052,7 @@ def _process_view(
             self.config.platform_instance,
             self.config.env,
         )
+
         try:
             columns = inspector.get_columns(view, schema)
         except KeyError:
@@ -1083,20 +1080,25 @@ def _process_view(
         view_definition = self._get_view_definition(inspector, schema, view)
         properties["view_definition"] = view_definition
 
-        db_name = self.get_db_name(inspector)
         if view_definition and self.config.include_view_lineage:
+            default_db = None
+            default_schema = None
+            try:
+                default_db, default_schema = self.get_db_schema(dataset_name)
+            except ValueError:
+                logger.warning(f"Invalid view identifier: {dataset_name}")
             self.aggregator.add_view_definition(
                 view_urn=dataset_urn,
                 view_definition=view_definition,
-                default_db=db_name,
-                default_schema=schema,
+                default_db=default_db,
+                default_schema=default_schema,
             )
 
         dataset_snapshot = DatasetSnapshot(
             urn=dataset_urn,
             aspects=[StatusClass(removed=False)],
         )
-
+        db_name = self.get_db_name(inspector)
         yield from self.add_table_to_schema_container(
             dataset_urn=dataset_urn,
             db_name=db_name,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py
index c445ce44a91449..785972b88a49d7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py
@@ -5,6 +5,7 @@
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalSourceReport,
 )
+from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
 from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
 from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
@@ -52,6 +53,7 @@ class SQLSourceReport(
     num_view_definitions_failed_parsing: int = 0
     num_view_definitions_failed_column_parsing: int = 0
     view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
+    sql_aggregator: Optional[SqlAggregatorReport] = None
 
     def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
         """
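
Note on the new try/except in _process_view: the ValueError guard suggests get_db_schema
unpacks a dotted dataset identifier into a fixed number of parts and raises when the
identifier does not match. A minimal sketch of that assumed behavior (hypothetical,
written as a free function here; the real helper is a method on the source class and
its exact logic is not shown in this diff):

    from typing import Optional, Tuple

    def get_db_schema(dataset_identifier: str) -> Tuple[Optional[str], Optional[str]]:
        # Assumed shape: "db.schema.view". Unpacking raises ValueError for
        # identifiers with more or fewer dotted parts, which is why the caller
        # falls back to default_db = default_schema = None and logs a warning.
        database, schema, _view = dataset_identifier.split(".")
        return database, schema

With defaults derived from the view's own identifier rather than from the inspector,
add_view_definition can resolve unqualified table references in the view SQL even when
the inspector-level db_name differs from the identifier the aggregator expects.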