Skip to content

Commit

Permalink
more refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
usefulalgorithm committed Nov 7, 2024
1 parent 4fbbbff commit 4dc2ec3
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 132 deletions.
133 changes: 10 additions & 123 deletions metaphor/bigquery/extractor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import re
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Collection, Dict, Iterable, Iterator, List, Optional
from typing import Collection, Dict, Iterable, Iterator, List

from metaphor.bigquery.log_filter import build_log_filter
from metaphor.bigquery.log_type import query_type_to_log_type
from metaphor.bigquery.log_filter import build_list_entries_filter
from metaphor.bigquery.queries import Queries
from metaphor.bigquery.table import TableExtractor
from metaphor.common.sql.table_level_lineage import table_level_lineage
Expand All @@ -20,29 +18,21 @@

from metaphor.bigquery.config import BigQueryRunConfig
from metaphor.bigquery.job_change_event import JobChangeEvent
from metaphor.bigquery.utils import (
BigQueryResource,
build_client,
build_logging_client,
get_credentials,
)
from metaphor.bigquery.utils import build_client, build_logging_client, get_credentials
from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.filter import DatasetFilter
from metaphor.common.logger import get_logger
from metaphor.common.models import to_dataset_statistics
from metaphor.common.sql.query_log import PartialQueryLog, process_and_init_query_log
from metaphor.common.tag_matcher import tag_datasets
from metaphor.common.utils import safe_float
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
Dataset,
DatasetLogicalID,
DatasetStructure,
EntityUpstream,
QueriedDataset,
QueryLog,
SourceInfo,
)
Expand Down Expand Up @@ -92,15 +82,15 @@ def _extract_project(self, project_id: str) -> None:
max_workers=self._config.max_concurrency
) as executor:

def get_table(table: bigquery.TableReference) -> Dataset:
def table_ref_to_dataset(table: bigquery.TableReference) -> Dataset:
logger.info(f"Getting table {table.table_id}")
return self._parse_table(client.project, client.get_table(table))

# map of table name to Dataset
tables: Dict[str, Dataset] = {
d.logical_id.name.split(".")[-1]: d
for d in executor.map(
get_table,
table_ref_to_dataset,
BigQueryExtractor._list_tables_with_filter(
dataset_ref, client, self._dataset_filter
),
Expand Down Expand Up @@ -190,26 +180,20 @@ def _parse_table(self, project_id: str, bq_table: BQTable) -> Dataset:
return dataset

def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]:
log_filter = build_log_filter(
target="query_log",
query_log_lookback_days=self._config.query_log.lookback_days,
audit_log_lookback_days=self._config.lineage.lookback_days,
exclude_service_accounts=self._config.query_log.exclude_service_accounts,
)

client = build_client(project_id, self._credentials)
logging_client = build_logging_client(project_id, self._credentials)

fetched, count = 0, 0
last_time = time.time()

for entry in logging_client.list_entries(
page_size=self._config.query_log.fetch_size, filter_=log_filter
page_size=self._config.query_log.fetch_size,
filter_=build_list_entries_filter("query_log", self._config),
):
count += 1

if job_change := JobChangeEvent.from_entry(entry):
if log := self._parse_job_change_event(job_change, client):
if log := job_change.to_query_log(client, self._config):
fetched += 1
yield log

Expand All @@ -225,98 +209,6 @@ def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]:

logger.info(f"Number of query log entries fetched: {fetched}")

@staticmethod
def _fetch_job_query(client: bigquery.Client, job_name: str) -> Optional[str]:
logger.info(f"Query {job_name}")
if match := re.match(r"^projects/([^/]+)/jobs/([^/]+)$", job_name):
project = match.group(1)
job_id = match.group(2)

try:
job = client.get_job(job_id, project)
except Exception as e:
logger.warning(f"Failed to get job information: {e}")
return None

if isinstance(job, bigquery.QueryJob):
return job.query

return None

def _parse_job_change_event(
self, job_change: JobChangeEvent, client: bigquery.Client
) -> Optional[QueryLog]:
if job_change.query is None:
return None

if job_change.user_email in self._config.query_log.excluded_usernames:
logger.debug(f"Skipped query issued by {job_change.user_email}")
return None

sources: List[QueriedDataset] = [
self._convert_resource_to_queried_dataset(d)
for d in job_change.source_tables
]
target = job_change.destination_table
target_datasets = (
[self._convert_resource_to_queried_dataset(target)] if target else None
)

default_database, default_schema = None, None
if job_change.default_dataset and job_change.default_dataset.count(".") == 1:
default_database, default_schema = job_change.default_dataset.split(".")

query = job_change.query
# if query SQL is truncated, fetch full SQL from job API
if (
job_change.job_type == "QUERY"
and job_change.query_truncated
and self._config.query_log.fetch_job_query_if_truncated
):
query = self._fetch_job_query(client, job_change.job_name) or query

elapsed_time = (
(job_change.end_time - job_change.start_time).total_seconds()
if job_change.start_time and job_change.end_time
else None
)

return process_and_init_query_log(
query=query,
platform=DataPlatform.BIGQUERY,
process_query_config=self._config.query_log.process_query,
query_log=PartialQueryLog(
start_time=job_change.start_time,
duration=safe_float(elapsed_time),
user_id=job_change.get_email(service_account=True),
email=job_change.get_email(service_account=False),
rows_written=safe_float(job_change.output_rows),
bytes_read=safe_float(job_change.input_bytes),
bytes_written=safe_float(job_change.output_bytes),
sources=sources,
targets=target_datasets,
default_database=default_database,
default_schema=default_schema,
type=query_type_to_log_type(job_change.statementType),
),
query_id=job_change.job_name,
)

@staticmethod
def _convert_resource_to_queried_dataset(
resource: BigQueryResource,
) -> QueriedDataset:
dataset_name = dataset_normalized_name(
resource.project_id, resource.dataset_id, resource.table_id
)
dataset_id = str(to_dataset_entity_id(dataset_name, DataPlatform.BIGQUERY))
return QueriedDataset(
id=dataset_id,
database=resource.project_id,
schema=resource.dataset_id,
table=resource.table_id,
)

def _fetch_view_upstream(self, client: bigquery.Client, project_id: str) -> None:
logger.info("Fetching lineage info from BigQuery API")

Expand Down Expand Up @@ -371,15 +263,10 @@ def _fetch_audit_log(self, project_id: str):

logging_client = build_logging_client(project_id, self._credentials)

log_filter = build_log_filter(
target="audit_log",
query_log_lookback_days=self._config.query_log.lookback_days,
audit_log_lookback_days=self._config.lineage.lookback_days,
exclude_service_accounts=self._config.query_log.exclude_service_accounts,
)
fetched, parsed = 0, 0
for entry in logging_client.list_entries(
page_size=self._config.lineage.batch_size, filter_=log_filter
page_size=self._config.lineage.batch_size,
filter_=build_list_entries_filter("audit_log", self._config),
):
fetched += 1
try:
Expand Down
105 changes: 103 additions & 2 deletions metaphor/bigquery/job_change_event.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

from google.cloud import bigquery
from google.cloud._helpers import _rfc3339_nanos_to_datetime

from metaphor.bigquery.config import BigQueryRunConfig
from metaphor.bigquery.log_type import query_type_to_log_type
from metaphor.bigquery.utils import BigQueryResource, LogEntry
from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
from metaphor.common.logger import get_logger
from metaphor.common.utils import safe_int, unique_list
from metaphor.common.sql.query_log import PartialQueryLog, process_and_init_query_log
from metaphor.common.utils import safe_float, safe_int, unique_list
from metaphor.models.metadata_change_event import DataPlatform, QueriedDataset, QueryLog

logger = get_logger()
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -152,7 +159,7 @@ def from_entry(cls, entry: LogEntry) -> Optional["JobChangeEvent"]:
output_rows=output_rows,
)

def get_email(self, service_account: bool) -> Optional[str]:
def _get_email(self, service_account: bool) -> Optional[str]:
"""
Returns the email for this Job Change Event.
Expand All @@ -171,3 +178,97 @@ def get_email(self, service_account: bool) -> Optional[str]:
return self.user_email if is_service_account else None
else:
return None if is_service_account else self.user_email

def to_query_log(
self, client: bigquery.Client, config: BigQueryRunConfig
) -> Optional[QueryLog]:
"""
Converts this JobChangeEvent to a QueryLog.
"""
if self.query is None:
return None

Check warning on line 189 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L189

Added line #L189 was not covered by tests

if self.user_email in config.query_log.excluded_usernames:
logger.debug(f"Skipped query issued by {self.user_email}")
return None

Check warning on line 193 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L192-L193

Added lines #L192 - L193 were not covered by tests

sources: List[QueriedDataset] = [
self._convert_resource_to_queried_dataset(d) for d in self.source_tables
]
target = self.destination_table
target_datasets = (
[self._convert_resource_to_queried_dataset(target)] if target else None
)

default_database, default_schema = None, None
if self.default_dataset and self.default_dataset.count(".") == 1:
default_database, default_schema = self.default_dataset.split(".")

query = self.query
# if query SQL is truncated, fetch full SQL from job API
if (
self.job_type == "QUERY"
and self.query_truncated
and config.query_log.fetch_job_query_if_truncated
):
query = self._fetch_job_query(client, self.job_name) or query

Check warning on line 214 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L214

Added line #L214 was not covered by tests

elapsed_time = (
(self.end_time - self.start_time).total_seconds()
if self.start_time and self.end_time
else None
)

return process_and_init_query_log(
query=query,
platform=DataPlatform.BIGQUERY,
process_query_config=config.query_log.process_query,
query_log=PartialQueryLog(
start_time=self.start_time,
duration=safe_float(elapsed_time),
user_id=self._get_email(service_account=True),
email=self._get_email(service_account=False),
rows_written=safe_float(self.output_rows),
bytes_read=safe_float(self.input_bytes),
bytes_written=safe_float(self.output_bytes),
sources=sources,
targets=target_datasets,
default_database=default_database,
default_schema=default_schema,
type=query_type_to_log_type(self.statementType),
),
query_id=self.job_name,
)

@staticmethod
def _fetch_job_query(client: bigquery.Client, job_name: str) -> Optional[str]:
logger.info(f"Query {job_name}")
if match := re.match(r"^projects/([^/]+)/jobs/([^/]+)$", job_name):
project = match.group(1)
job_id = match.group(2)

Check warning on line 248 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L245-L248

Added lines #L245 - L248 were not covered by tests

try:
job = client.get_job(job_id, project)
except Exception as e:
logger.warning(f"Failed to get job information: {e}")
return None

Check warning on line 254 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L250-L254

Added lines #L250 - L254 were not covered by tests

if isinstance(job, bigquery.QueryJob):
return job.query

Check warning on line 257 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L256-L257

Added lines #L256 - L257 were not covered by tests

return None

Check warning on line 259 in metaphor/bigquery/job_change_event.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/job_change_event.py#L259

Added line #L259 was not covered by tests

@staticmethod
def _convert_resource_to_queried_dataset(
resource: BigQueryResource,
) -> QueriedDataset:
dataset_name = dataset_normalized_name(
resource.project_id, resource.dataset_id, resource.table_id
)
dataset_id = str(to_dataset_entity_id(dataset_name, DataPlatform.BIGQUERY))
return QueriedDataset(
id=dataset_id,
database=resource.project_id,
schema=resource.dataset_id,
table=resource.table_id,
)
16 changes: 10 additions & 6 deletions metaphor/bigquery/log_filter.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
from typing import Literal

from metaphor.bigquery.config import BigQueryRunConfig
from metaphor.bigquery.queries import Queries
from metaphor.common.utils import start_of_day


def build_log_filter(
def build_list_entries_filter(
target: Literal["audit_log", "query_log"],
query_log_lookback_days: int,
audit_log_lookback_days: int,
exclude_service_accounts: bool,
config: BigQueryRunConfig,
) -> str:
"""
Builds the filter for a list entries query.
"""
start_time = start_of_day(
query_log_lookback_days if target == "query_log" else audit_log_lookback_days
config.query_log.lookback_days
if target == "query_log"
else config.lineage.lookback_days
).isoformat()
end_time = start_of_day().isoformat()

# Filter for service account
service_account_filter = (
"NOT protoPayload.authenticationInfo.principalEmail:gserviceaccount.com AND"
if target == "query_log" and exclude_service_accounts
if target == "query_log" and config.query_log.exclude_service_accounts
else ""
)

Expand Down
3 changes: 3 additions & 0 deletions metaphor/bigquery/log_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@


def query_type_to_log_type(query_type: Optional[str]) -> Optional[LogType]:
"""
Converts query type to LogType if it is not None.
"""
if not query_type:
return None

Check warning on line 35 in metaphor/bigquery/log_type.py

View check run for this annotation

Codecov / codecov/patch

metaphor/bigquery/log_type.py#L35

Added line #L35 was not covered by tests
return _query_type_map.get(query_type.upper(), LogType.OTHER)
4 changes: 3 additions & 1 deletion metaphor/bigquery/queries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class Queries:
"""
See https://cloud.google.com/logging/docs/view/logging-query-language for query syntax
Utility class for all queries used by the BQ crawler.
See https://cloud.google.com/logging/docs/view/logging-query-language for query syntax.
"""

@staticmethod
Expand Down

0 comments on commit 4dc2ec3

Please sign in to comment.