Add safe_int util method to prevent parsing errors (#607)
mars-lan authored Sep 27, 2023
1 parent 3d8b1be commit f64f0d2
Showing 12 changed files with 70 additions and 49 deletions.
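In short, the commit replaces hand-rolled `float(x) if x else None` ternaries scattered across the connectors with shared `safe_float`/`safe_int` helpers. A before/after sketch of the pattern, taken from the BigQuery extractor hunk below (the sample value is hypothetical):

```python
from metaphor.common.utils import safe_float

elapsed_time = None  # hypothetical value from a BigQuery audit-log entry

# Before: every call site guarded against None itself, and this truthiness
# check also mapped a legitimate 0 to None.
duration = float(elapsed_time) if elapsed_time else None

# After: one shared helper handles None (and NaN/Inf) uniformly.
duration = safe_float(elapsed_time)
```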
4 changes: 2 additions & 2 deletions metaphor/bigquery/extractor.py
@@ -37,7 +37,7 @@
from metaphor.common.models import to_dataset_statistics
from metaphor.common.query_history import chunk_query_logs
from metaphor.common.tag_matcher import tag_datasets
- from metaphor.common.utils import md5_digest, start_of_day
+ from metaphor.common.utils import md5_digest, safe_float, start_of_day
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
@@ -356,7 +356,7 @@ def _parse_job_change_entry(
query_id=job_change.job_name,
platform=DataPlatform.BIGQUERY,
start_time=job_change.start_time,
- duration=float(elapsed_time) if elapsed_time else None,
+ duration=safe_float(elapsed_time),
email=job_change.user_email,
rows_written=float(job_change.output_rows)
if job_change.output_rows
6 changes: 3 additions & 3 deletions metaphor/bigquery/logEvent.py
@@ -7,7 +7,7 @@

from metaphor.bigquery.utils import BigQueryResource, LogEntry
from metaphor.common.logger import get_logger
- from metaphor.common.utils import unique_list
+ from metaphor.common.utils import safe_int, unique_list

logger = get_logger()
logger.setLevel(logging.INFO)
@@ -102,10 +102,10 @@ def from_entry(cls, entry: LogEntry) -> Optional["JobChangeEvent"]:
default_dataset = query_job.get("defaultDataset", None)

processed_bytes = query_stats.get("totalProcessedBytes", None)
- input_bytes = int(processed_bytes) if processed_bytes else None
+ input_bytes = safe_int(processed_bytes)

output_row_count = query_stats.get("outputRowCount", None)
- output_rows = int(output_row_count) if output_row_count else None
+ output_rows = safe_int(output_row_count)
else:
logger.error(f"unsupported job type {job_type}")
return None
10 changes: 5 additions & 5 deletions metaphor/bigquery/profile/extractor.py
@@ -20,7 +20,7 @@
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.filter import DatasetFilter
from metaphor.common.logger import get_logger
- from metaphor.common.utils import convert_to_float
+ from metaphor.common.utils import safe_float
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
@@ -246,19 +246,19 @@ def _parse_result(
min_value, max_value, avg, std_dev = None, None, None, None
if BigQueryProfileExtractor._is_numeric(data_type):
if column_statistics.min_value:
- min_value = convert_to_float(results[index])
+ min_value = safe_float(results[index])
index += 1

if column_statistics.max_value:
- max_value = convert_to_float(results[index])
+ max_value = safe_float(results[index])
index += 1

if column_statistics.avg_value:
- avg = convert_to_float(results[index])
+ avg = safe_float(results[index])
index += 1

if column_statistics.std_dev:
- std_dev = convert_to_float(results[index])
+ std_dev = safe_float(results[index])
index += 1

fields.append(
5 changes: 3 additions & 2 deletions metaphor/common/models.py
@@ -5,6 +5,7 @@

from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.entity_id import normalize_full_dataset_name, to_dataset_entity_id
+ from metaphor.common.utils import safe_float
from metaphor.models.metadata_change_event import (
DataPlatform,
DatasetLogicalID,
@@ -39,7 +40,7 @@ def to_dataset_statistics(
last_updated: Optional[datetime] = None,
) -> DatasetStatistics:
return DatasetStatistics(
- data_size_bytes=float(size_bytes) if size_bytes is not None else None,
- record_count=float(rows) if rows is not None else None,
+ data_size_bytes=safe_float(size_bytes),
+ record_count=safe_float(rows),
last_updated=last_updated,
)
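Worth noting: the models.py change is not purely cosmetic. The old `is not None` ternaries let `NaN`/`Inf` values straight through into `DatasetStatistics`, while `safe_float` normalizes them to `None` per its docstring. A small illustration with a hypothetical input (not taken from the diff):

```python
from metaphor.common.utils import safe_float

size_bytes = float("nan")  # hypothetical malformed stat from an upstream source

old_value = float(size_bytes) if size_bytes is not None else None  # nan -- survives the old guard
new_value = safe_float(size_bytes)                                 # None -- filtered by the helper
```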
12 changes: 11 additions & 1 deletion metaphor/common/utils.py
@@ -64,7 +64,7 @@ def safe_parse_ISO8601(iso8601_str: Optional[str]) -> Optional[datetime]:
return None


- def convert_to_float(value: Optional[Union[float, int, str]]) -> Optional[float]:
+ def safe_float(value: Optional[Union[float, int, str]]) -> Optional[float]:
"""Converts a value to float, return None if the original value is None or NaN or INF"""
return (
None
@@ -74,6 +74,16 @@ def convert_to_float(value: Optional[Union[float, int, str]]) -> Optional[float]
)


+ def safe_int(value: Any) -> Optional[int]:
+ """Converts a value to int, return None if the original value is None or NaN or INF"""
+ return (
+ None
+ if value is None
+ or (isinstance(value, float) and (math.isnan(value) or math.isinf(value)))
+ else int(value)
+ )


def chunk_by_size(
list_to_chunk: list,
items_per_chunk: int,
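Taken together, the helpers normalize `None`/`NaN`/`Inf` to `None` and convert everything else. A quick behavioral sketch (assuming the elided middle of `safe_float` mirrors `safe_int`, as its docstring suggests):

```python
from metaphor.common.utils import safe_float, safe_int

safe_float("3.14")        # 3.14  -- numeric strings still parse
safe_float(None)          # None  -- instead of a TypeError from float(None)
safe_float(float("nan"))  # None  -- NaN is filtered out
safe_int("42")            # 42
safe_int(7.9)             # 7     -- int() truncates, it does not round
safe_int(float("inf"))    # None  -- int(float("inf")) would raise OverflowError
```

Neither helper catches `ValueError`, so a non-numeric string such as `safe_int("abc")` still raises; the "parsing errors" in the commit title appear to mean the None/NaN/Inf cases.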
5 changes: 2 additions & 3 deletions metaphor/fivetran/extractor.py
@@ -14,6 +14,7 @@
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger
from metaphor.common.snowflake import normalize_snowflake_account
+ from metaphor.common.utils import safe_float
from metaphor.fivetran.config import FivetranRunConfig
from metaphor.fivetran.models import (
ConnectorPayload,
@@ -345,9 +346,7 @@ def _create_fivetran_pipeline(

connector_type_name = source_metadata.name if source_metadata else None
icon_url = source_metadata.icon_url if source_metadata else None
- sync_interval = (
- float(connector.sync_frequency) if connector.sync_frequency else None
- )
+ sync_interval = safe_float(connector.sync_frequency)

fivetran = FivetranPipeline(
status=FiveTranConnectorStatus(
4 changes: 2 additions & 2 deletions metaphor/glue/extractor.py
@@ -9,7 +9,7 @@
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger
- from metaphor.common.utils import unique_list
+ from metaphor.common.utils import safe_float, unique_list
from metaphor.glue.config import AwsCredentials, GlueRunConfig
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
@@ -170,7 +170,7 @@ def _init_dataset(
)

dataset.statistics = DatasetStatistics()
- dataset.statistics.record_count = float(row_count) if row_count else None
+ dataset.statistics.record_count = safe_float(row_count)
dataset.statistics.last_updated = last_updated

dataset.structure = DatasetStructure(schema=schema, table=name)
9 changes: 5 additions & 4 deletions metaphor/postgresql/profile/extractor.py
@@ -12,6 +12,7 @@
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger
from metaphor.common.sampling import SamplingConfig
+ from metaphor.common.utils import safe_float
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
@@ -160,7 +161,7 @@ def _parse_result(results: List, dataset: Dataset):
nullable = field.nullable
is_numeric = field.precision is not None

- unique_values = float(results[index])
+ unique_values = safe_float(results[index])
index += 1

if nullable:
@@ -170,11 +171,11 @@
nulls = 0.0

if is_numeric:
- min_value = float(results[index]) if results[index] else None
+ min_value = safe_float(results[index])
index += 1
- max_value = float(results[index]) if results[index] else None
+ max_value = safe_float(results[index])
index += 1
- avg = float(results[index]) if results[index] else None
+ avg = safe_float(results[index])
index += 1
else:
min_value, max_value, avg = None, None, None
28 changes: 9 additions & 19 deletions metaphor/snowflake/extractor.py
@@ -20,7 +20,7 @@
from metaphor.common.query_history import chunk_query_logs
from metaphor.common.snowflake import normalize_snowflake_account
from metaphor.common.tag_matcher import tag_datasets
- from metaphor.common.utils import chunks, md5_digest, start_of_day
+ from metaphor.common.utils import chunks, md5_digest, safe_float, start_of_day
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
@@ -231,8 +231,8 @@ def _fetch_columns(self, cursor: SnowflakeCursor, database: str) -> None:
field_path=column,
field_name=column,
native_type=data_type,
- max_length=float(max_length) if max_length is not None else None,
- precision=float(precision) if precision is not None else None,
+ max_length=safe_float(max_length),
+ precision=safe_float(precision),
nullable=nullable == "YES",
description=comment,
subfields=None,
@@ -525,25 +525,15 @@ def _parse_query_logs(self, batch_number: str, query_logs: List[Tuple]) -> None:
platform=DataPlatform.SNOWFLAKE,
account=self._account,
start_time=start_time,
- duration=float(elapsed_time / 1000.0),
- cost=float(credit) if credit is not None else None,
+ duration=safe_float(elapsed_time / 1000.0),
+ cost=safe_float(credit),
user_id=username,
default_database=default_database,
default_schema=default_schema,
- rows_read=float(rows_produced)
- if rows_produced is not None
- else None,
- rows_written=float(rows_inserted)
- if rows_inserted is not None
- else float(rows_updated)
- if rows_updated is not None
- else None,
- bytes_read=float(bytes_scanned)
- if bytes_scanned is not None
- else None,
- bytes_written=float(bytes_written)
- if bytes_written is not None
- else None,
+ rows_read=safe_float(rows_produced),
+ rows_written=safe_float(rows_inserted) or safe_float(rows_updated),
+ bytes_read=safe_float(bytes_scanned),
+ bytes_written=safe_float(bytes_written),
sources=sources,
targets=targets,
sql=query_text,
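One nuance in the `rows_written` line above (implied by the new code rather than stated in the commit): chaining the two conversions with `or` falls through on any falsy result, so a genuine insert count of 0 now defers to `rows_updated`, whereas the old `is not None` ladder kept the 0:

```python
from metaphor.common.utils import safe_float

rows_inserted, rows_updated = 0, 15  # hypothetical counts

# Old ladder: rows_inserted is not None, so rows_written == 0.0.
# New expression: safe_float(0) == 0.0, which is falsy, so it falls through.
rows_written = safe_float(rows_inserted) or safe_float(rows_updated)
assert rows_written == 15.0
```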
16 changes: 9 additions & 7 deletions metaphor/snowflake/profile/extractor.py
@@ -14,7 +14,7 @@
from metaphor.common.logger import get_logger
from metaphor.common.sampling import SamplingConfig
from metaphor.common.snowflake import normalize_snowflake_account
- from metaphor.common.utils import convert_to_float
+ from metaphor.common.utils import safe_float, safe_int
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
@@ -78,7 +78,9 @@ async def extract(self) -> Collection[ENTITY_TYPES]:

for database in databases:
tables = self._fetch_tables(cursor, database)
- logger.info(f"Include {len(tables)} tables from {database}")
+ logger.info(
+ f"Include {len(tables)} {'tables/views' if self._include_views else 'tables'} from {database}"
+ )

self._fetch_columns_async(self._conn, tables)

@@ -113,7 +115,7 @@ def _fetch_tables(self, cursor, database: str) -> Dict[str, DatasetInfo]:
self._account, normalized_name
)
tables[normalized_name] = DatasetInfo(
- database, schema, name, table_type, int(row_count)
+ database, schema, name, table_type, safe_int(row_count)
)

return tables
@@ -255,19 +257,19 @@ def _parse_profiling_result(
min_value, max_value, avg, std_dev = None, None, None, None
if SnowflakeProfileExtractor._is_numeric(data_type):
if column_statistics.min_value:
- min_value = convert_to_float(results[index])
+ min_value = safe_float(results[index])
index += 1

if column_statistics.max_value:
- max_value = convert_to_float(results[index])
+ max_value = safe_float(results[index])
index += 1

if column_statistics.avg_value:
- avg = convert_to_float(results[index])
+ avg = safe_float(results[index])
index += 1

if column_statistics.std_dev:
- std_dev = convert_to_float(results[index])
+ std_dev = safe_float(results[index])
index += 1

fields.append(
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
- version = "0.12.52"
+ version = "0.12.53"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
18 changes: 18 additions & 0 deletions tests/common/test_utils.py
@@ -9,6 +9,8 @@
filter_empty_strings,
must_set_exactly_one,
removesuffix,
+ safe_float,
+ safe_int,
safe_parse_ISO8601,
start_of_day,
unique_list,
@@ -114,3 +116,19 @@ def test_safe_parse_ISO8061():
== "2023-09-20T08:10:15+00:00"
)
assert safe_parse_ISO8601("isvalid") is None


+ def test_safe_float():
+ assert safe_float(None) is None
+ assert safe_float(float("NaN")) is None
+ assert safe_float(float("Inf")) is None
+ assert safe_float(1) == 1.0
+ assert safe_float(1.7) == 1.7


+ def test_safe_int():
+ assert safe_int(None) is None
+ assert safe_int(float("NaN")) is None
+ assert safe_int(float("Inf")) is None
+ assert safe_int(1) == 1
+ assert safe_int(1.7) == 1
