diff --git a/metaphor/common/column_statistics.py b/metaphor/common/column_statistics.py
index a804036d..ffd6a642 100644
--- a/metaphor/common/column_statistics.py
+++ b/metaphor/common/column_statistics.py
@@ -22,3 +22,7 @@ class ColumnStatistics:
 
     # Compute standard deviation
     std_dev: bool = False
+
+    @property
+    def should_calculate(self) -> bool:
+        return any(bool(value) for value in self.__dict__.values())
diff --git a/metaphor/hive/README.md b/metaphor/hive/README.md
index 57c10998..05448aaa 100644
--- a/metaphor/hive/README.md
+++ b/metaphor/hive/README.md
@@ -2,6 +2,10 @@
 
 This connector extracts technical metadata from Apache Hive.
 
+## Setup
+
+Please ensure the user running the connector has the `SELECT` privilege with `WITH GRANT OPTION` specified, so that the connector can read table information from Hive. See [SQL Standard Based Hive Authorization](https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization) for more information.
+
 ## Config File
 
 Create a YAML config file based on the following template.
@@ -13,8 +17,22 @@
 host:
 port:
 auth_user:
 password:
+
+output:
+  file:
+    directory:
 ```
+
+Testing environments may not require authentication; in that case, do not set `auth_user` and `password`.
+
+See [Output Config](../common/docs/output.md) for more information on `output`.
+
+### Optional Configurations
+
+#### Column Statistics
+
+See [Column Statistics](../common/docs/column_statistics.md) for details.
+
 ## Testing
 
 Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv).
diff --git a/metaphor/hive/config.py b/metaphor/hive/config.py
index 641b81ff..178a5180 100644
--- a/metaphor/hive/config.py
+++ b/metaphor/hive/config.py
@@ -1,8 +1,10 @@
+from dataclasses import field
 from typing import Any, Dict, Optional
 
 from pydantic.dataclasses import dataclass
 
 from metaphor.common.base_config import BaseConfig
+from metaphor.common.column_statistics import ColumnStatistics
 from metaphor.common.dataclass import ConnectorConfig
 
 
@@ -13,6 +15,11 @@ class HiveRunConfig(BaseConfig):
     auth_user: Optional[str] = None
     password: Optional[str] = None
 
+    # Compute specific types of statistics for each column
+    column_statistics: ColumnStatistics = field(
+        default_factory=lambda: ColumnStatistics()
+    )
+
     @property
     def connect_kwargs(self) -> Dict[str, Any]:
         kwargs = {
diff --git a/metaphor/hive/extractor.py b/metaphor/hive/extractor.py
index 0cca8495..02ec49e7 100644
--- a/metaphor/hive/extractor.py
+++ b/metaphor/hive/extractor.py
@@ -1,4 +1,4 @@
-from typing import Collection, Iterable, List, Tuple
+from typing import Any, Collection, Dict, Iterable, List, Optional, Tuple
 
 from pyhive import hive
 
@@ -13,7 +13,9 @@
     Dataset,
     DatasetLogicalID,
     DatasetSchema,
+    DatasetStatistics,
     EntityType,
+    FieldStatistics,
     MaterializationType,
     SchemaField,
     SchemaType,
@@ -22,6 +24,19 @@
 
 logger = get_logger()
 
+NUMERIC_TYPES = {
+    "TINYINT",
+    "SMALLINT",
+    "INT",
+    "INTEGER",
+    "BIGINT",
+    "FLOAT",
+    "DOUBLE",
+    "DOUBLE PRECISION",
+    "DECIMAL",
+    "NUMERIC",
+}
+
 
 class HiveExtractor(BaseExtractor):
     """Hive metadata extractor"""
@@ -39,7 +54,15 @@ def __init__(self, config: HiveRunConfig) -> None:
 
     @staticmethod
     def get_connection(**kwargs) -> hive.Connection:
-        return hive.connect(**kwargs)
+        return hive.connect(
+            **kwargs,
+            configuration={
+                "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
+                "hive.support.concurrency": "true",
+                "hive.enforce.bucketing": "true",
+                "hive.exec.dynamic.partition.mode": "nonstrict",
+            },
+        )
 
     @staticmethod
     def extract_names_from_cursor(cursor: Iterable[Tuple]) -> List[str]:
@@ -57,7 +80,7 @@ def _extract_table(
                 platform=DataPlatform.HIVE,
             ),
         )
-        fields = []
+        fields: List[SchemaField] = []
         cursor.execute(f"describe {database}.{table}")
         for field_path, field_type, comment in cursor:
             fields.append(
@@ -85,8 +108,118 @@ def _extract_table(
             table_schema=table_schema,
         )
         dataset.schema = dataset_schema
+
+        if materialization in {
+            MaterializationType.TABLE,
+            MaterializationType.MATERIALIZED_VIEW,
+        }:
+            dataset.statistics = self._extract_table_stats(database, table, fields)
         return dataset
 
+    @staticmethod
+    def _is_numeric_field(field: SchemaField) -> bool:
+        return (
+            field.native_type is not None
+            and field.native_type.upper() in NUMERIC_TYPES
+        )
+
+    def _extract_field_stats(  # noqa: C901
+        self,
+        database: str,
+        table: str,
+        field: SchemaField,
+    ) -> FieldStatistics:
+        with self._connection.cursor() as cursor:
+            raw_field_statistics: Dict[str, Any] = {
+                "fieldPath": field.field_path,
+            }
+
+            chosen_stats: Dict[str, str] = {}
+            if self._config.column_statistics.null_count:
+                chosen_stats["num_nulls"] = "nullValueCount"
+            if self._config.column_statistics.min_value:
+                chosen_stats["min"] = "minValue"
+            if self._config.column_statistics.max_value:
+                chosen_stats["max"] = "maxValue"
+            if self._config.column_statistics.unique_count:
+                chosen_stats["distinct_count"] = "distinctValueCount"
+
+            if chosen_stats:
+                # Extract the column stats already calculated by Hive
+                cursor.execute(
+                    f"describe formatted {database}.{table} {field.field_path}"
+                )
+                for row in cursor:
+                    field_stats_key = chosen_stats.get(row[0])
+                    if field_stats_key:
+                        try:
+                            raw_field_statistics[field_stats_key] = float(row[1])
+                        except Exception:
+                            if HiveExtractor._is_numeric_field(field):
+                                logger.warning(
+                                    f"Cannot find {field_stats_key} for field {field.field_path}"
+                                )
+
+            def _calculate_by_hand(function: str) -> Optional[float]:
+                try:
+                    cursor.execute(
+                        f"select {function}({field.field_path}) from {database}.{table}"
+                    )
+                    return float(next(cursor)[0])
+                except Exception:
+                    logger.exception(
+                        f"Cannot calculate {function} for field {field.field_path}"
+                    )
+                    return None
+
+            if field.native_type and field.native_type.upper() in NUMERIC_TYPES:
+                if self._config.column_statistics.avg_value:
+                    raw_field_statistics["average"] = _calculate_by_hand("avg")
+                if self._config.column_statistics.std_dev:
+                    raw_field_statistics["stdDev"] = _calculate_by_hand("std")
+
+            return FieldStatistics.from_dict(raw_field_statistics)
+
+    def _extract_table_stats(
+        self, database: str, table: str, fields: List[SchemaField]
+    ):
+        with self._connection.cursor() as cursor:
+            statement = f"analyze table {database}.{table} compute statistics"
+            if self._config.column_statistics.should_calculate:
+                statement += " for columns"
+            cursor.execute(statement)
+            cursor.execute(f"describe formatted {database}.{table}")
+            raw_table_stats = list(cursor)
+            table_size = next(
+                (
+                    float(str(r[-1]).strip())
+                    for r in raw_table_stats
+                    if r[1] and "totalSize" in r[1]
+                ),
+                None,
+            )
+            num_rows = next(
+                (
+                    float(str(r[-1]).strip())
+                    for r in raw_table_stats
+                    if r[1] and "numRows" in r[1]
+                ),
+                None,
+            )
+            dataset_statistics = DatasetStatistics(
+                data_size_bytes=table_size,
+                record_count=num_rows,
+            )
+
+            field_statistics = None
+            if self._config.column_statistics.should_calculate:
+                field_statistics = [
+                    self._extract_field_stats(database, table, field)
+                    for field in fields
+                ]
+
+            dataset_statistics.field_statistics = field_statistics
+            return dataset_statistics
+
     def _extract_database(self, database: str) -> List[Dataset]:
         with self._connection.cursor() as cursor:
             datasets = []
diff --git a/pyproject.toml b/pyproject.toml
index 8f24526d..004a4089 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.13.103"
+version = "0.13.104"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor "]
diff --git a/tests/hive/expected.json b/tests/hive/expected.json
index e3618bb1..ea66860d 100644
--- a/tests/hive/expected.json
+++ b/tests/hive/expected.json
@@ -17,6 +17,21 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE EXTERNAL TABLE `default`.`admirals`(\n `id` int)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.mapred.TextInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/admirals'\nTBLPROPERTIES (\n 'TRANSLATED_TO_EXTERNAL'='TRUE', \n 'bucketing_version'='2', \n 'external.table.purge'='TRUE')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 21.0,
+      "fieldStatistics": [
+        {
+          "average": 5.5,
+          "distinctValueCount": 10.0,
+          "fieldPath": "id",
+          "maxValue": 10.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 2.8722813232690143
+        }
+      ],
+      "recordCount": 10.0
     }
   },
   {
@@ -45,6 +60,35 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE TABLE `default`.`depts`(\n `deptno` int, \n `deptname` varchar(256), \n `locationid` int)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/depts'\nTBLPROPERTIES (\n 'bucketing_version'='2', \n 'transactional'='true', \n 'transactional_properties'='default')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 926.0,
+      "fieldStatistics": [
+        {
+          "average": 203.0,
+          "distinctValueCount": 5.0,
+          "fieldPath": "deptno",
+          "maxValue": 205.0,
+          "minValue": 201.0,
+          "nullValueCount": 0.0,
+          "stdDev": 1.4142135623730951
+        },
+        {
+          "distinctValueCount": 5.0,
+          "fieldPath": "deptname",
+          "nullValueCount": 0.0
+        },
+        {
+          "average": 503.0,
+          "distinctValueCount": 5.0,
+          "fieldPath": "locationid",
+          "maxValue": 505.0,
+          "minValue": 501.0,
+          "nullValueCount": 0.0,
+          "stdDev": 1.4142135623730951
+        }
+      ],
+      "recordCount": 5.0
     }
   },
   {
@@ -81,6 +125,49 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE TABLE `default`.`emps`(\n `empid` int, \n `deptno` int, \n `name` varchar(256), \n `salary` float, \n `hire_date` timestamp)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/emps'\nTBLPROPERTIES (\n 'bucketing_version'='2', \n 'transactional'='true', \n 'transactional_properties'='default')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 1594.0,
+      "fieldStatistics": [
+        {
+          "average": 113.0,
+          "distinctValueCount": 25.0,
+          "fieldPath": "empid",
+          "maxValue": 125.0,
+          "minValue": 101.0,
+          "nullValueCount": 0.0,
+          "stdDev": 7.211102550927978
+        },
+        {
+          "average": 203.0,
+          "distinctValueCount": 5.0,
+          "fieldPath": "deptno",
+          "maxValue": 205.0,
+          "minValue": 201.0,
+          "nullValueCount": 0.0,
+          "stdDev": 1.4142135623730951
+        },
+        {
+          "distinctValueCount": 25.0,
+          "fieldPath": "name",
+          "nullValueCount": 0.0
+        },
+        {
+          "average": 58000.0,
+          "distinctValueCount": 13.0,
+          "fieldPath": "salary",
+          "maxValue": 65000.0,
+          "minValue": 50000.0,
+          "nullValueCount": 0.0,
+          "stdDev": 3577.7087639996635
+        },
+        {
+          "distinctValueCount": 25.0,
+          "fieldPath": "hire_date",
+          "nullValueCount": 0.0
+        }
+      ],
+      "recordCount": 25.0
     }
   },
   {
@@ -109,6 +196,31 @@
         "materialization": "MATERIALIZED_VIEW",
         "tableSchema": "CREATE TABLE `default`.`mv1`(\n `empid` int, \n `deptname` varchar(256), \n `hire_date` timestamp)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/mv1'\nTBLPROPERTIES (\n 'bucketing_version'='2')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 624.0,
+      "fieldStatistics": [
+        {
+          "average": 113.61538461538461,
+          "distinctValueCount": 13.0,
+          "fieldPath": "empid",
+          "maxValue": 125.0,
+          "minValue": 102.0,
+          "nullValueCount": 0.0,
+          "stdDev": 7.07692307692315
+        },
+        {
+          "distinctValueCount": 5.0,
+          "fieldPath": "deptname",
+          "nullValueCount": 0.0
+        },
+        {
+          "distinctValueCount": 13.0,
+          "fieldPath": "hire_date",
+          "nullValueCount": 0.0
+        }
+      ],
+      "recordCount": 13.0
     }
   },
   {
@@ -165,6 +277,26 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE EXTERNAL TABLE `default`.`ship_types`(\n `id` int, \n `type_name` string)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.mapred.TextInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/ship_types'\nTBLPROPERTIES (\n 'TRANSLATED_TO_EXTERNAL'='TRUE', \n 'bucketing_version'='2', \n 'external.table.purge'='TRUE')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 139.0,
+      "fieldStatistics": [
+        {
+          "average": 5.5,
+          "distinctValueCount": 10.0,
+          "fieldPath": "id",
+          "maxValue": 10.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 2.8722813232690143
+        },
+        {
+          "distinctValueCount": 9.0,
+          "fieldPath": "type_name",
+          "nullValueCount": 0.0
+        }
+      ],
+      "recordCount": 10.0
     }
   },
   {
@@ -193,6 +325,39 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE EXTERNAL TABLE `default`.`ships`(\n `id` int, \n `ship_type_id` int, \n `crew_size` int)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.mapred.TextInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/ships'\nTBLPROPERTIES (\n 'TRANSLATED_TO_EXTERNAL'='TRUE', \n 'bucketing_version'='2', \n 'external.table.purge'='TRUE')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 712.0,
+      "fieldStatistics": [
+        {
+          "average": 50.5,
+          "distinctValueCount": 100.0,
+          "fieldPath": "id",
+          "maxValue": 100.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 28.86607004772212
+        },
+        {
+          "average": 5.5,
+          "distinctValueCount": 10.0,
+          "fieldPath": "ship_type_id",
+          "maxValue": 10.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 2.8722813232690143
+        },
+        {
+          "average": 5.5,
+          "distinctValueCount": 10.0,
+          "fieldPath": "crew_size",
+          "maxValue": 10.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 2.8722813232690143
+        }
+      ],
+      "recordCount": 100.0
     }
   },
   {
@@ -221,6 +386,39 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE EXTERNAL TABLE `default`.`torpedos`(\n `id` int, \n `ship_id` int, \n `admiral_id` int)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.mapred.TextInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/torpedos'\nTBLPROPERTIES (\n 'TRANSLATED_TO_EXTERNAL'='TRUE', \n 'bucketing_version'='2', \n 'external.table.purge'='TRUE')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 8913.0,
+      "fieldStatistics": [
+        {
+          "average": 500.5,
+          "distinctValueCount": 987.0,
+          "fieldPath": "id",
+          "maxValue": 1000.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 288.6749902572095
+        },
+        {
+          "average": 50.5,
+          "distinctValueCount": 100.0,
+          "fieldPath": "ship_id",
+          "maxValue": 100.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 28.86607004772212
+        },
+        {
+          "average": 5.5,
+          "distinctValueCount": 10.0,
+          "fieldPath": "admiral_id",
+          "maxValue": 10.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 2.8722813232690143
+        }
+      ],
+      "recordCount": 1000.0
     }
   },
   {
@@ -253,6 +451,44 @@
         "materialization": "TABLE",
         "tableSchema": "CREATE EXTERNAL TABLE `default`.`u_data`(\n `userid` int, \n `movieid` int, \n `rating` int, \n `unixtime` string)\nROW FORMAT SERDE \n 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \nWITH SERDEPROPERTIES ( \n 'field.delim'='\\t', \n 'serialization.format'='\\t') \nSTORED AS INPUTFORMAT \n 'org.apache.hadoop.mapred.TextInputFormat' \nOUTPUTFORMAT \n 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\nLOCATION\n 'file:/opt/hive/data/warehouse/u_data'\nTBLPROPERTIES (\n 'TRANSLATED_TO_EXTERNAL'='TRUE', \n 'bucketing_version'='2', \n 'external.table.purge'='TRUE')"
       }
+    },
+    "statistics": {
+      "dataSizeBytes": 1979173.0,
+      "fieldStatistics": [
+        {
+          "average": 462.48475,
+          "distinctValueCount": 940.0,
+          "fieldPath": "userid",
+          "maxValue": 943.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 266.61308705207534
+        },
+        {
+          "average": 425.53013,
+          "distinctValueCount": 1720.0,
+          "fieldPath": "movieid",
+          "maxValue": 1682.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 330.7967023296682
+        },
+        {
+          "average": 3.52986,
+          "distinctValueCount": 5.0,
+          "fieldPath": "rating",
+          "maxValue": 5.0,
+          "minValue": 1.0,
+          "nullValueCount": 0.0,
+          "stdDev": 1.1256679707622494
+        },
+        {
+          "distinctValueCount": 49262.0,
+          "fieldPath": "unixtime",
+          "nullValueCount": 0.0
+        }
+      ],
+      "recordCount": 100000.0
     }
   }
 ]
diff --git a/tests/hive/test_extractor.py b/tests/hive/test_extractor.py
index 65f0868c..5b625025 100644
--- a/tests/hive/test_extractor.py
+++ b/tests/hive/test_extractor.py
@@ -5,6 +5,7 @@
 from testcontainers.general import DockerContainer
 
 from metaphor.common.base_config import OutputConfig
+from metaphor.common.column_statistics import ColumnStatistics
 from metaphor.common.event_util import EventUtil
 from metaphor.common.logger import get_logger
 from metaphor.hive.config import HiveRunConfig
@@ -43,7 +44,14 @@ async def test_extractor(test_root_dir: str) -> None:
     port = container.get_exposed_port(10000)
     host = container.get_container_host_ip()
 
-    config = HiveRunConfig(output=OutputConfig(), host=host, port=int(port))
+    config = HiveRunConfig(
+        output=OutputConfig(),
+        host=host,
+        port=int(port),
+        column_statistics=ColumnStatistics(
+            unique_count=True, avg_value=True, std_dev=True
+        ),
+    )
 
     while True:
         try: