diff --git a/metaphor/bigquery/extractor.py b/metaphor/bigquery/extractor.py
index e834a9ad..631fe7ee 100644
--- a/metaphor/bigquery/extractor.py
+++ b/metaphor/bigquery/extractor.py
@@ -1,10 +1,8 @@
-import re
 import time
 from concurrent.futures import ThreadPoolExecutor
-from typing import Collection, Dict, Iterable, Iterator, List, Optional
+from typing import Collection, Dict, Iterable, Iterator, List
 
-from metaphor.bigquery.log_filter import build_log_filter
-from metaphor.bigquery.log_type import query_type_to_log_type
+from metaphor.bigquery.log_filter import build_list_entries_filter
 from metaphor.bigquery.queries import Queries
 from metaphor.bigquery.table import TableExtractor
 from metaphor.common.sql.table_level_lineage import table_level_lineage
@@ -20,21 +18,14 @@
 from metaphor.bigquery.config import BigQueryRunConfig
 from metaphor.bigquery.job_change_event import JobChangeEvent
-from metaphor.bigquery.utils import (
-    BigQueryResource,
-    build_client,
-    build_logging_client,
-    get_credentials,
-)
+from metaphor.bigquery.utils import build_client, build_logging_client, get_credentials
 from metaphor.common.base_extractor import BaseExtractor
 from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
 from metaphor.common.event_util import ENTITY_TYPES
 from metaphor.common.filter import DatasetFilter
 from metaphor.common.logger import get_logger
 from metaphor.common.models import to_dataset_statistics
-from metaphor.common.sql.query_log import PartialQueryLog, process_and_init_query_log
 from metaphor.common.tag_matcher import tag_datasets
-from metaphor.common.utils import safe_float
 from metaphor.models.crawler_run_metadata import Platform
 from metaphor.models.metadata_change_event import (
     DataPlatform,
@@ -42,7 +33,6 @@
     DatasetLogicalID,
     DatasetStructure,
     EntityUpstream,
-    QueriedDataset,
     QueryLog,
     SourceInfo,
 )
@@ -92,7 +82,7 @@ def _extract_project(self, project_id: str) -> None:
             max_workers=self._config.max_concurrency
         ) as executor:
 
-            def get_table(table: bigquery.TableReference) -> Dataset:
+            def table_ref_to_dataset(table: bigquery.TableReference) -> Dataset:
                 logger.info(f"Getting table {table.table_id}")
                 return self._parse_table(client.project, client.get_table(table))
@@ -100,7 +90,7 @@ def get_table(table: bigquery.TableReference) -> Dataset:
             tables: Dict[str, Dataset] = {
                 d.logical_id.name.split(".")[-1]: d
                 for d in executor.map(
-                    get_table,
+                    table_ref_to_dataset,
                     BigQueryExtractor._list_tables_with_filter(
                         dataset_ref, client, self._dataset_filter
                     ),
             }
@@ -190,13 +180,6 @@ def _parse_table(self, project_id: str, bq_table: BQTable) -> Dataset:
         return dataset
 
     def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]:
-        log_filter = build_log_filter(
-            target="query_log",
-            query_log_lookback_days=self._config.query_log.lookback_days,
-            audit_log_lookback_days=self._config.lineage.lookback_days,
-            exclude_service_accounts=self._config.query_log.exclude_service_accounts,
-        )
-
         client = build_client(project_id, self._credentials)
         logging_client = build_logging_client(project_id, self._credentials)
@@ -204,12 +187,13 @@ def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]:
         last_time = time.time()
 
         for entry in logging_client.list_entries(
-            page_size=self._config.query_log.fetch_size, filter_=log_filter
+            page_size=self._config.query_log.fetch_size,
+            filter_=build_list_entries_filter("query_log", self._config),
         ):
             count += 1
             if job_change := JobChangeEvent.from_entry(entry):
-                if log := self._parse_job_change_event(job_change, client):
+                if log := job_change.to_query_log(client, self._config):
                     fetched += 1
                     yield log
@@ -225,98 +209,6 @@ def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]:
 
         logger.info(f"Number of query log entries fetched: {fetched}")
 
-    @staticmethod
-    def _fetch_job_query(client: bigquery.Client, job_name: str) -> Optional[str]:
-        logger.info(f"Query {job_name}")
-        if match := re.match(r"^projects/([^/]+)/jobs/([^/]+)$", job_name):
-            project = match.group(1)
-            job_id = match.group(2)
-
-            try:
-                job = client.get_job(job_id, project)
-            except Exception as e:
-                logger.warning(f"Failed to get job information: {e}")
-                return None
-
-            if isinstance(job, bigquery.QueryJob):
-                return job.query
-
-        return None
-
-    def _parse_job_change_event(
-        self, job_change: JobChangeEvent, client: bigquery.Client
-    ) -> Optional[QueryLog]:
-        if job_change.query is None:
-            return None
-
-        if job_change.user_email in self._config.query_log.excluded_usernames:
-            logger.debug(f"Skipped query issued by {job_change.user_email}")
-            return None
-
-        sources: List[QueriedDataset] = [
-            self._convert_resource_to_queried_dataset(d)
-            for d in job_change.source_tables
-        ]
-        target = job_change.destination_table
-        target_datasets = (
-            [self._convert_resource_to_queried_dataset(target)] if target else None
-        )
-
-        default_database, default_schema = None, None
-        if job_change.default_dataset and job_change.default_dataset.count(".") == 1:
-            default_database, default_schema = job_change.default_dataset.split(".")
-
-        query = job_change.query
-        # if query SQL is truncated, fetch full SQL from job API
-        if (
-            job_change.job_type == "QUERY"
-            and job_change.query_truncated
-            and self._config.query_log.fetch_job_query_if_truncated
-        ):
-            query = self._fetch_job_query(client, job_change.job_name) or query
-
-        elapsed_time = (
-            (job_change.end_time - job_change.start_time).total_seconds()
-            if job_change.start_time and job_change.end_time
-            else None
-        )
-
-        return process_and_init_query_log(
-            query=query,
-            platform=DataPlatform.BIGQUERY,
-            process_query_config=self._config.query_log.process_query,
-            query_log=PartialQueryLog(
-                start_time=job_change.start_time,
-                duration=safe_float(elapsed_time),
-                user_id=job_change.get_email(service_account=True),
-                email=job_change.get_email(service_account=False),
-                rows_written=safe_float(job_change.output_rows),
-                bytes_read=safe_float(job_change.input_bytes),
-                bytes_written=safe_float(job_change.output_bytes),
-                sources=sources,
-                targets=target_datasets,
-                default_database=default_database,
-                default_schema=default_schema,
-                type=query_type_to_log_type(job_change.statementType),
-            ),
-            query_id=job_change.job_name,
-        )
-
-    @staticmethod
-    def _convert_resource_to_queried_dataset(
-        resource: BigQueryResource,
-    ) -> QueriedDataset:
-        dataset_name = dataset_normalized_name(
-            resource.project_id, resource.dataset_id, resource.table_id
-        )
-        dataset_id = str(to_dataset_entity_id(dataset_name, DataPlatform.BIGQUERY))
-        return QueriedDataset(
-            id=dataset_id,
-            database=resource.project_id,
-            schema=resource.dataset_id,
-            table=resource.table_id,
-        )
-
     def _fetch_view_upstream(self, client: bigquery.Client, project_id: str) -> None:
         logger.info("Fetching lineage info from BigQuery API")
@@ -371,15 +263,10 @@ def _fetch_audit_log(self, project_id: str):
         logging_client = build_logging_client(project_id, self._credentials)
 
-        log_filter = build_log_filter(
-            target="audit_log",
-            query_log_lookback_days=self._config.query_log.lookback_days,
-            audit_log_lookback_days=self._config.lineage.lookback_days,
-            exclude_service_accounts=self._config.query_log.exclude_service_accounts,
-        )
 
         fetched, parsed = 0, 0
         for entry in logging_client.list_entries(
-            page_size=self._config.lineage.batch_size, filter_=log_filter
+            page_size=self._config.lineage.batch_size,
+            filter_=build_list_entries_filter("audit_log", self._config),
         ):
             fetched += 1
             try:
diff --git a/metaphor/bigquery/job_change_event.py b/metaphor/bigquery/job_change_event.py
index 12c2a2de..d2997975 100644
--- a/metaphor/bigquery/job_change_event.py
+++ b/metaphor/bigquery/job_change_event.py
@@ -1,13 +1,20 @@
 import logging
+import re
 from dataclasses import dataclass
 from datetime import datetime
 from typing import List, Optional
 
+from google.cloud import bigquery
 from google.cloud._helpers import _rfc3339_nanos_to_datetime
 
+from metaphor.bigquery.config import BigQueryRunConfig
+from metaphor.bigquery.log_type import query_type_to_log_type
 from metaphor.bigquery.utils import BigQueryResource, LogEntry
+from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
 from metaphor.common.logger import get_logger
-from metaphor.common.utils import safe_int, unique_list
+from metaphor.common.sql.query_log import PartialQueryLog, process_and_init_query_log
+from metaphor.common.utils import safe_float, safe_int, unique_list
+from metaphor.models.metadata_change_event import DataPlatform, QueriedDataset, QueryLog
 
 logger = get_logger()
 logger.setLevel(logging.INFO)
@@ -152,7 +159,7 @@ def from_entry(cls, entry: LogEntry) -> Optional["JobChangeEvent"]:
             output_rows=output_rows,
         )
 
-    def get_email(self, service_account: bool) -> Optional[str]:
+    def _get_email(self, service_account: bool) -> Optional[str]:
         """
         Returns the email for this Job Change Event.
@@ -171,3 +178,97 @@ def get_email(self, service_account: bool) -> Optional[str]:
             return self.user_email if is_service_account else None
         else:
             return None if is_service_account else self.user_email
+
+    def to_query_log(
+        self, client: bigquery.Client, config: BigQueryRunConfig
+    ) -> Optional[QueryLog]:
+        """
+        Converts this JobChangeEvent to a QueryLog.
+        """
+        if self.query is None:
+            return None
+
+        if self.user_email in config.query_log.excluded_usernames:
+            logger.debug(f"Skipped query issued by {self.user_email}")
+            return None
+
+        sources: List[QueriedDataset] = [
+            self._convert_resource_to_queried_dataset(d) for d in self.source_tables
+        ]
+        target = self.destination_table
+        target_datasets = (
+            [self._convert_resource_to_queried_dataset(target)] if target else None
+        )
+
+        default_database, default_schema = None, None
+        if self.default_dataset and self.default_dataset.count(".") == 1:
+            default_database, default_schema = self.default_dataset.split(".")
+
+        query = self.query
+        # if query SQL is truncated, fetch full SQL from job API
+        if (
+            self.job_type == "QUERY"
+            and self.query_truncated
+            and config.query_log.fetch_job_query_if_truncated
+        ):
+            query = self._fetch_job_query(client, self.job_name) or query
+
+        elapsed_time = (
+            (self.end_time - self.start_time).total_seconds()
+            if self.start_time and self.end_time
+            else None
+        )
+
+        return process_and_init_query_log(
+            query=query,
+            platform=DataPlatform.BIGQUERY,
+            process_query_config=config.query_log.process_query,
+            query_log=PartialQueryLog(
+                start_time=self.start_time,
+                duration=safe_float(elapsed_time),
+                user_id=self._get_email(service_account=True),
+                email=self._get_email(service_account=False),
+                rows_written=safe_float(self.output_rows),
+                bytes_read=safe_float(self.input_bytes),
+                bytes_written=safe_float(self.output_bytes),
+                sources=sources,
+                targets=target_datasets,
+                default_database=default_database,
+                default_schema=default_schema,
+                type=query_type_to_log_type(self.statementType),
+            ),
+            query_id=self.job_name,
+        )
+
+    @staticmethod
+    def _fetch_job_query(client: bigquery.Client, job_name: str) -> Optional[str]:
+        logger.info(f"Query {job_name}")
+        if match := re.match(r"^projects/([^/]+)/jobs/([^/]+)$", job_name):
+            project = match.group(1)
+            job_id = match.group(2)
+
+            try:
+                job = client.get_job(job_id, project)
+            except Exception as e:
+                logger.warning(f"Failed to get job information: {e}")
+                return None
+
+            if isinstance(job, bigquery.QueryJob):
+                return job.query
+
+        return None
+
+    @staticmethod
+    def _convert_resource_to_queried_dataset(
+        resource: BigQueryResource,
+    ) -> QueriedDataset:
+        dataset_name = dataset_normalized_name(
+            resource.project_id, resource.dataset_id, resource.table_id
+        )
+        dataset_id = str(to_dataset_entity_id(dataset_name, DataPlatform.BIGQUERY))
+        return QueriedDataset(
+            id=dataset_id,
+            database=resource.project_id,
+            schema=resource.dataset_id,
+            table=resource.table_id,
+        )
diff --git a/metaphor/bigquery/log_filter.py b/metaphor/bigquery/log_filter.py
index d9ea8de9..8f782440 100644
--- a/metaphor/bigquery/log_filter.py
+++ b/metaphor/bigquery/log_filter.py
@@ -1,24 +1,28 @@
 from typing import Literal
 
+from metaphor.bigquery.config import BigQueryRunConfig
 from metaphor.bigquery.queries import Queries
 from metaphor.common.utils import start_of_day
 
 
-def build_log_filter(
+def build_list_entries_filter(
     target: Literal["audit_log", "query_log"],
-    query_log_lookback_days: int,
-    audit_log_lookback_days: int,
-    exclude_service_accounts: bool,
+    config: BigQueryRunConfig,
 ) -> str:
+    """
+    Builds the filter for a list entries query.
+    """
     start_time = start_of_day(
-        query_log_lookback_days if target == "query_log" else audit_log_lookback_days
+        config.query_log.lookback_days
+        if target == "query_log"
+        else config.lineage.lookback_days
     ).isoformat()
     end_time = start_of_day().isoformat()
 
     # Filter for service account
     service_account_filter = (
         "NOT protoPayload.authenticationInfo.principalEmail:gserviceaccount.com AND"
-        if target == "query_log" and exclude_service_accounts
+        if target == "query_log" and config.query_log.exclude_service_accounts
         else ""
     )
diff --git a/metaphor/bigquery/log_type.py b/metaphor/bigquery/log_type.py
index d815fbd5..322e5ec0 100644
--- a/metaphor/bigquery/log_type.py
+++ b/metaphor/bigquery/log_type.py
@@ -28,6 +28,9 @@
 
 
 def query_type_to_log_type(query_type: Optional[str]) -> Optional[LogType]:
+    """
+    Converts query type to LogType if it is not None.
+    """
     if not query_type:
         return None
     return _query_type_map.get(query_type.upper(), LogType.OTHER)
diff --git a/metaphor/bigquery/queries.py b/metaphor/bigquery/queries.py
index d760aac3..49cb0d75 100644
--- a/metaphor/bigquery/queries.py
+++ b/metaphor/bigquery/queries.py
@@ -1,6 +1,8 @@
 class Queries:
     """
-    See https://cloud.google.com/logging/docs/view/logging-query-language for query syntax
+    Utility class for all queries used by the BQ crawler.
+
+    See https://cloud.google.com/logging/docs/view/logging-query-language for query syntax.
     """
 
     @staticmethod