diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
index df59cae3fad232..cc1a07020e8d18 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
@@ -3,7 +3,7 @@
 import re
 import time
 from collections import OrderedDict
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import datetime
 from functools import lru_cache
 from typing import (
@@ -117,6 +117,7 @@
 )
 from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo
 from datahub.ingestion.source.tableau.tableau_validation import check_user_role
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
     AuditStamp,
     ChangeAuditStamps,
@@ -169,6 +170,8 @@
     create_lineage_sql_parsed_result,
 )
 from datahub.utilities import config_clean
+from datahub.utilities.perf_timer import PerfTimer
+from datahub.utilities.stats_collections import TopKDict
 from datahub.utilities.urns.dataset_urn import DatasetUrn
 
 try:
@@ -636,12 +639,28 @@ class SiteIdContentUrl:
     site_content_url: str
 
 
-class TableauSourceReport(StaleEntityRemovalSourceReport):
+@dataclass
+class TableauSourceReport(
+    StaleEntityRemovalSourceReport,
+    IngestionStageReport,
+):
     get_all_datasources_query_failed: bool = False
     num_get_datasource_query_failures: int = 0
     num_datasource_field_skipped_no_name: int = 0
     num_csql_field_skipped_no_name: int = 0
     num_table_field_skipped_no_name: int = 0
+    # timers: elapsed seconds (rounded to 2 decimals) per ingestion step, keyed by site id
+    extract_usage_stats_timer: Dict[str, float] = field(default_factory=TopKDict)
+    fetch_groups_timer: Dict[str, float] = field(default_factory=TopKDict)
+    populate_database_server_hostname_map_timer: Dict[str, float] = field(default_factory=TopKDict)
+    populate_projects_registry_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_workbooks_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_sheets_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_dashboards_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_embedded_datasources_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_published_datasources_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_custom_sql_datasources_timer: Dict[str, float] = field(default_factory=TopKDict)
+    emit_upstream_tables_timer: Dict[str, float] = field(default_factory=TopKDict)
     # lineage
     num_tables_with_upstream_lineage: int = 0
     num_upstream_table_lineage: int = 0
@@ -3457,33 +3476,66 @@ def _create_workbook_properties(
         return {"permissions": json.dumps(groups)} if len(groups) > 0 else None
 
     def ingest_tableau_site(self):
+        self.report.report_ingestion_stage_start(f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}")
+
         # Initialise the dictionary to later look-up for chart and dashboard stat
         if self.config.extract_usage_stats:
-            self._populate_usage_stat_registry()
+            with PerfTimer() as timer:
+                self._populate_usage_stat_registry()
+                self.report.extract_usage_stats_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
 
         if self.config.permission_ingestion:
-            self._fetch_groups()
+            with PerfTimer() as timer:
+                self._fetch_groups()
+                self.report.fetch_groups_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
 
         # Populate the map of database names and database hostnames to be used later to map
         # databases to platform instances.
         if self.config.database_hostname_to_platform_instance_map:
-            self._populate_database_server_hostname_map()
+            with PerfTimer() as timer:
+                self._populate_database_server_hostname_map()
+                self.report.populate_database_server_hostname_map_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
 
-        self._populate_projects_registry()
+        with PerfTimer() as timer:
+            self._populate_projects_registry()
+            self.report.populate_projects_registry_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
 
         if self.config.add_site_container:
             yield from self.emit_site_container()
         yield from self.emit_project_containers()
-        yield from self.emit_workbooks()
+
+        # NOTE: the timers wrapped around `yield from` below also include whatever time the
+        # consumer of this generator spends between yields, not only the emit_* call itself.
+        with PerfTimer() as timer:
+            yield from self.emit_workbooks()
+            self.report.emit_workbooks_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
+
         if self.sheet_ids:
-            yield from self.emit_sheets()
+            with PerfTimer() as timer:
+                yield from self.emit_sheets()
+                self.report.emit_sheets_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
+
         if self.dashboard_ids:
-            yield from self.emit_dashboards()
+            with PerfTimer() as timer:
+                yield from self.emit_dashboards()
+                self.report.emit_dashboards_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
+
         if self.embedded_datasource_ids_being_used:
-            yield from self.emit_embedded_datasources()
+            with PerfTimer() as timer:
+                yield from self.emit_embedded_datasources()
+                self.report.emit_embedded_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
+
         if self.datasource_ids_being_used:
-            yield from self.emit_published_datasources()
+            with PerfTimer() as timer:
+                yield from self.emit_published_datasources()
+                self.report.emit_published_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
+
         if self.custom_sql_ids_being_used:
-            yield from self.emit_custom_sql_datasources()
+            with PerfTimer() as timer:
+                yield from self.emit_custom_sql_datasources()
+                self.report.emit_custom_sql_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2)
+
         if self.database_tables:
-            yield from self.emit_upstream_tables()
+            with PerfTimer() as timer:
+                yield from self.emit_upstream_tables()
+                self.report.emit_upstream_tables_timer[self.site_id] = round(timer.elapsed_seconds(), 2)