Parse table and column stats for hive (#747)
* Parse table and column stats for hive

* readme

* update readme
usefulalgorithm authored Jan 10, 2024
1 parent b8f9dc2 commit e85cc98
Showing 7 changed files with 411 additions and 5 deletions.
4 changes: 4 additions & 0 deletions metaphor/common/column_statistics.py
@@ -22,3 +22,7 @@ class ColumnStatistics:

# Compute standard deviation
std_dev: bool = False

@property
def should_calculate(self) -> bool:
return any(bool(value) for value in self.__dict__.values())
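The new `should_calculate` property reports whether any statistics flag is enabled. A minimal usage sketch (assuming the remaining flags referenced from `extractor.py` — `null_count`, `min_value`, `max_value`, `unique_count`, `avg_value` — default to `False` like `std_dev`):

```python
from metaphor.common.column_statistics import ColumnStatistics

# All flags default to False, so no statistics are calculated.
assert ColumnStatistics().should_calculate is False

# Any single truthy flag enables statistics collection.
assert ColumnStatistics(null_count=True).should_calculate is True
```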
18 changes: 18 additions & 0 deletions metaphor/hive/README.md
@@ -2,6 +2,10 @@

This connector extracts technical metadata from Apache Hive.

## Setup

Please ensure the user running the connector has the `SELECT` privilege granted with `WITH GRANT OPTION`, so that the connector can read table information from Hive. See [SQL Standard Based Hive Authorization](https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization) for more information.
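For example, an administrator could grant the privilege through a HiveQL statement executed over PyHive (a hedged sketch only — the host, user names, and table below are placeholders, and the exact `GRANT` syntax depends on your Hive version and authorization mode):

```python
from pyhive import hive

# Connect as an administrative user who is allowed to grant privileges (placeholder credentials).
admin_connection = hive.connect(host="hive-server", port=10000, username="admin")

with admin_connection.cursor() as cursor:
    # Grant SELECT with the grant option on a table the connector should read.
    cursor.execute(
        "GRANT SELECT ON TABLE my_db.my_table TO USER metaphor WITH GRANT OPTION"
    )
```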

## Config File

Create a YAML config file based on the following template.
@@ -13,8 +17,22 @@ host: <host>
port: <port>
auth_user: <auth user for hiveserver>
password: <password for the auth user>

output:
file:
directory: <output_directory>
```
Testing environments may not require authentication. In that case, do not set `auth_user` or `password`.

See [Output Config](../common/docs/output.md) for more information on `output`.

### Optional Configurations

#### Column Statistics

See [Column Statistics](../../common/docs/column_statistics.md) for details.

## Testing

Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv).
7 changes: 7 additions & 0 deletions metaphor/hive/config.py
@@ -1,8 +1,10 @@
from dataclasses import field
from typing import Any, Dict, Optional

from pydantic.dataclasses import dataclass

from metaphor.common.base_config import BaseConfig
from metaphor.common.column_statistics import ColumnStatistics
from metaphor.common.dataclass import ConnectorConfig


@@ -13,6 +15,11 @@ class HiveRunConfig(BaseConfig):
auth_user: Optional[str] = None
password: Optional[str] = None

# Compute specific types of statistics for each column
column_statistics: ColumnStatistics = field(
default_factory=lambda: ColumnStatistics()
)

@property
def connect_kwargs(self) -> Dict[str, Any]:
kwargs = {
139 changes: 136 additions & 3 deletions metaphor/hive/extractor.py
@@ -1,4 +1,4 @@
from typing import Collection, Iterable, List, Tuple
from typing import Any, Collection, Dict, Iterable, List, Optional, Tuple

from pyhive import hive

@@ -13,7 +13,9 @@
Dataset,
DatasetLogicalID,
DatasetSchema,
DatasetStatistics,
EntityType,
FieldStatistics,
MaterializationType,
SchemaField,
SchemaType,
@@ -22,6 +24,19 @@

logger = get_logger()

NUMERIC_TYPES = {
"TINYINT",
"SMALLINT",
"INT",
"INTEGER",
"BIGINT",
"FLOAT",
"DOUBLE",
"DOUBLE PRECISION",
"DECIMAL",
"NUMERIC",
}


class HiveExtractor(BaseExtractor):
"""Hive metadata extractor"""
@@ -39,7 +54,15 @@ def __init__(self, config: HiveRunConfig) -> None:

@staticmethod
def get_connection(**kwargs) -> hive.Connection:
return hive.connect(**kwargs)
return hive.connect(
**kwargs,
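# Hive session settings passed with every connection: use the DbTxnManager transaction
# manager, enable concurrency support and bucketing enforcement, and allow nonstrict
# dynamic partitioning.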
configuration={
"hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
"hive.support.concurrency": "true",
"hive.enforce.bucketing": "true",
"hive.exec.dynamic.partition.mode": "nonstrict",
},
)

@staticmethod
def extract_names_from_cursor(cursor: Iterable[Tuple]) -> List[str]:
@@ -57,7 +80,7 @@ def _extract_table(
platform=DataPlatform.HIVE,
),
)
fields = []
fields: List[SchemaField] = []
cursor.execute(f"describe {database}.{table}")
for field_path, field_type, comment in cursor:
fields.append(
@@ -85,8 +108,118 @@ def _extract_table(
table_schema=table_schema,
)
dataset.schema = dataset_schema

if materialization in {
MaterializationType.TABLE,
MaterializationType.MATERIALIZED_VIEW,
}:
dataset.statistics = self._extract_table_stats(database, table, fields)
return dataset

@staticmethod
def _is_numeric_field(field: SchemaField) -> bool:
return (
field.native_type is not None and field.native_type.upper() in NUMERIC_TYPES
)

def _extract_field_stats(  # noqa: C901
self,
database: str,
table: str,
field: SchemaField,
) -> FieldStatistics:
with self._connection.cursor() as cursor:
raw_field_statistics: Dict[str, Any] = {
"fieldPath": field.field_path,
}

chosen_stats: Dict[str, str] = {}
if self._config.column_statistics.null_count:
chosen_stats["num_nulls"] = "nullValueCount"
if self._config.column_statistics.min_value:
chosen_stats["min"] = "minValue"
if self._config.column_statistics.max_value:
chosen_stats["max"] = "maxValue"
if self._config.column_statistics.unique_count:
chosen_stats["distinct_count"] = "distinctValueCount"

if chosen_stats:
# Read the column stats that Hive has already computed
cursor.execute(
f"describe formatted {database}.{table} {field.field_path}"
)
for row in cursor:
field_stats_key = chosen_stats.get(row[0])
if field_stats_key:
try:
raw_field_statistics[field_stats_key] = float(row[1])
except Exception:
if HiveExtractor._is_numeric_field(field):
logger.warning(
f"Cannot find {field_stats_key} for field {field.field_path}"
)

def _calculate_by_hand(function: str) -> Optional[float]:
try:
cursor.execute(
f"select {function}({field.field_path}) from {database}.{table}"
)
return float(next(cursor)[0])
except Exception:
logger.exception(
f"Cannot calculate {function} for field {field.field_path}"
)
return None

if field.native_type and field.native_type.upper() in NUMERIC_TYPES:
if self._config.column_statistics.avg_value:
raw_field_statistics["average"] = _calculate_by_hand("avg")
if self._config.column_statistics.std_dev:
raw_field_statistics["stdDev"] = _calculate_by_hand("std")

return FieldStatistics.from_dict(raw_field_statistics)

def _extract_table_stats(
self, database: str, table: str, fields: List[SchemaField]
):
with self._connection.cursor() as cursor:
statement = f"analyze table {database}.{table} compute statistics"
if self._config.column_statistics.should_calculate:
statement += " for columns"
cursor.execute(statement)
cursor.execute(f"describe formatted {database}.{table}")
raw_table_stats = list(cursor)
table_size = next(
(
float(str(r[-1]).strip())
for r in raw_table_stats
if r[1] and "totalSize" in r[1]
),
None,
)
num_rows = next(
(
float(str(r[-1]).strip())
for r in raw_table_stats
if r[1] and "numRows" in r[1]
),
None,
)
dataset_statistics = DatasetStatistics(
data_size_bytes=table_size,
record_count=num_rows,
)

field_statistics = None
if self._config.column_statistics.should_calculate:
field_statistics = [
self._extract_field_stats(database, table, field)
for field in fields
]

dataset_statistics.field_statistics = field_statistics
return dataset_statistics

def _extract_database(self, database: str) -> List[Dataset]:
with self._connection.cursor() as cursor:
datasets = []
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.13.103"
version = "0.13.104"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]