chore(tableau): set ingestion stage report and perftimers (#12234)
sgomezvillamor authored Jan 9, 2025
1 parent 45450f1 commit 9d9a368
Showing 27 changed files with 625 additions and 499 deletions.
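The common thread across the hunks below is that explicit self.report.set_ingestion_stage(...) calls are replaced by a with self.report.new_stage(...) context manager, and round(timer.elapsed_seconds(), 2) becomes timer.elapsed_seconds(digits=2). A minimal sketch of how a context-manager-based stage report could look; the class and field names here are assumptions for illustration only, not DataHub's actual IngestionStageReport:

import contextlib
import time
from typing import Dict, Iterator


class IngestionStageReportSketch:
    """Hypothetical mixin: new_stage() labels the current stage and times it."""

    def __init__(self) -> None:
        self.ingestion_stage: str = ""
        self.ingestion_stage_durations: Dict[str, float] = {}  # assumed field name

    @contextlib.contextmanager
    def new_stage(self, stage: str) -> Iterator[None]:
        self.ingestion_stage = stage
        start = time.perf_counter()
        try:
            yield
        finally:
            # The duration is recorded even if the stage body raises, which a
            # bare set_ingestion_stage() call could not guarantee.
            self.ingestion_stage_durations[stage] = round(
                time.perf_counter() - start, 2
            )

Callers then wrap each stage's work, for example with report.new_stage(f"{project_id}: {PROFILING}"): yield from profiler.get_workunits(...), so the stage ends exactly where the wrapped block does, as the diffs below show.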
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -334,6 +334,8 @@ def as_obj(self) -> dict:
}

def compute_stats(self) -> None:
super().compute_stats()

duration = datetime.datetime.now() - self.start_time
workunits_produced = self.events_produced
if duration.total_seconds() > 0:
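The added super().compute_stats() call lets a subclass report refresh the base class's derived statistics before computing its own. A tiny illustrative sketch of that chaining pattern; the class names are hypothetical, not DataHub's actual report hierarchy:

class BaseReportSketch:
    def __init__(self) -> None:
        self.events_produced = 0
        self.summary = ""

    def compute_stats(self) -> None:
        # Base-class statistics derived from raw counters.
        self.summary = f"{self.events_produced} events produced"


class SourceReportSketch(BaseReportSketch):
    def compute_stats(self) -> None:
        super().compute_stats()  # without this, the base-class stats go stale
        # ...followed by statistics specific to this subclass.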
@@ -253,14 +253,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)
with self.report.new_stage("*: View and Snapshot Lineage"):
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)

if self.config.use_queries_v2:
# if both usage and lineage are disabled then skip queries extractor piece
@@ -270,31 +270,29 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
):
return

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
top_n_queries=self.config.usage.top_n_queries,
region_qualifiers=self.config.region_qualifiers,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()

with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"):
with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
top_n_queries=self.config.usage.top_n_queries,
region_qualifiers=self.config.region_qualifiers,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
@@ -190,6 +190,3 @@ class BigQueryV2Report(
num_skipped_external_table_lineage: int = 0

queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

def set_ingestion_stage(self, project_id: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{project_id}: {stage}")
@@ -248,9 +248,9 @@ def modified_base32decode(self, text_to_decode: str) -> str:
def get_project_workunits(
self, project: BigqueryProject
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION)
logger.info(f"Processing project: {project.id}")
yield from self._process_project(project)
with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"):
logger.info(f"Processing project: {project.id}")
yield from self._process_project(project)

def get_dataplatform_instance_aspect(
self, dataset_urn: str, project_id: str
@@ -405,11 +405,11 @@ def _process_project(

if self.config.is_profiling_enabled():
logger.info(f"Starting profiling project {project_id}")
self.report.set_ingestion_stage(project_id, PROFILING)
yield from self.profiler.get_workunits(
project_id=project_id,
tables=db_tables,
)
with self.report.new_stage(f"{project_id}: {PROFILING}"):
yield from self.profiler.get_workunits(
project_id=project_id,
tables=db_tables,
)

def _process_project_datasets(
self,
@@ -1203,9 +1203,9 @@ def get_tables_for_dataset(
report=self.report,
)

self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round(
timer.elapsed_seconds(), 2
)
self.report.metadata_extraction_sec[
f"{project_id}.{dataset.name}"
] = timer.elapsed_seconds(digits=2)

def get_core_table_details(
self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str
@@ -330,11 +330,11 @@ def get_lineage_workunits(
projects = ["*"] # project_id not used when using exported metadata

for project in projects:
self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION)
yield from self.generate_lineage(
project,
table_refs,
)
with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"):
yield from self.generate_lineage(
project,
table_refs,
)

if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
@@ -368,8 +368,8 @@ def generate_lineage(
self.report.lineage_metadata_entries[project_id] = len(lineage)
logger.info(f"Built lineage map containing {len(lineage)} entries.")
logger.debug(f"lineage metadata is {lineage}")
self.report.lineage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)
self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
memory_footprint.total_size(lineage)
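Several reports above switch from round(timer.elapsed_seconds(), 2) to timer.elapsed_seconds(digits=2), moving the rounding into the timer itself. A simplified stand-in showing the shape of such an API; this is an assumption for illustration, not DataHub's actual PerfTimer implementation:

import time
from typing import Optional


class PerfTimerSketch:
    """Simplified stand-in for a perf timer whose elapsed_seconds() can round."""

    def __enter__(self) -> "PerfTimerSketch":
        self._start = time.perf_counter()
        self._end: Optional[float] = None
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self._end = time.perf_counter()

    def elapsed_seconds(self, digits: Optional[int] = None) -> float:
        end = self._end if self._end is not None else time.perf_counter()
        elapsed = end - self._start
        return round(elapsed, digits) if digits is not None else elapsed


# Usage mirroring the diff:
# with PerfTimerSketch() as timer:
#     ...  # extraction work
# report.lineage_extraction_sec[project_id] = timer.elapsed_seconds(digits=2)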
114 changes: 57 additions & 57 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -495,62 +495,62 @@ def _ingest_events(
def _generate_operational_workunits(
self, usage_state: BigQueryUsageState, table_refs: Collection[str]
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS)
for audit_event in usage_state.standalone_events():
try:
operational_wu = self._create_operation_workunit(
audit_event, table_refs
)
if operational_wu:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate operation workunit",
context=f"{audit_event}",
exc=e,
)
with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"):
for audit_event in usage_state.standalone_events():
try:
operational_wu = self._create_operation_workunit(
audit_event, table_refs
)
if operational_wu:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate operation workunit",
context=f"{audit_event}",
exc=e,
)

def _generate_usage_workunits(
self, usage_state: BigQueryUsageState
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION)
top_n = (
self.config.usage.top_n_queries
if self.config.usage.include_top_n_queries
else 0
)
for entry in usage_state.usage_statistics(top_n=top_n):
try:
query_freq = [
(
self.uuid_to_query.get(
query_hash, usage_state.queries[query_hash]
),
count,
with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"):
top_n = (
self.config.usage.top_n_queries
if self.config.usage.include_top_n_queries
else 0
)
for entry in usage_state.usage_statistics(top_n=top_n):
try:
query_freq = [
(
self.uuid_to_query.get(
query_hash, usage_state.queries[query_hash]
),
count,
)
for query_hash, count in entry.query_freq
]
yield make_usage_workunit(
bucket_start_time=datetime.fromisoformat(entry.timestamp),
resource=BigQueryTableRef.from_string_name(entry.resource),
query_count=entry.query_count,
query_freq=query_freq,
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
queries_character_limit=self.config.usage.queries_character_limit,
)
self.report.num_usage_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate usage statistics workunit",
context=f"{entry.timestamp}, {entry.resource}",
exc=e,
)
for query_hash, count in entry.query_freq
]
yield make_usage_workunit(
bucket_start_time=datetime.fromisoformat(entry.timestamp),
resource=BigQueryTableRef.from_string_name(entry.resource),
query_count=entry.query_count,
query_freq=query_freq,
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
queries_character_limit=self.config.usage.queries_character_limit,
)
self.report.num_usage_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate usage statistics workunit",
context=f"{entry.timestamp}, {entry.resource}",
exc=e,
)

def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
if self.config.use_exported_bigquery_audit_metadata:
@@ -559,10 +559,10 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
for project_id in projects:
with PerfTimer() as timer:
try:
self.report.set_ingestion_stage(
project_id, USAGE_EXTRACTION_INGESTION
)
yield from self._get_parsed_bigquery_log_events(project_id)
with self.report.new_stage(
f"{project_id}: {USAGE_EXTRACTION_INGESTION}"
):
yield from self._get_parsed_bigquery_log_events(project_id)
except Exception as e:
self.report.usage_failed_extraction.append(project_id)
self.report.warning(
Expand All @@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
)
self.report_status(f"usage-extraction-{project_id}", False)

self.report.usage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)

def _store_usage_event(
@@ -70,30 +70,30 @@ def get_workunits(
) -> Iterable[MetadataWorkUnit]:
for keyspace_name in cassandra_data.keyspaces:
tables = cassandra_data.tables.get(keyspace_name, [])
self.report.set_ingestion_stage(keyspace_name, PROFILING)
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)
with self.report.new_stage(f"{keyspace_name}: {PROFILING}"):
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)

def generate_profile(
self,
@@ -54,9 +54,6 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
else:
raise KeyError(f"Unknown entity {ent_type}.")

def set_ingestion_stage(self, keyspace: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{keyspace}: {stage}")

# TODO Need to create separate common config for profiling report
profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(
Expand Down
@@ -45,6 +45,3 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
self.views_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

def set_ingestion_stage(self, dataset: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{dataset}: {stage}")
@@ -472,8 +472,8 @@ def generate_profiles(
env=self.config.env,
platform_instance=self.config.platform_instance,
)
self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING)
yield from self.profiler.get_workunits(dataset_info, dataset_urn)
with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"):
yield from self.profiler.get_workunits(dataset_info, dataset_urn)

def generate_view_lineage(
self, dataset_urn: str, parents: List[str]