diff --git a/dbt/adapters/hive/connections.py b/dbt/adapters/hive/connections.py index 9cc0056..e98c1c7 100644 --- a/dbt/adapters/hive/connections.py +++ b/dbt/adapters/hive/connections.py @@ -30,9 +30,7 @@ from dbt.utils import DECIMALS from datetime import datetime -import sqlparams -from hologram.helpers import StrEnum from dataclasses import dataclass, field from typing import Any, Optional, Dict, Tuple import base64 @@ -185,10 +183,12 @@ def open(cls, connection): return connection credentials = connection.credentials + connection_ex = None auth_type = "insecure" hive_conn = None try: + connection_start_time = time.time() # add configuration to yaml if not credentials.auth_type: hive_conn = impala.dbapi.connect( @@ -223,13 +223,34 @@ def open(cls, connection): raise dbt.exceptions.DbtProfileError( "Invalid auth_type {} provided".format(credentials.auth_type) ) + connection_end_time = time.time() connection.state = ConnectionState.OPEN connection.handle = HiveConnectionWrapper(hive_conn) + connection.handle.cursor() except Exception as exc: logger.debug("Connection error: {}".format(exc)) + connection_ex = exc connection.state = ConnectionState.FAIL connection.handle = None + connection_end_time = time.time() + + # track usage + payload = { + "event_type": "dbt_hive_open", + "auth": auth_type, + "connection_state": connection.state, + "elapsed_time": "{:.2f}".format( + connection_end_time - connection_start_time + ), + } + + if connection.state == ConnectionState.FAIL: + payload["connection_exception"] = "{}".format(connection_ex) + tracker.track_usage(payload) + raise connection_ex + else: + tracker.track_usage(payload) return connection @@ -253,10 +274,30 @@ def exception_handler(self, sql: str): def cancel(self, connection): connection.handle.cancel() - def close(self, connection): - if connection.handle: - connection.handle.close() - connection.state = ConnectionState.CLOSED + @classmethod + def close(cls, connection): + try: + # if the connection is in closed or init, there's nothing to do + if connection.state in {ConnectionState.CLOSED, ConnectionState.INIT}: + return connection + + connection_close_start_time = time.time() + connection = super().close(connection) + connection_close_end_time = time.time() + + payload = { + "event_type": "dbt_hive_close", + "connection_state": ConnectionState.CLOSED, + "elapsed_time": "{:.2f}".format( + connection_close_end_time - connection_close_start_time + ), + } + + tracker.track_usage(payload) + + return connection + except Exception as err: + logger.debug(f"Error closing connection {err}") @classmethod def get_response(cls, cursor): @@ -369,5 +410,3 @@ def validate_creds(cls, creds, required): " to connect to Hive".format(key, method) ) - else: - raise exc