Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Postgres to psycopg3 #19322

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .ddev/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ oauthlib = ['BSD-3-Clause']
mmh3 = ['CC0-1.0']
# https://github.com/paramiko/paramiko/blob/master/LICENSE
paramiko = ['LGPL-2.1-only']
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
psycopg = ['LGPL-3.0-only']
# https://github.com/psycopg/psycopg2/blob/master/LICENSE
# https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER
psycopg2-binary = ['LGPL-3.0-only', 'BSD-3-Clause']
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ply,PyPI,BSD-3-Clause,Copyright (C) 2001-2018
prometheus-client,PyPI,Apache-2.0,Copyright 2015 The Prometheus Authors
protobuf,PyPI,BSD-3-Clause,Copyright 2008 Google Inc. All rights reserved.
psutil,PyPI,BSD-3-Clause,"Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola"
psycopg,PyPI,LGPL-3.0-only,Copyright (C) 2020 The Psycopg Team
psycopg2-binary,PyPI,BSD-3-Clause,Copyright 2013 Federico Di Gregorio
psycopg2-binary,PyPI,LGPL-3.0-only,Copyright (C) 2013 Federico Di Gregorio
pyOpenSSL,PyPI,Apache-2.0,Copyright The pyOpenSSL developers
Expand Down
1 change: 1 addition & 0 deletions agent_requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ prometheus-client==0.20.0
protobuf==5.27.3
psutil==6.0.0
psycopg2-binary==2.9.9
psycopg==3.2.3
pyasn1==0.4.8
pycryptodomex==3.20.0
pydantic==2.8.2
Expand Down
1 change: 1 addition & 0 deletions postgres/changelog.d/19322.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Upgrade psycopg to version 3 for Postgres integration
18 changes: 10 additions & 8 deletions postgres/datadog_checks/postgres/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
from typing import Callable, Dict

import psycopg2
import psycopg


class ConnectionPoolFullError(Exception):
Expand All @@ -23,7 +23,7 @@ def __str__(self):
class ConnectionInfo:
def __init__(
self,
connection: psycopg2.extensions.connection,
connection: psycopg.Connection,
deadline: int,
active: bool,
last_accessed: int,
Expand Down Expand Up @@ -86,9 +86,9 @@ def _get_connection_raw(
dbname: str,
ttl_ms: int,
timeout: int = None,
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
startup_fn: Callable[[psycopg.Connection], None] = None,
persistent: bool = False,
) -> psycopg2.extensions.connection:
) -> psycopg.Connection:
"""
Return a connection from the pool.
Pass a function to startup_func if there is an action needed with the connection
Expand Down Expand Up @@ -117,7 +117,7 @@ def _get_connection_raw(
# if already in pool, retain persistence status
persistent = conn.persistent

if db.status != psycopg2.extensions.STATUS_READY:
if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()

Expand All @@ -138,7 +138,7 @@ def get_connection(
dbname: str,
ttl_ms: int,
timeout: int = None,
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
startup_fn: Callable[[psycopg.Connection], None] = None,
persistent: bool = False,
):
"""
Expand All @@ -147,12 +147,14 @@ def get_connection(
make a new connection if the max_conn limit hasn't been reached.
Blocks until a connection can be added to the pool,
and optionally takes a timeout in seconds.
Note that leaving a connection context here does NOT close the connection in psycopg2;
Note that leaving a connection context here does NOT close the connection in psycopg;
connections must be manually closed by `close_all_connections()`.
"""
try:
with self._mu:
db = self._get_connection_raw(dbname, ttl_ms, timeout, startup_fn, persistent)
db = self._get_connection_raw(
dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, startup_fn=startup_fn, persistent=persistent
)
yield db
finally:
with self._mu:
Expand Down
7 changes: 3 additions & 4 deletions postgres/datadog_checks/postgres/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

import psycopg2.extensions
import psycopg2.extras
import psycopg

from datadog_checks.base.utils.db.sql_commenter import add_sql_comment

Expand All @@ -28,9 +27,9 @@ def execute(self, query, vars=None, ignore_query_metric=False):
return super().execute(query, vars)


class CommenterCursor(BaseCommenterCursor, psycopg2.extensions.cursor):
class CommenterCursor(BaseCommenterCursor, psycopg.ClientCursor):
pass


class CommenterDictCursor(BaseCommenterCursor, psycopg2.extras.DictCursor):
class CommenterDictCursor(BaseCommenterCursor, psycopg.ClientCursor):
pass
3 changes: 1 addition & 2 deletions postgres/datadog_checks/postgres/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.discovery import Discovery
from datadog_checks.postgres.cursor import CommenterCursor
from datadog_checks.postgres.util import DatabaseConfigurationError, warning_with_tags

AUTODISCOVERY_QUERY: str = """select datname from pg_catalog.pg_database where datistemplate = false;"""
Expand Down Expand Up @@ -72,7 +71,7 @@ def get_items(self) -> List[str]:

def _get_databases(self) -> List[str]:
with self.db_pool.get_connection(self._db, self._default_ttl) as conn:
with conn.cursor(cursor_factory=CommenterCursor) as cursor:
with conn.cursor() as cursor:
cursor.execute(AUTODISCOVERY_QUERY)
databases = list(cursor.fetchall())
databases = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import logging
import re

import psycopg2
import psycopg

from datadog_checks.base.utils.db.sql import compute_sql_signature
from datadog_checks.base.utils.tracking import tracked_method
from datadog_checks.postgres.cursor import CommenterDictCursor

from .util import DBExplainError
from .version_utils import V12
Expand Down Expand Up @@ -77,7 +76,7 @@ def explain_statement(self, dbname, statement, obfuscated_statement):
if self._check.version < V12:
# if pg version < 12, skip explaining parameterized queries because
# plan_cache_mode is not supported
e = psycopg2.errors.UndefinedParameter("Unable to explain parameterized query")
e = psycopg.errors.UndefinedParameter("Unable to explain parameterized query")
logger.debug(
"Unable to explain parameterized query. Postgres version %s does not support plan_cache_mode",
self._check.version,
Expand Down Expand Up @@ -180,16 +179,14 @@ def _deallocate_prepared_statement(self, dbname, query_signature):
)

def _execute_query(self, dbname, query):
    """Execute *query* against *dbname* without fetching any result
    (used for statements such as PREPARE/DEALLOCATE).

    Leaving the pool's context manager does not close the connection; it is
    only marked inactive in MultiDatabaseConnectionPool.
    """
    with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
        with conn.cursor() as cursor:
            logger.debug('Executing query=[%s]', query)
            # NOTE(review): execute() is passed ignore_query_metric=True, which
            # only a BaseCommenterCursor subclass accepts; a plain psycopg
            # cursor would raise TypeError. Confirm the pool's connections are
            # created with a commenter cursor class as their cursor_factory.
            cursor.execute(query, ignore_query_metric=True)

def _execute_query_and_fetch_rows(self, dbname, query):
    """Execute *query* against *dbname* and return all result rows.

    Returns whatever row type the connection's row factory produces.
    """
    with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
        with conn.cursor() as cursor:
            # NOTE(review): the psycopg2 version used CommenterDictCursor, so
            # callers may expect dict-like rows; a default psycopg cursor
            # returns tuples. Also, ignore_query_metric=True requires a
            # BaseCommenterCursor subclass — confirm the connection's default
            # cursor/row factories preserve both behaviors.
            cursor.execute(query, ignore_query_metric=True)
            return cursor.fetchall()

Expand Down
75 changes: 38 additions & 37 deletions postgres/datadog_checks/postgres/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import time
from typing import Dict, List, Optional, Tuple, Union # noqa: F401

import psycopg2

from datadog_checks.postgres.cursor import CommenterDictCursor
import psycopg
from psycopg.rows import dict_row

try:
import datadog_agent
Expand Down Expand Up @@ -219,7 +218,7 @@ def __init__(self, check, config, shutdown_callback):
enabled=is_affirmative(config.resources_metadata_config.get("enabled", True)),
dbms="postgres",
min_collection_interval=config.min_collection_interval,
expected_db_exceptions=(psycopg2.errors.DatabaseError,),
expected_db_exceptions=(psycopg.errors.DatabaseError,),
job_name="database-metadata",
shutdown_callback=shutdown_callback,
)
Expand Down Expand Up @@ -317,7 +316,7 @@ def _collect_postgres_schemas(self):
continue

with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
with conn.cursor(row_factory=dict_row) as cursor:
for schema in database["schemas"]:
if not self._should_collect_metadata(schema["name"], "schema"):
continue
Expand Down Expand Up @@ -432,9 +431,7 @@ def _collect_schema_info(self):
self._last_schemas_query_time = time.time()
return metadata

def _query_database_information(
self, cursor: psycopg2.extensions.cursor, dbname: str
) -> Dict[str, Union[str, int]]:
def _query_database_information(self, cursor: psycopg.Cursor, dbname: str) -> Dict[str, Union[str, int]]:
"""
Collect database info. Returns
description: str
Expand All @@ -447,7 +444,7 @@ def _query_database_information(
row = cursor.fetchone()
return row

def _query_schema_information(self, cursor: psycopg2.extensions.cursor, dbname: str) -> Dict[str, str]:
def _query_schema_information(self, cursor: psycopg.Cursor, dbname: str) -> Dict[str, str]:
"""
Collect user schemas. Returns
id: str
Expand Down Expand Up @@ -553,7 +550,7 @@ def sort_tables(info):
return table_info[:limit]

def _query_tables_for_schema(
self, cursor: psycopg2.extensions.cursor, schema_id: str, dbname: str
self, cursor: psycopg.Cursor, schema_id: str, dbname: str
) -> List[Dict[str, Union[str, Dict]]]:
"""
Collect list of tables for a schema. Returns a list of dictionaries
Expand Down Expand Up @@ -584,7 +581,7 @@ def _query_tables_for_schema(
return table_payloads

def _query_table_information(
self, cursor: psycopg2.extensions.cursor, schema_name: str, table_info: List[Dict[str, Union[str, bool]]]
self, cursor: psycopg.Cursor, schema_name: str, table_info: List[Dict[str, Union[str, bool]]]
) -> List[Dict[str, Union[str, Dict]]]:
"""
Collect table information . Returns a dictionary
Expand Down Expand Up @@ -655,7 +652,7 @@ def _query_table_information(
def _collect_metadata_for_database(self, dbname):
metadata = {}
with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
with conn.cursor(row_factory=dict_row) as cursor:
database_info = self._query_database_information(cursor, dbname)
metadata.update(
{
Expand All @@ -676,28 +673,32 @@ def _collect_metadata_for_database(self, dbname):
@tracked_method(agent_check_getter=agent_check_getter)
def _collect_postgres_settings(self):
    """Collect the contents of pg_settings as a list of dict rows.

    Known extensions reported by PG_EXTENSIONS_QUERY are force-loaded first
    (inside a transaction) so that their GUCs become visible in pg_settings;
    settings matching ``pg_settings_ignored_patterns`` are filtered out
    server-side via ``name NOT LIKE ALL(%s)``.
    """
    with self._check._get_main_db() as conn:
        with conn.cursor(row_factory=dict_row) as cursor:
            with conn.transaction():
                # Get loaded extensions
                cursor.execute(PG_EXTENSIONS_QUERY)
                rows = cursor.fetchall()
                query = PG_SETTINGS_QUERY
                for row in rows:
                    extension = row['extname']
                    # Run a query to force loading of the extension.
                    # This allows us to reliably collect extension settings.
                    if extension in PG_EXTENSION_LOADER_QUERY:
                        cursor.execute(PG_EXTENSION_LOADER_QUERY[extension])
                    else:
                        self._log.warning("unable to collect settings for unknown extension %s", extension)

            if self.pg_settings_ignored_patterns:
                query = query + " WHERE name NOT LIKE ALL(%s)"

            self._log.debug(
                "Running query [%s] and patterns are %s",
                query,
                self.pg_settings_ignored_patterns,
            )
            self._time_since_last_settings_query = time.time()
            cursor.execute(query, (self.pg_settings_ignored_patterns,))
            rows = cursor.fetchall()
            # Removed leftover warning-level log that dumped every settings row;
            # the debug-level row count below is sufficient and avoids noisy,
            # potentially large log lines at warning severity.
            self._log.debug("Loaded %s rows from pg_settings", len(rows))
            return rows
Loading
Loading