Skip to content

Commit

Permalink
fix(ingest/bigquery): All View generation when queries_v2 is turned off
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Dec 19, 2024
1 parent d0b4f7a commit bc291ea
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import functools
import logging
import os
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Set

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand Down Expand Up @@ -255,10 +255,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

if self.config.use_queries_v2:
# Always ingest View and Snapshot lineage with schema ingestion
if self.config.include_view_lineage:
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")

yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
Expand All @@ -267,6 +265,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.bq_schema_extractor.snapshots_by_ref,
)

if self.config.use_queries_v2:
# if both usage and lineage are disabled then skip queries extractor piece
if (
not self.config.include_usage_statistics
Expand All @@ -276,13 +275,21 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

discovered_tables: Set[str] = set()
if self.config.include_table_lineage:
discovered_tables.update(self.bq_schema_extractor.table_refs)

if self.config.include_view_lineage:
discovered_tables.update(self.bq_schema_extractor.view_snapshot_refs)

with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_lineage=self.config.include_table_lineage
or self.config.include_view_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
top_n_queries=self.config.usage.top_n_queries,
Expand All @@ -292,24 +299,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
discovered_tables=discovered_tables,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()

else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
[p.id for p in projects],
self.bq_schema_extractor.table_refs.union(
self.bq_schema_extractor.view_snapshot_refs
),
)

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def __init__(

# Global store of table identifiers for lineage filtering
self.table_refs: Set[str] = set()
self.view_snapshot_refs: Set[str] = set()

# Maps project -> view_ref, so we can find all views in a project
self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set)
Expand Down Expand Up @@ -233,6 +234,14 @@ def store_table_refs(self):
or self.config.use_queries_v2
)

@property
def store_view_refs(self):
return (
self.config.include_view_lineage
or self.config.include_usage_statistics
or self.config.use_queries_v2
)

def modified_base32decode(self, text_to_decode: str) -> str:
# When we sync from DataHub to BigQuery, we encode the tags as modified base32 strings.
# BiqQuery labels only support lowercase letters, international characters, numbers, or underscores.
Expand Down Expand Up @@ -653,11 +662,11 @@ def _process_view(
self.report.report_dropped(table_identifier.raw_table_name())
return

if self.store_table_refs:
if self.store_view_refs:
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
self.view_snapshot_refs.add(table_ref)
if self.config.lineage_parse_view_ddl and view.view_definition:
self.view_refs_by_project[project_id].add(table_ref)
self.view_definitions[table_ref] = view.view_definition
Expand Down Expand Up @@ -701,11 +710,11 @@ def _process_snapshot(
f"Snapshot doesn't have any column or unable to get columns for snapshot: {table_identifier}"
)

if self.store_table_refs:
if self.store_view_refs:
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
self.view_snapshot_refs.add(table_ref)
if snapshot.base_table_identifier:
self.snapshot_refs_by_project[project_id].add(table_ref)
self.snapshots_by_ref[table_ref] = snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,23 +322,11 @@ def get_lineage_workunits_for_views_and_snapshots(
def get_lineage_workunits(
self,
projects: List[str],
view_refs_by_project: Dict[str, Set[str]],
view_definitions: FileBackedDict[str],
snapshot_refs_by_project: Dict[str, Set[str]],
snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot],
table_refs: Set[str],
) -> Iterable[MetadataWorkUnit]:
if not self._should_ingest_lineage():
return

yield from self.get_lineage_workunits_for_views_and_snapshots(
projects,
view_refs_by_project,
view_definitions,
snapshot_refs_by_project,
snapshots_by_ref,
)

if self.config.use_exported_bigquery_audit_metadata:
projects = ["*"] # project_id not used when using exported metadata

Expand Down

0 comments on commit bc291ea

Please sign in to comment.