diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 75dc980e234ac..53cb1b0ecad4e 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -334,6 +334,8 @@ def as_obj(self) -> dict: } def compute_stats(self) -> None: + super().compute_stats() + duration = datetime.datetime.now() - self.start_time workunits_produced = self.events_produced if duration.total_seconds() > 0: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index db7b0540e49e7..508b4bbaa277d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -253,14 +253,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: for project in projects: yield from self.bq_schema_extractor.get_project_workunits(project) - self.report.set_ingestion_stage("*", "View and Snapshot Lineage") - yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( - [p.id for p in projects], - self.bq_schema_extractor.view_refs_by_project, - self.bq_schema_extractor.view_definitions, - self.bq_schema_extractor.snapshot_refs_by_project, - self.bq_schema_extractor.snapshots_by_ref, - ) + with self.report.new_stage("*: View and Snapshot Lineage"): + yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( + [p.id for p in projects], + self.bq_schema_extractor.view_refs_by_project, + self.bq_schema_extractor.view_definitions, + self.bq_schema_extractor.snapshot_refs_by_project, + self.bq_schema_extractor.snapshots_by_ref, + ) if self.config.use_queries_v2: # if both usage and lineage are disabled then skip queries extractor piece @@ -270,31 +270,29 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ): return - self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - - with BigQueryQueriesExtractor( - connection=self.config.get_bigquery_client(), - schema_api=self.bq_schema_extractor.schema_api, - config=BigQueryQueriesExtractorConfig( - window=self.config, - user_email_pattern=self.config.usage.user_email_pattern, - include_lineage=self.config.include_table_lineage, - include_usage_statistics=self.config.include_usage_statistics, - include_operations=self.config.usage.include_operational_stats, - include_queries=self.config.include_queries, - include_query_usage_statistics=self.config.include_query_usage_statistics, - top_n_queries=self.config.usage.top_n_queries, - region_qualifiers=self.config.region_qualifiers, - ), - structured_report=self.report, - filters=self.filters, - identifiers=self.identifiers, - schema_resolver=self.sql_parser_schema_resolver, - discovered_tables=self.bq_schema_extractor.table_refs, - ) as queries_extractor: - self.report.queries_extractor = queries_extractor.report - yield from queries_extractor.get_workunits_internal() - + with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"): + with BigQueryQueriesExtractor( + connection=self.config.get_bigquery_client(), + schema_api=self.bq_schema_extractor.schema_api, + config=BigQueryQueriesExtractorConfig( + window=self.config, + user_email_pattern=self.config.usage.user_email_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_statistics, + include_operations=self.config.usage.include_operational_stats, + include_queries=self.config.include_queries, + include_query_usage_statistics=self.config.include_query_usage_statistics, + top_n_queries=self.config.usage.top_n_queries, + region_qualifiers=self.config.region_qualifiers, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=self.sql_parser_schema_resolver, + discovered_tables=self.bq_schema_extractor.table_refs, + ) as queries_extractor: + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() else: if self.config.include_usage_statistics: yield from self.usage_extractor.get_usage_workunits( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 06842da67f76c..8e55d81aac5fe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -190,6 +190,3 @@ class BigQueryV2Report( num_skipped_external_table_lineage: int = 0 queries_extractor: Optional[BigQueryQueriesExtractorReport] = None - - def set_ingestion_stage(self, project_id: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{project_id}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index bc2688e6b481a..56e930dfb811f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -248,9 +248,9 @@ def modified_base32decode(self, text_to_decode: str) -> str: def get_project_workunits( self, project: BigqueryProject ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION) - logger.info(f"Processing project: {project.id}") - yield from self._process_project(project) + with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"): + logger.info(f"Processing project: {project.id}") + yield from self._process_project(project) def get_dataplatform_instance_aspect( self, dataset_urn: str, project_id: str @@ -405,11 +405,11 @@ def _process_project( if self.config.is_profiling_enabled(): logger.info(f"Starting profiling project {project_id}") - self.report.set_ingestion_stage(project_id, PROFILING) - yield from self.profiler.get_workunits( - project_id=project_id, - tables=db_tables, - ) + with self.report.new_stage(f"{project_id}: {PROFILING}"): + yield from self.profiler.get_workunits( + project_id=project_id, + tables=db_tables, + ) def _process_project_datasets( self, @@ -1203,9 +1203,9 @@ def get_tables_for_dataset( report=self.report, ) - self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round( - timer.elapsed_seconds(), 2 - ) + self.report.metadata_extraction_sec[ + f"{project_id}.{dataset.name}" + ] = timer.elapsed_seconds(digits=2) def get_core_table_details( self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index ba3357aa8ca20..433282a21fdb6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -330,11 +330,11 @@ def get_lineage_workunits( projects = ["*"] # project_id not used when using exported metadata for project in projects: - self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION) - yield from self.generate_lineage( - project, - table_refs, - ) + with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"): + yield from self.generate_lineage( + project, + table_refs, + ) if self.redundant_run_skip_handler: # Update the checkpoint state for this run. @@ -368,8 +368,8 @@ def generate_lineage( self.report.lineage_metadata_entries[project_id] = len(lineage) logger.info(f"Built lineage map containing {len(lineage)} entries.") logger.debug(f"lineage metadata is {lineage}") - self.report.lineage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds( + digits=2 ) self.report.lineage_mem_size[project_id] = humanfriendly.format_size( memory_footprint.total_size(lineage) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index 876ffab85ba31..f2f6cc731858d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -495,62 +495,62 @@ def _ingest_events( def _generate_operational_workunits( self, usage_state: BigQueryUsageState, table_refs: Collection[str] ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS) - for audit_event in usage_state.standalone_events(): - try: - operational_wu = self._create_operation_workunit( - audit_event, table_refs - ) - if operational_wu: - yield operational_wu - self.report.num_operational_stats_workunits_emitted += 1 - except Exception as e: - self.report.warning( - message="Unable to generate operation workunit", - context=f"{audit_event}", - exc=e, - ) + with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"): + for audit_event in usage_state.standalone_events(): + try: + operational_wu = self._create_operation_workunit( + audit_event, table_refs + ) + if operational_wu: + yield operational_wu + self.report.num_operational_stats_workunits_emitted += 1 + except Exception as e: + self.report.warning( + message="Unable to generate operation workunit", + context=f"{audit_event}", + exc=e, + ) def _generate_usage_workunits( self, usage_state: BigQueryUsageState ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION) - top_n = ( - self.config.usage.top_n_queries - if self.config.usage.include_top_n_queries - else 0 - ) - for entry in usage_state.usage_statistics(top_n=top_n): - try: - query_freq = [ - ( - self.uuid_to_query.get( - query_hash, usage_state.queries[query_hash] - ), - count, + with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"): + top_n = ( + self.config.usage.top_n_queries + if self.config.usage.include_top_n_queries + else 0 + ) + for entry in usage_state.usage_statistics(top_n=top_n): + try: + query_freq = [ + ( + self.uuid_to_query.get( + query_hash, usage_state.queries[query_hash] + ), + count, + ) + for query_hash, count in entry.query_freq + ] + yield make_usage_workunit( + bucket_start_time=datetime.fromisoformat(entry.timestamp), + resource=BigQueryTableRef.from_string_name(entry.resource), + query_count=entry.query_count, + query_freq=query_freq, + user_freq=entry.user_freq, + column_freq=entry.column_freq, + bucket_duration=self.config.bucket_duration, + resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref, + top_n_queries=self.config.usage.top_n_queries, + format_sql_queries=self.config.usage.format_sql_queries, + queries_character_limit=self.config.usage.queries_character_limit, + ) + self.report.num_usage_workunits_emitted += 1 + except Exception as e: + self.report.warning( + message="Unable to generate usage statistics workunit", + context=f"{entry.timestamp}, {entry.resource}", + exc=e, ) - for query_hash, count in entry.query_freq - ] - yield make_usage_workunit( - bucket_start_time=datetime.fromisoformat(entry.timestamp), - resource=BigQueryTableRef.from_string_name(entry.resource), - query_count=entry.query_count, - query_freq=query_freq, - user_freq=entry.user_freq, - column_freq=entry.column_freq, - bucket_duration=self.config.bucket_duration, - resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref, - top_n_queries=self.config.usage.top_n_queries, - format_sql_queries=self.config.usage.format_sql_queries, - queries_character_limit=self.config.usage.queries_character_limit, - ) - self.report.num_usage_workunits_emitted += 1 - except Exception as e: - self.report.warning( - message="Unable to generate usage statistics workunit", - context=f"{entry.timestamp}, {entry.resource}", - exc=e, - ) def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: if self.config.use_exported_bigquery_audit_metadata: @@ -559,10 +559,10 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: for project_id in projects: with PerfTimer() as timer: try: - self.report.set_ingestion_stage( - project_id, USAGE_EXTRACTION_INGESTION - ) - yield from self._get_parsed_bigquery_log_events(project_id) + with self.report.new_stage( + f"{project_id}: {USAGE_EXTRACTION_INGESTION}" + ): + yield from self._get_parsed_bigquery_log_events(project_id) except Exception as e: self.report.usage_failed_extraction.append(project_id) self.report.warning( @@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: ) self.report_status(f"usage-extraction-{project_id}", False) - self.report.usage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 + self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds( + digits=2 ) def _store_usage_event( diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py index d8ab62f1d6d91..7bf1d66f618a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py @@ -70,30 +70,30 @@ def get_workunits( ) -> Iterable[MetadataWorkUnit]: for keyspace_name in cassandra_data.keyspaces: tables = cassandra_data.tables.get(keyspace_name, []) - self.report.set_ingestion_stage(keyspace_name, PROFILING) - with ThreadPoolExecutor( - max_workers=self.config.profiling.max_workers - ) as executor: - future_to_dataset = { - executor.submit( - self.generate_profile, - keyspace_name, - table_name, - cassandra_data.columns.get(table_name, []), - ): table_name - for table_name in tables - } - for future in as_completed(future_to_dataset): - table_name = future_to_dataset[future] - try: - yield from future.result() - except Exception as exc: - self.report.profiling_skipped_other[table_name] += 1 - self.report.failure( - message="Failed to profile for table", - context=f"{keyspace_name}.{table_name}", - exc=exc, - ) + with self.report.new_stage(f"{keyspace_name}: {PROFILING}"): + with ThreadPoolExecutor( + max_workers=self.config.profiling.max_workers + ) as executor: + future_to_dataset = { + executor.submit( + self.generate_profile, + keyspace_name, + table_name, + cassandra_data.columns.get(table_name, []), + ): table_name + for table_name in tables + } + for future in as_completed(future_to_dataset): + table_name = future_to_dataset[future] + try: + yield from future.result() + except Exception as exc: + self.report.profiling_skipped_other[table_name] += 1 + self.report.failure( + message="Failed to profile for table", + context=f"{keyspace_name}.{table_name}", + exc=exc, + ) def generate_profile( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py index 41d4ac7ced603..75a0ba0c61773 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py @@ -54,9 +54,6 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None: else: raise KeyError(f"Unknown entity {ent_type}.") - def set_ingestion_stage(self, keyspace: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{keyspace}: {stage}") - # TODO Need to create seperate common config for profiling report profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict) profiling_skipped_table_profile_pattern: TopKDict[str, int] = field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py index c8eb035461ca1..9712d4ddc6799 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py @@ -45,6 +45,3 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None: self.views_scanned += 1 else: raise KeyError(f"Unknown entity {ent_type}.") - - def set_ingestion_stage(self, dataset: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{dataset}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index 319290d25169a..6d34e86be6282 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -472,8 +472,8 @@ def generate_profiles( env=self.config.env, platform_instance=self.config.platform_instance, ) - self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING) - yield from self.profiler.get_workunits(dataset_info, dataset_urn) + with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"): + yield from self.profiler.get_workunits(dataset_info, dataset_urn) def generate_view_lineage( self, dataset_urn: str, parents: List[str] diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index 443368e6d8b4f..b4cc5423277c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -141,40 +141,36 @@ def get_workunits_internal( ) -> Iterable[MetadataWorkUnit]: if self.config.cleanup_expired_tokens: try: - self.report.report_ingestion_stage_start("Expired Token Cleanup") - self.revoke_expired_tokens() + with self.report.new_stage("Expired Token Cleanup"): + self.revoke_expired_tokens() except Exception as e: self.report.failure("While trying to cleanup expired token ", exc=e) if self.config.truncate_indices: try: - self.report.report_ingestion_stage_start("Truncate Indices") - self.truncate_indices() + with self.report.new_stage("Truncate Indices"): + self.truncate_indices() except Exception as e: self.report.failure("While trying to truncate indices ", exc=e) if self.config.soft_deleted_entities_cleanup.enabled: try: - self.report.report_ingestion_stage_start( - "Soft Deleted Entities Cleanup" - ) - self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + with self.report.new_stage("Soft Deleted Entities Cleanup"): + self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() except Exception as e: self.report.failure( "While trying to cleanup soft deleted entities ", exc=e ) if self.config.dataprocess_cleanup.enabled: try: - self.report.report_ingestion_stage_start("Data Process Cleanup") - yield from self.dataprocess_cleanup.get_workunits_internal() + with self.report.new_stage("Data Process Cleanup"): + yield from self.dataprocess_cleanup.get_workunits_internal() except Exception as e: self.report.failure("While trying to cleanup data process ", exc=e) if self.config.execution_request_cleanup.enabled: try: - self.report.report_ingestion_stage_start("Execution request Cleanup") - self.execution_request_cleanup.run() + with self.report.new_stage("Execution request Cleanup"): + self.execution_request_cleanup.run() except Exception as e: self.report.failure("While trying to cleanup execution request ", exc=e) - # Otherwise last stage's duration does not get calculated. - self.report.report_ingestion_stage_start("End") yield from [] def truncate_indices(self) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 49f7941563c1a..5371017a2a321 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -423,10 +423,10 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit database = self.config.database logger.info(f"Processing db {database}") - self.report.report_ingestion_stage_start(METADATA_EXTRACTION) - self.db_tables[database] = defaultdict() - self.db_views[database] = defaultdict() - self.db_schemas.setdefault(database, {}) + with self.report.new_stage(METADATA_EXTRACTION): + self.db_tables[database] = defaultdict() + self.db_views[database] = defaultdict() + self.db_schemas.setdefault(database, {}) # TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping # this fallback. For now, this gets us broad coverage quickly. @@ -462,12 +462,12 @@ def _extract_metadata( self.process_schemas(connection, database) ) - self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage_v2( - connection=connection, - database=database, - lineage_extractor=lineage_extractor, - ) + with self.report.new_stage(LINEAGE_EXTRACTION): + yield from self.extract_lineage_v2( + connection=connection, + database=database, + lineage_extractor=lineage_extractor, + ) all_tables = self.get_all_tables() else: @@ -480,25 +480,25 @@ def _extract_metadata( or self.config.include_view_lineage or self.config.include_copy_lineage ): - self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage( - connection=connection, all_tables=all_tables, database=database - ) + with self.report.new_stage(LINEAGE_EXTRACTION): + yield from self.extract_lineage( + connection=connection, all_tables=all_tables, database=database + ) - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_INGESTION) if self.config.include_usage_statistics: - yield from self.extract_usage( - connection=connection, all_tables=all_tables, database=database - ) + with self.report.new_stage(USAGE_EXTRACTION_INGESTION): + yield from self.extract_usage( + connection=connection, all_tables=all_tables, database=database + ) if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start(PROFILING) - profiler = RedshiftProfiler( - config=self.config, - report=self.report, - state_handler=self.profiling_state_handler, - ) - yield from profiler.get_workunits(self.db_tables) + with self.report.new_stage(PROFILING): + profiler = RedshiftProfiler( + config=self.config, + report=self.report, + state_handler=self.profiling_state_handler, + ) + yield from profiler.get_workunits(self.db_tables) def process_schemas(self, connection, database): for schema in self.data_dictionary.get_schemas( @@ -633,8 +633,8 @@ def process_schema( else: logger.info("View processing disabled, skipping") - self.report.metadata_extraction_sec[report_key] = round( - timer.elapsed_seconds(), 2 + self.report.metadata_extraction_sec[report_key] = timer.elapsed_seconds( + digits=2 ) def _process_table( @@ -986,9 +986,7 @@ def extract_usage( yield from usage_extractor.get_usage_workunits(all_tables=all_tables) - self.report.usage_extraction_sec[database] = round( - timer.elapsed_seconds(), 2 - ) + self.report.usage_extraction_sec[database] = timer.elapsed_seconds(digits=2) def extract_lineage( self, @@ -1011,8 +1009,8 @@ def extract_lineage( database=database, connection=connection, all_tables=all_tables ) - self.report.lineage_extraction_sec[f"{database}"] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds( + digits=2 ) yield from self.generate_lineage( database, lineage_extractor=lineage_extractor @@ -1042,8 +1040,8 @@ def extract_lineage_v2( yield from lineage_extractor.generate() - self.report.lineage_extraction_sec[f"{database}"] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds( + digits=2 ) if self.redundant_lineage_run_skip_handler: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index e0bf8b23dd0f7..d66a1ee18be40 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -182,38 +182,38 @@ def _get_workunits_internal( self.report.num_operational_stats_filtered = 0 if self.config.include_operational_stats: - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_OPERATIONAL_STATS) - with PerfTimer() as timer: - # Generate operation aspect workunits - yield from self._gen_operation_aspect_workunits( - self.connection, all_tables - ) - self.report.operational_metadata_extraction_sec[ - self.config.database - ] = round(timer.elapsed_seconds(), 2) + with self.report.new_stage(USAGE_EXTRACTION_OPERATIONAL_STATS): + with PerfTimer() as timer: + # Generate operation aspect workunits + yield from self._gen_operation_aspect_workunits( + self.connection, all_tables + ) + self.report.operational_metadata_extraction_sec[ + self.config.database + ] = timer.elapsed_seconds(digits=2) # Generate aggregate events - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION) - query: str = self.queries.usage_query( - start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT), - end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT), - database=self.config.database, - ) - access_events_iterable: Iterable[ - RedshiftAccessEvent - ] = self._gen_access_events_from_history_query( - query, connection=self.connection, all_tables=all_tables - ) + with self.report.new_stage(USAGE_EXTRACTION_USAGE_AGGREGATION): + query: str = self.queries.usage_query( + start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT), + end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT), + database=self.config.database, + ) + access_events_iterable: Iterable[ + RedshiftAccessEvent + ] = self._gen_access_events_from_history_query( + query, connection=self.connection, all_tables=all_tables + ) - aggregated_events: AggregatedAccessEvents = self._aggregate_access_events( - access_events_iterable - ) - # Generate usage workunits from aggregated events. - for time_bucket in aggregated_events.values(): - for aggregate in time_bucket.values(): - wu: MetadataWorkUnit = self._make_usage_stat(aggregate) - self.report.num_usage_workunits_emitted += 1 - yield wu + aggregated_events: AggregatedAccessEvents = self._aggregate_access_events( + access_events_iterable + ) + # Generate usage workunits from aggregated events. + for time_bucket in aggregated_events.values(): + for aggregate in time_bucket.values(): + wu: MetadataWorkUnit = self._make_usage_stat(aggregate) + self.report.num_usage_workunits_emitted += 1 + yield wu def _gen_operation_aspect_workunits( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index 030b2d43be81f..b24471f8666af 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -166,6 +166,3 @@ def _is_tag_scanned(self, tag_name: str) -> bool: def report_tag_processed(self, tag_name: str) -> None: self._processed_tags.add(tag_name) - - def set_ingestion_stage(self, database: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{database}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 8a1bf15b7a7bc..6f09c26b08da2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -216,21 +216,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: try: for snowflake_db in self.databases: - self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION) - yield from self._process_database(snowflake_db) + with self.report.new_stage( + f"{snowflake_db.name}: {METADATA_EXTRACTION}" + ): + yield from self._process_database(snowflake_db) - self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE) - discovered_tables: List[str] = [ - self.identifiers.get_dataset_identifier( - table_name, schema.name, db.name - ) - for db in self.databases - for schema in db.schemas - for table_name in schema.tables - ] - if self.aggregator: - for entry in self._external_tables_ddl_lineage(discovered_tables): - self.aggregator.add(entry) + with self.report.new_stage(f"*: {EXTERNAL_TABLE_DDL_LINEAGE}"): + discovered_tables: List[str] = [ + self.identifiers.get_dataset_identifier( + table_name, schema.name, db.name + ) + for db in self.databases + for schema in db.schemas + for table_name in schema.tables + ] + if self.aggregator: + for entry in self._external_tables_ddl_lineage(discovered_tables): + self.aggregator.add(entry) except SnowflakePermissionError as e: self.structured_reporter.failure( @@ -332,8 +334,8 @@ def _process_database( yield from self._process_db_schemas(snowflake_db, db_tables) if self.profiler and db_tables: - self.report.set_ingestion_stage(snowflake_db.name, PROFILING) - yield from self.profiler.get_workunits(snowflake_db, db_tables) + with self.report.new_stage(f"{snowflake_db.name}: {PROFILING}"): + yield from self.profiler.get_workunits(snowflake_db, db_tables) def _process_db_schemas( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 4bdf559f293b5..85e4071aec07d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -146,59 +146,58 @@ def get_usage_workunits( if not self._should_ingest_usage(): return - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION) - if self.report.edition == SnowflakeEdition.STANDARD.value: - logger.info( - "Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported." - ) - return + with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"): + if self.report.edition == SnowflakeEdition.STANDARD.value: + logger.info( + "Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported." + ) + return - logger.info("Checking usage date ranges") + logger.info("Checking usage date ranges") - self._check_usage_date_ranges() + self._check_usage_date_ranges() - # If permission error, execution returns from here - if ( - self.report.min_access_history_time is None - or self.report.max_access_history_time is None - ): - return + # If permission error, execution returns from here + if ( + self.report.min_access_history_time is None + or self.report.max_access_history_time is None + ): + return - # NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation - # Now, we report the usage as well as operation metadata even if user email is absent + # NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation + # Now, we report the usage as well as operation metadata even if user email is absent - if self.config.include_usage_stats: - yield from auto_empty_dataset_usage_statistics( - self._get_workunits_internal(discovered_datasets), - config=BaseTimeWindowConfig( - start_time=self.start_time, - end_time=self.end_time, - bucket_duration=self.config.bucket_duration, - ), - dataset_urns={ - self.identifiers.gen_dataset_urn(dataset_identifier) - for dataset_identifier in discovered_datasets - }, - ) + if self.config.include_usage_stats: + yield from auto_empty_dataset_usage_statistics( + self._get_workunits_internal(discovered_datasets), + config=BaseTimeWindowConfig( + start_time=self.start_time, + end_time=self.end_time, + bucket_duration=self.config.bucket_duration, + ), + dataset_urns={ + self.identifiers.gen_dataset_urn(dataset_identifier) + for dataset_identifier in discovered_datasets + }, + ) - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS) + with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"): + if self.config.include_operational_stats: + # Generate the operation workunits. + access_events = self._get_snowflake_history() + for event in access_events: + yield from self._get_operation_aspect_work_unit( + event, discovered_datasets + ) - if self.config.include_operational_stats: - # Generate the operation workunits. - access_events = self._get_snowflake_history() - for event in access_events: - yield from self._get_operation_aspect_work_unit( - event, discovered_datasets + if self.redundant_run_skip_handler: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + self.config.start_time, + self.config.end_time, + self.config.bucket_duration, ) - if self.redundant_run_skip_handler: - # Update the checkpoint state for this run. - self.redundant_run_skip_handler.update_state( - self.config.start_time, - self.config.end_time, - self.config.bucket_duration, - ) - def _get_workunits_internal( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: @@ -386,7 +385,7 @@ def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]: ) self.report_status(USAGE_EXTRACTION_OPERATIONAL_STATS, False) return - self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2) + self.report.access_history_query_secs = timer.elapsed_seconds(digits=2) for row in results: yield from self._process_snowflake_history_row(row) @@ -434,8 +433,8 @@ def _check_usage_date_ranges(self) -> None: self.report.max_access_history_time = db_row["MAX_TIME"].astimezone( tz=timezone.utc ) - self.report.access_history_range_query_secs = round( - timer.elapsed_seconds(), 2 + self.report.access_history_range_query_secs = timer.elapsed_seconds( + digits=2 ) def _get_operation_aspect_work_unit( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index aede3d056709a..c0385a8d5af30 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -480,8 +480,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: identifiers=self.identifiers, ) - self.report.set_ingestion_stage("*", METADATA_EXTRACTION) - yield from schema_extractor.get_workunits_internal() + with self.report.new_stage(f"*: {METADATA_EXTRACTION}"): + yield from schema_extractor.get_workunits_internal() databases = schema_extractor.databases @@ -513,47 +513,46 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: discovered_datasets = discovered_tables + discovered_views if self.config.use_queries_v2: - self.report.set_ingestion_stage("*", VIEW_PARSING) - yield from auto_workunit(self.aggregator.gen_metadata()) - - self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - - schema_resolver = self.aggregator._schema_resolver - - queries_extractor = SnowflakeQueriesExtractor( - connection=self.connection, - config=SnowflakeQueriesExtractorConfig( - window=self.config, - temporary_tables_pattern=self.config.temporary_tables_pattern, - include_lineage=self.config.include_table_lineage, - include_usage_statistics=self.config.include_usage_stats, - include_operations=self.config.include_operational_stats, - include_queries=self.config.include_queries, - include_query_usage_statistics=self.config.include_query_usage_statistics, - user_email_pattern=self.config.user_email_pattern, - ), - structured_report=self.report, - filters=self.filters, - identifiers=self.identifiers, - schema_resolver=schema_resolver, - discovered_tables=discovered_datasets, - graph=self.ctx.graph, - ) + with self.report.new_stage(f"*: {VIEW_PARSING}"): + yield from auto_workunit(self.aggregator.gen_metadata()) - # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs - # but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors, - # it should be pretty straightforward to refactor this and only initialize the aggregator once. - self.report.queries_extractor = queries_extractor.report - yield from queries_extractor.get_workunits_internal() - queries_extractor.close() + with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"): + schema_resolver = self.aggregator._schema_resolver + + queries_extractor = SnowflakeQueriesExtractor( + connection=self.connection, + config=SnowflakeQueriesExtractorConfig( + window=self.config, + temporary_tables_pattern=self.config.temporary_tables_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_stats, + include_operations=self.config.include_operational_stats, + include_queries=self.config.include_queries, + include_query_usage_statistics=self.config.include_query_usage_statistics, + user_email_pattern=self.config.user_email_pattern, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=schema_resolver, + discovered_tables=discovered_datasets, + graph=self.ctx.graph, + ) + + # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs + # but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors, + # it should be pretty straightforward to refactor this and only initialize the aggregator once. + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() + queries_extractor.close() else: if self.lineage_extractor: - self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION) - self.lineage_extractor.add_time_based_lineage_to_aggregator( - discovered_tables=discovered_tables, - discovered_views=discovered_views, - ) + with self.report.new_stage(f"*: {LINEAGE_EXTRACTION}"): + self.lineage_extractor.add_time_based_lineage_to_aggregator( + discovered_tables=discovered_tables, + discovered_views=discovered_views, + ) # This would emit view and external table ddl lineage # as well as query lineage via lineage_extractor diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e42564975c3d1..5b76fe41d92e9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -878,7 +878,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit urns = self.schema_resolver.get_urns() if self.config.include_table_lineage or self.config.include_usage_statistics: - self.report.report_ingestion_stage_start("audit log extraction") - yield from self.get_audit_log_mcps(urns=urns) + with self.report.new_stage("Audit log extraction"): + yield from self.get_audit_log_mcps(urns=urns) yield from self.builder.gen_workunits() diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index d149402741e82..2543cbe653ba7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -118,6 +118,7 @@ ) from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo from datahub.ingestion.source.tableau.tableau_validation import check_user_role +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, ChangeAuditStamps, @@ -170,6 +171,8 @@ create_lineage_sql_parsed_result, ) from datahub.utilities import config_clean +from datahub.utilities.perf_timer import PerfTimer +from datahub.utilities.stats_collections import TopKDict from datahub.utilities.urns.dataset_urn import DatasetUrn try: @@ -643,12 +646,41 @@ class SiteIdContentUrl: @dataclass -class TableauSourceReport(StaleEntityRemovalSourceReport): +class TableauSourceReport( + StaleEntityRemovalSourceReport, + IngestionStageReport, +): get_all_datasources_query_failed: bool = False num_get_datasource_query_failures: int = 0 num_datasource_field_skipped_no_name: int = 0 num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 + # timers + extract_usage_stats_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + fetch_groups_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + populate_database_server_hostname_map_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + populate_projects_registry_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_workbooks_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_sheets_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_dashboards_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_embedded_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_published_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_custom_sql_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_upstream_tables_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) # lineage num_tables_with_upstream_lineage: int = 0 num_upstream_table_lineage: int = 0 @@ -660,6 +692,7 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 num_hidden_assets_skipped: int = 0 logged_in_user: List[UserInfo] = dataclass_field(default_factory=list) + last_authenticated_at: Optional[datetime] = None num_expected_tableau_metadata_queries: int = 0 @@ -834,6 +867,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: platform=self.platform, ) yield from site_source.ingest_tableau_site() + except MetadataQueryException as md_exception: self.report.failure( title="Failed to Retrieve Tableau Metadata", @@ -3489,33 +3523,87 @@ def _create_workbook_properties( return {"permissions": json.dumps(groups)} if len(groups) > 0 else None def ingest_tableau_site(self): - # Initialise the dictionary to later look-up for chart and dashboard stat - if self.config.extract_usage_stats: - self._populate_usage_stat_registry() - - if self.config.permission_ingestion: - self._fetch_groups() - - # Populate the map of database names and database hostnames to be used later to map - # databases to platform instances. - if self.config.database_hostname_to_platform_instance_map: - self._populate_database_server_hostname_map() - - self._populate_projects_registry() - - if self.config.add_site_container: - yield from self.emit_site_container() - yield from self.emit_project_containers() - yield from self.emit_workbooks() - if self.sheet_ids: - yield from self.emit_sheets() - if self.dashboard_ids: - yield from self.emit_dashboards() - if self.embedded_datasource_ids_being_used: - yield from self.emit_embedded_datasources() - if self.datasource_ids_being_used: - yield from self.emit_published_datasources() - if self.custom_sql_ids_being_used: - yield from self.emit_custom_sql_datasources() - if self.database_tables: - yield from self.emit_upstream_tables() + with self.report.new_stage( + f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}" + ): + # Initialise the dictionary to later look-up for chart and dashboard stat + if self.config.extract_usage_stats: + with PerfTimer() as timer: + self._populate_usage_stat_registry() + self.report.extract_usage_stats_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.config.permission_ingestion: + with PerfTimer() as timer: + self._fetch_groups() + self.report.fetch_groups_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + # Populate the map of database names and database hostnames to be used later to map + # databases to platform instances. + if self.config.database_hostname_to_platform_instance_map: + with PerfTimer() as timer: + self._populate_database_server_hostname_map() + self.report.populate_database_server_hostname_map_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + with PerfTimer() as timer: + self._populate_projects_registry() + self.report.populate_projects_registry_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.config.add_site_container: + yield from self.emit_site_container() + yield from self.emit_project_containers() + + with PerfTimer() as timer: + yield from self.emit_workbooks() + self.report.emit_workbooks_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.sheet_ids: + with PerfTimer() as timer: + yield from self.emit_sheets() + self.report.emit_sheets_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.dashboard_ids: + with PerfTimer() as timer: + yield from self.emit_dashboards() + self.report.emit_dashboards_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.embedded_datasource_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_embedded_datasources() + self.report.emit_embedded_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.datasource_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_published_datasources() + self.report.emit_published_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.custom_sql_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_custom_sql_datasources() + self.report.emit_custom_sql_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.database_tables: + with PerfTimer() as timer: + yield from self.emit_upstream_tables() + self.report.emit_upstream_tables_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 9d9a746580f93..43bd788f809c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -263,86 +263,86 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - self.report.report_ingestion_stage_start("Ingestion Setup") - wait_on_warehouse = None - if self.config.include_hive_metastore: - self.report.report_ingestion_stage_start("Start warehouse") - # Can take several minutes, so start now and wait later - wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() - if wait_on_warehouse is None: - self.report.report_failure( - "initialization", - f"SQL warehouse {self.config.profiling.warehouse_id} not found", - ) - return - else: - # wait until warehouse is started - wait_on_warehouse.result() + with self.report.new_stage("Ingestion Setup"): + wait_on_warehouse = None + if self.config.include_hive_metastore: + with self.report.new_stage("Start warehouse"): + # Can take several minutes, so start now and wait later + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() if self.config.include_ownership: - self.report.report_ingestion_stage_start("Ingest service principals") - self.build_service_principal_map() - self.build_groups_map() + with self.report.new_stage("Ingest service principals"): + self.build_service_principal_map() + self.build_groups_map() if self.config.include_notebooks: - self.report.report_ingestion_stage_start("Ingest notebooks") - yield from self.process_notebooks() + with self.report.new_stage("Ingest notebooks"): + yield from self.process_notebooks() yield from self.process_metastores() yield from self.get_view_lineage() if self.config.include_notebooks: - self.report.report_ingestion_stage_start("Notebook lineage") - for notebook in self.notebooks.values(): - wu = self._gen_notebook_lineage(notebook) - if wu: - yield wu + with self.report.new_stage("Notebook lineage"): + for notebook in self.notebooks.values(): + wu = self._gen_notebook_lineage(notebook) + if wu: + yield wu if self.config.include_usage_statistics: - self.report.report_ingestion_stage_start("Ingest usage") - usage_extractor = UnityCatalogUsageExtractor( - config=self.config, - report=self.report, - proxy=self.unity_catalog_api_proxy, - table_urn_builder=self.gen_dataset_urn, - user_urn_builder=self.gen_user_urn, - ) - yield from usage_extractor.get_usage_workunits( - self.table_refs | self.view_refs - ) - - if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start("Start warehouse") - # Need to start the warehouse again for profiling, - # as it may have been stopped after ingestion might take - # longer time to complete - wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() - if wait_on_warehouse is None: - self.report.report_failure( - "initialization", - f"SQL warehouse {self.config.profiling.warehouse_id} not found", + with self.report.new_stage("Ingest usage"): + usage_extractor = UnityCatalogUsageExtractor( + config=self.config, + report=self.report, + proxy=self.unity_catalog_api_proxy, + table_urn_builder=self.gen_dataset_urn, + user_urn_builder=self.gen_user_urn, + ) + yield from usage_extractor.get_usage_workunits( + self.table_refs | self.view_refs ) - return - else: - # wait until warehouse is started - wait_on_warehouse.result() - self.report.report_ingestion_stage_start("Profiling") - if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): - yield from UnityCatalogAnalyzeProfiler( - self.config.profiling, - self.report, - self.unity_catalog_api_proxy, - self.gen_dataset_urn, - ).get_workunits(self.table_refs) - elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): - yield from UnityCatalogGEProfiler( - sql_common_config=self.config, - profiling_config=self.config.profiling, - report=self.report, - ).get_workunits(list(self.tables.values())) - else: - raise ValueError("Unknown profiling config method") + if self.config.is_profiling_enabled(): + with self.report.new_stage("Start warehouse"): + # Need to start the warehouse again for profiling, + # as it may have been stopped after ingestion might take + # longer time to complete + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() + + with self.report.new_stage("Profiling"): + if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): + yield from UnityCatalogAnalyzeProfiler( + self.config.profiling, + self.report, + self.unity_catalog_api_proxy, + self.gen_dataset_urn, + ).get_workunits(self.table_refs) + elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): + yield from UnityCatalogGEProfiler( + sql_common_config=self.config, + profiling_config=self.config.profiling, + report=self.report, + ).get_workunits(list(self.tables.values())) + else: + raise ValueError("Unknown profiling config method") def build_service_principal_map(self) -> None: try: @@ -462,11 +462,11 @@ def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: self.report.schemas.dropped(schema.id) continue - self.report.report_ingestion_stage_start(f"Ingest schema {schema.id}") - yield from self.gen_schema_containers(schema) - yield from self.process_tables(schema) + with self.report.new_stage(f"Ingest schema {schema.id}"): + yield from self.gen_schema_containers(schema) + yield from self.process_tables(schema) - self.report.schemas.processed(schema.id) + self.report.schemas.processed(schema.id) def process_tables(self, schema: Schema) -> Iterable[MetadataWorkUnit]: for table in self.unity_catalog_api_proxy.tables(schema=schema): diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index ce683e64b3f46..130a36e254fef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -1,7 +1,7 @@ import logging +from contextlib import AbstractContextManager from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Optional from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict @@ -22,25 +22,29 @@ @dataclass class IngestionStageReport: - ingestion_stage: Optional[str] = None ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) - _timer: Optional[PerfTimer] = field( - default=None, init=False, repr=False, compare=False - ) - - def report_ingestion_stage_start(self, stage: str) -> None: - if self._timer: - elapsed = round(self._timer.elapsed_seconds(), 2) - logger.info( - f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds", - stacklevel=2, - ) - if self.ingestion_stage: - self.ingestion_stage_durations[self.ingestion_stage] = elapsed - else: - self._timer = PerfTimer() - - self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" - logger.info(f"Stage started: {self.ingestion_stage}") + def new_stage(self, stage: str) -> "IngestionStageContext": + return IngestionStageContext(stage, self) + + +@dataclass +class IngestionStageContext(AbstractContextManager): + def __init__(self, stage: str, report: IngestionStageReport): + self._ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" + self._timer: PerfTimer = PerfTimer() + self._report = report + + def __enter__(self) -> "IngestionStageContext": + logger.info(f"Stage started: {self._ingestion_stage}") self._timer.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + elapsed = self._timer.elapsed_seconds(digits=2) + logger.info( + f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds", + stacklevel=2, + ) + self._report.ingestion_stage_durations[self._ingestion_stage] = elapsed + return None diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index 9488683d6d8ca..fc1b1ed58244c 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -57,7 +57,7 @@ def __exit__( self.finish() return None - def elapsed_seconds(self) -> float: + def elapsed_seconds(self, digits: int = 4) -> float: """ Returns the elapsed time in seconds. """ @@ -65,11 +65,18 @@ def elapsed_seconds(self) -> float: return self._past_active_time if self.end_time is None: - return (time.perf_counter() - self.start_time) + (self._past_active_time) + elapsed = (time.perf_counter() - self.start_time) + (self._past_active_time) else: - return (self.end_time - self.start_time) + self._past_active_time + elapsed = (self.end_time - self.start_time) + self._past_active_time + + return round(elapsed, digits) def assert_timer_is_running(self) -> None: + if not self.is_running(): + self._error_state = True + logger.warning("Did you forget to start the timer ?") + + def is_running(self) -> bool: """ Returns true if timer is in running state. Timer is in NOT in running state if @@ -77,9 +84,7 @@ def assert_timer_is_running(self) -> None: 2. it is in paused state. 3. it had been started and finished in the past but not started again. """ - if self.start_time is None or self.paused or self.end_time: - self._error_state = True - logger.warning("Did you forget to start the timer ?") + return self.start_time is not None and not self.paused and self.end_time is None def __repr__(self) -> str: return repr(self.as_obj()) diff --git a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py index 9cb80ff02657b..24460f3829806 100644 --- a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py +++ b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py @@ -26,14 +26,14 @@ def run_test(): report = BigQueryV2Report() - report.set_ingestion_stage("All", "Seed Data Generation") - seed_metadata = generate_data( - num_containers=2000, - num_tables=20000, - num_views=2000, - time_range=timedelta(days=7), - ) - all_tables = seed_metadata.all_tables + with report.new_stage("All: Seed Data Generation"): + seed_metadata = generate_data( + num_containers=2000, + num_tables=20000, + num_views=2000, + time_range=timedelta(days=7), + ) + all_tables = seed_metadata.all_tables config = BigQueryV2Config( start_time=seed_metadata.start_time, @@ -51,42 +51,45 @@ def run_test(): schema_resolver=SchemaResolver(platform="bigquery"), identifiers=BigQueryIdentifierBuilder(config, report), ) - report.set_ingestion_stage("All", "Event Generation") - - num_projects = 100 - projects = [f"project-{i}" for i in range(num_projects)] - table_to_project = {table.name: random.choice(projects) for table in all_tables} - table_refs = {str(ref_from_table(table, table_to_project)) for table in all_tables} + with report.new_stage("All: Event Generation"): + num_projects = 100 + projects = [f"project-{i}" for i in range(num_projects)] + table_to_project = {table.name: random.choice(projects) for table in all_tables} + table_refs = { + str(ref_from_table(table, table_to_project)) for table in all_tables + } - queries = list( - generate_queries( - seed_metadata, - num_selects=240_000, - num_operations=800_000, - num_unique_queries=50_000, - num_users=2000, - query_length=NormalDistribution(2000, 500), + queries = list( + generate_queries( + seed_metadata, + num_selects=240_000, + num_operations=800_000, + num_unique_queries=50_000, + num_users=2000, + query_length=NormalDistribution(2000, 500), + ) ) - ) - queries.sort(key=lambda q: q.timestamp) - events = list(generate_events(queries, projects, table_to_project, config=config)) - print(f"Events generated: {len(events)}") - pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss - print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") + queries.sort(key=lambda q: q.timestamp) + events = list( + generate_events(queries, projects, table_to_project, config=config) + ) + print(f"Events generated: {len(events)}") + pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss + print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") - report.set_ingestion_stage("All", "Event Ingestion") - with PerfTimer() as timer: - workunits = usage_extractor._get_workunits_internal(events, table_refs) - num_workunits, peak_memory_usage = workunit_sink(workunits) - report.set_ingestion_stage("All", "Done") - print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + with report.new_stage("All: Event Ingestion"): + with PerfTimer() as timer: + workunits = usage_extractor._get_workunits_internal(events, table_refs) + num_workunits, peak_memory_usage = workunit_sink(workunits) + with report.new_stage("All: Done"): + print(f"Workunits Generated: {num_workunits}") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") - print( - f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" - ) - print(f"Disk Used: {report.processing_perf.usage_state_size}") - print(f"Hash collisions: {report.num_usage_query_hash_collisions}") + print( + f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" + ) + print(f"Disk Used: {report.processing_perf.usage_state_size}") + print(f"Hash collisions: {report.num_usage_query_hash_collisions}") if __name__ == "__main__": diff --git a/metadata-ingestion/tests/performance/databricks/test_unity.py b/metadata-ingestion/tests/performance/databricks/test_unity.py index ddd19804ba184..71192dc5b509b 100644 --- a/metadata-ingestion/tests/performance/databricks/test_unity.py +++ b/metadata-ingestion/tests/performance/databricks/test_unity.py @@ -59,7 +59,7 @@ def run_test(): workunits = source.get_workunits() num_workunits, peak_memory_usage = workunit_sink(workunits) print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") print( f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" diff --git a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py index 984d9e4295745..a940cce46a8f7 100644 --- a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py @@ -53,7 +53,7 @@ def run_test(): workunits = source.get_workunits() num_workunits, peak_memory_usage = workunit_sink(workunits) logging.info(f"Workunits Generated: {num_workunits}") - logging.info(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + logging.info(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") logging.info(source.get_report().as_string()) logging.info( diff --git a/metadata-ingestion/tests/performance/sql/test_sql_formatter.py b/metadata-ingestion/tests/performance/sql/test_sql_formatter.py index 5f783efc559bc..f09047c0ec4a4 100644 --- a/metadata-ingestion/tests/performance/sql/test_sql_formatter.py +++ b/metadata-ingestion/tests/performance/sql/test_sql_formatter.py @@ -12,12 +12,14 @@ def run_test() -> None: for i in range(N): if i % 50 == 0: print( - f"Running iteration {i}, elapsed time: {timer.elapsed_seconds():.2f} seconds" + f"Running iteration {i}, elapsed time: {timer.elapsed_seconds(digits=2)} seconds" ) try_format_query.__wrapped__(large_sql_query, platform="snowflake") - print(f"Total time taken for {N} iterations: {timer.elapsed_seconds():.2f} seconds") + print( + f"Total time taken for {N} iterations: {timer.elapsed_seconds(digits=2)} seconds" + ) if __name__ == "__main__": diff --git a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py new file mode 100644 index 0000000000000..8bae38eaa7444 --- /dev/null +++ b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py @@ -0,0 +1,42 @@ +import time + +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport + + +def test_ingestion_stage_context_records_duration(): + report = IngestionStageReport() + with report.new_stage(stage="Test Stage"): + pass + assert len(report.ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report.ingestion_stage_durations.keys())) + + +def test_ingestion_stage_context_handles_exceptions(): + report = IngestionStageReport() + try: + with report.new_stage(stage="Test Stage"): + raise ValueError("Test Exception") + except ValueError: + pass + assert len(report.ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report.ingestion_stage_durations)) + + +def test_ingestion_stage_context_report_handles_multiple_stages(): + report = IngestionStageReport() + with report.new_stage(stage="Test Stage 1"): + time.sleep(0.1) + with report.new_stage(stage="Test Stage 2"): + time.sleep(0.1) + with report.new_stage(stage="Test Stage 3"): + time.sleep(0.1) + assert len(report.ingestion_stage_durations) == 3 + assert all( + isinstance(duration, float) and duration > 0.0 + for duration in report.ingestion_stage_durations.values() + ) + + sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) + assert "Test Stage 1" in sorted_stages[0] + assert "Test Stage 2" in sorted_stages[1] + assert "Test Stage 3" in sorted_stages[2]