Skip to content

Commit

Permalink
Add connection related instrumentation (#43)
Browse files Browse the repository at this point in the history
Describe your changes
Add connection related instrumentation.

Ticket
Internal: https://jira.cloudera.com/browse/DBT-388

Testing procedure
With a dbt debug, check the output in logs/dbt.log, there should be an entry similar to:

(Connection close)
15:22:32.298920 [debug] [Thread-6  ]: Tracker adapter: Sending Event [{"data": {"event_type": "dbt_hive_close", "connection_state": "closed", "elapsed_time": "12.54", "auth": "N/A", "incremental_strategy": "N/A", "model_name": "N/A", "model_type": "N/A", "permissions": "N/A", "profile_name": "N/A", "sql_type": "N/A", "id": "af0d1791-29a7-4415-88b6-bdc34d4bcf87", "unique_host_hash": "cd2782200d6326126da659d53c3dfe44", "unique_user_hash": "3d23971613757087a31726e1b95f7b57", "unique_session_hash": "3bd8de5b61b414439cf16e9d96f06d46", "python_version": "3.9.12", "system": "Darwin", "machine": "arm64", "platform": "macOS-12.6-arm64-arm-64bit", "dbt_version": "1.1.2", "dbt_adapter": "hive-1.1.3"}}]

(Connection open - success)
16:37:29.825652 [debug] [Thread-2  ]: Tracker adapter: Sending Event [{"data": {"event_type": "dbt_impala_open", "auth": "ldap", "connection_state": "open", "elapsed_time": "0.01", "incremental_strategy": "N/A", "model_name": "N/A", "model_type": "N/A", "permissions": "N/A", "profile_name": "N/A", "sql_type": "N/A", "id": "ff76bab6-7395-4b44-8b08-8660f619d5cb", "unique_host_hash": "cd2782200d6326126da659d53c3dfe44", "unique_user_hash": "3d23971613757087a31726e1b95f7b57", "unique_session_hash": "8dde07c1d3eb64aada4866046687616c", "python_version": "3.9.12", "system": "Darwin", "machine": "arm64", "platform": "macOS-12.6-arm64-arm-64bit", "dbt_version": "1.1.2", "dbt_adapter": "impala-1.1.3", "project_name": "dbtdemo", "target_name": "dev_impala_demo_ldap", "no_of_threads": 1}}]

(Connection open - failure, ldap incorrect creds)
16:33:57.343942 [debug] [Thread-2  ]: Tracker adapter: Sending Event [{"data": {"event_type": "dbt_hive_open", "auth": "ldap", "connection_state": "fail", "elapsed_time": "19.05", "connection_exception": "HTTP code 401: Unauthorized", "incremental_strategy": "N/A", "model_name": "N/A", "model_type": "N/A", "permissions": "N/A", "profile_name": "N/A", "sql_type": "N/A", "id": "46680d80-aca1-47d6-8f5d-9272ab613cbc", "unique_host_hash": "cd2782200d6326126da659d53c3dfe44", "unique_user_hash": "3d23971613757087a31726e1b95f7b57", "unique_session_hash": "3e67a608f37299d6d7b6ba1204d68544", "python_version": "3.9.12", "system": "Darwin", "machine": "arm64", "platform": "macOS-12.6-arm64-arm-64bit", "dbt_version": "1.1.2", "dbt_adapter": "hive-1.1.3"}}]

(Connection open - failure, ldap - server incorrect)
16:35:57.801233 [debug] [Thread-2  ]: Tracker adapter: Sending Event [{"data": {"event_type": "dbt_hive_open", "auth": "ldap", "connection_state": "fail", "elapsed_time": "0.01", "connection_exception": "Failed after retrying 3 times", "incremental_strategy": "N/A", "model_name": "N/A", "model_type": "N/A", "permissions": "N/A", "profile_name": "N/A", "sql_type": "N/A", "id": "af1494cd-484f-4e8e-8894-0284c6b56d24", "unique_host_hash": "e39935894eec18fba9644b3a7e7c3e6a", "unique_user_hash": "3d23971613757087a31726e1b95f7b57", "unique_session_hash": "fa0bf54343207f40bab2153ba0a80695", "python_version": "3.9.12", "system": "Darwin", "machine": "arm64", "platform": "macOS-12.6-arm64-arm-64bit", "dbt_version": "1.1.2", "dbt_adapter": "hive-1.1.3"}}]

(Connection open - failure, kerberos server)
16:38:31.866322 [debug] [Thread-2  ]: Tracker adapter: Sending Event [{"data": {"event_type": "dbt_hive_open", "auth": "kerberos", "connection_state": "fail", "elapsed_time": "3.02", "connection_exception": "((' Miscellaneous failure (see text)', 851968), ('Error from KDC: LOOKING_UP_SERVER', -1765328377))", "incremental_strategy": "N/A", "model_name": "N/A", "model_type": "N/A", "permissions": "N/A", "profile_name": "N/A", "sql_type": "N/A", "id": "9c93905a-f0b7-4dc8-b309-a1af485e7a63", "unique_host_hash": "755624c34834153e99eb60a0d7fc07ac", "unique_user_hash": "6adf97f83acf6453d4a6a4b1070f3754", "unique_session_hash": "a8d3b2761287fccc5826babd28dda7fd", "python_version": "3.9.12", "system": "Darwin", "machine": "arm64", "platform": "macOS-12.6-arm64-arm-64bit", "dbt_version": "1.1.2", "dbt_adapter": "hive-1.1.3"}}]

Checklist for review
- [X] I have performed a self-review of my code
- [X] I have formatted my added/modified code to follow peop-8 standards
- [X] I have checked suggestions from pylint to make sure code is of good quality
  • Loading branch information
tovganesh authored Sep 22, 2022
1 parent d5ab21f commit 79c670d
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions dbt/adapters/hive/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
from dbt.utils import DECIMALS

from datetime import datetime
import sqlparams

from hologram.helpers import StrEnum
from dataclasses import dataclass, field
from typing import Any, Optional, Dict, Tuple
import base64
Expand Down Expand Up @@ -185,10 +183,12 @@ def open(cls, connection):
return connection

credentials = connection.credentials
connection_ex = None

auth_type = "insecure"
hive_conn = None
try:
connection_start_time = time.time()
# add configuration to yaml
if not credentials.auth_type:
hive_conn = impala.dbapi.connect(
Expand Down Expand Up @@ -223,13 +223,34 @@ def open(cls, connection):
raise dbt.exceptions.DbtProfileError(
"Invalid auth_type {} provided".format(credentials.auth_type)
)
connection_end_time = time.time()

connection.state = ConnectionState.OPEN
connection.handle = HiveConnectionWrapper(hive_conn)
connection.handle.cursor()
except Exception as exc:
logger.debug("Connection error: {}".format(exc))
connection_ex = exc
connection.state = ConnectionState.FAIL
connection.handle = None
connection_end_time = time.time()

# track usage
payload = {
"event_type": "dbt_hive_open",
"auth": auth_type,
"connection_state": connection.state,
"elapsed_time": "{:.2f}".format(
connection_end_time - connection_start_time
),
}

if connection.state == ConnectionState.FAIL:
payload["connection_exception"] = "{}".format(connection_ex)
tracker.track_usage(payload)
raise connection_ex
else:
tracker.track_usage(payload)

return connection

Expand All @@ -253,10 +274,30 @@ def exception_handler(self, sql: str):
def cancel(self, connection):
connection.handle.cancel()

def close(self, connection):
if connection.handle:
connection.handle.close()
connection.state = ConnectionState.CLOSED
@classmethod
def close(cls, connection):
try:
# if the connection is in closed or init, there's nothing to do
if connection.state in {ConnectionState.CLOSED, ConnectionState.INIT}:
return connection

connection_close_start_time = time.time()
connection = super().close(connection)
connection_close_end_time = time.time()

payload = {
"event_type": "dbt_hive_close",
"connection_state": ConnectionState.CLOSED,
"elapsed_time": "{:.2f}".format(
connection_close_end_time - connection_close_start_time
),
}

tracker.track_usage(payload)

return connection
except Exception as err:
logger.debug(f"Error closing connection {err}")

@classmethod
def get_response(cls, cursor):
Expand Down Expand Up @@ -369,5 +410,3 @@ def validate_creds(cls, creds, required):
" to connect to Hive".format(key, method)
)

else:
raise exc

0 comments on commit 79c670d

Please sign in to comment.