From 7bd40b30a9a806c5ffe4d52413b8355c31890096 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Fri, 20 Dec 2024 14:48:08 -0500 Subject: [PATCH] Sequence metric --- postgres/datadog_checks/postgres/metadata.py | 56 ++++++++++++++++++- postgres/metadata.csv | 1 + .../tests/compose/resources/01_postgres.sql | 1 + postgres/tests/compose/resources/02_setup.sh | 1 + .../tests/compose/resources/03_load_data.sh | 3 + postgres/tests/test_metadata.py | 32 ++++++++++- 6 files changed, 92 insertions(+), 2 deletions(-) diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index 09fd16752b992..7e9d5126a3293 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -21,7 +21,7 @@ from datadog_checks.postgres.util import get_list_chunks from .util import payload_pg_version -from .version_utils import VersionUtils +from .version_utils import V13, VersionUtils # default collection intervals in seconds DEFAULT_SETTINGS_COLLECTION_INTERVAL = 600 @@ -181,6 +181,28 @@ GROUP BY pi.inhparent; """ +PG_SEQUENCES_QUERY = """ +SELECT (stavalues1::text::bigint[])[array_upper(stavalues1::text::bigint[],1)] as current_value, +pg_attribute.attname AS column_name, +power(2, attlen * 8 - 1) - 1 as max_value, +t.relname AS table_name, +nsp.nspname AS schema_name +FROM pg_attribute +INNER JOIN pg_attrdef + ON adrelid = attrelid AND adnum = attnum AND pg_get_expr(adbin, adrelid) like 'nextval%' +INNER JOIN pg_class t ON attrelid = t.oid +INNER JOIN pg_namespace nsp ON t.relnamespace = nsp.oid AND relnamespace IN + (SELECT nsp.oid FROM pg_namespace nsp + LEFT JOIN pg_roles r on nsp.nspowner = r.oid + WHERE nspname NOT IN ( 'information_schema', 'pg_catalog' ) + AND nspname NOT LIKE 'pg_toast%' + AND nspname NOT LIKE 'pg_temp_%' + AND r.rolname != 'rds_superuser' + AND r.rolname != 'rdsadmin' + ) + INNER JOIN pg_statistic on attrelid = starelid AND attnum = staattnum; +""" + def agent_check_getter(self): return self._check @@ -313,6 +335,9 @@ def _collect_postgres_schemas(self): for database in schema_metadata: dbname = database["name"] + + self.collect_sequences(dbname) + if not self._should_collect_metadata(dbname, "database"): continue @@ -701,3 +726,32 @@ def _collect_postgres_settings(self): rows = cursor.fetchall() self._log.debug("Loaded %s rows from pg_settings", len(rows)) return [dict(row) for row in rows] + + @tracked_method(agent_check_getter=agent_check_getter) + def collect_sequences(self, dbname): + if self._check.version < V13: + # pg_sequence was introduced in Postgres 13 + return + with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: + cursor.execute(PG_SEQUENCES_QUERY) + rows = cursor.fetchall() + # Emit a single gauge for each sequence + for row in rows: + if row["current_value"]: + self._check.gauge( + "postgresql.sequence.estimated_remaining", + row["max_value"] - row["current_value"], + tags=self._tags_no_db + + [ + "max_value:{:.0f}".format(row["max_value"]), + "current_value:{:.0f}".format(row["current_value"]), + "db:{}".format(dbname), + "schema:{}".format(row["schema_name"]), + "table:{}".format(row["table_name"]), + "column:{}".format(row["column_name"]), + ], + hostname=self._check.resolved_hostname, + raw=True, + ) + # TODO: We should eventually collect these on the backend for recommendations/visibility diff --git a/postgres/metadata.csv b/postgres/metadata.csv index 49183d67a61cc..42ae2d723e4f2 100644 --- a/postgres/metadata.csv +++ b/postgres/metadata.csv @@ -146,6 +146,7 @@ postgresql.rows_updated,gauge,,row,second,Enabled with `relations`. The number o postgresql.running,gauge,,,,The number of instances running.,0,postgres,running, postgresql.seq_rows_read,gauge,,row,second,"Enabled with `relations`. The number of live rows fetched by sequential scans. This metric is tagged with db, schema, table.",0,postgres,seq rows rd, postgresql.seq_scans,gauge,,scan,second,"Enabled with `relations`. The number of sequential scans initiated on this table. This metric is tagged with db, schema, table.",0,postgres,seq scans, +postgresql.sequence.estimated_remaining,gauge,,,,"The estimated remaining values of a sequence. This estimate is based on pg_statistics and will be more accurate for smaller tables and when ANALYZE has been run recently. This metric is tagged with db, schema, table, and column.",0,postgres,sequence remaining, postgresql.sessions.abandoned,count,,session,,Number of database sessions to this database that were terminated because connection to the client was lost. This metric is tagged with db.,-1,postgres,postgres sessions abandoned, postgresql.sessions.active_time,count,,millisecond,,"Time spent executing SQL statements in this database, in milliseconds (this corresponds to the states active and fastpath function call in pg_stat_activity). This metric is tagged with db.",0,postgres,postgres active time, postgresql.sessions.count,count,,session,,Total number of sessions established to this database. This metric is tagged with db.,0,postgres,postgres sessions, diff --git a/postgres/tests/compose/resources/01_postgres.sql b/postgres/tests/compose/resources/01_postgres.sql index 3035e7fdbdf2f..28fe605679013 100644 --- a/postgres/tests/compose/resources/01_postgres.sql +++ b/postgres/tests/compose/resources/01_postgres.sql @@ -12,6 +12,7 @@ REVOKE SELECT ON ALL tables IN SCHEMA pg_catalog from public; GRANT SELECT ON pg_stat_database TO datadog; GRANT SELECT ON pg_stat_database TO datadog_no_catalog; GRANT SELECT ON ALL tables IN SCHEMA pg_catalog to datadog; +GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public to datadog; CREATE DATABASE datadog_test; GRANT ALL PRIVILEGES ON DATABASE datadog_test TO datadog; CREATE DATABASE dogs; diff --git a/postgres/tests/compose/resources/02_setup.sh b/postgres/tests/compose/resources/02_setup.sh index aca2c6b034b14..89b0ea385ced3 100755 --- a/postgres/tests/compose/resources/02_setup.sh +++ b/postgres/tests/compose/resources/02_setup.sh @@ -15,6 +15,7 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" "datadog_test" <<-'EOSQL' CREATE EXTENSION pg_buffercache SCHEMA public; CREATE EXTENSION pg_stat_statements SCHEMA public; GRANT SELECT ON pg_stat_statements TO datadog; + GRANT SELECT ON pg_statistic TO datadog; CREATE SCHEMA IF NOT EXISTS datadog; GRANT USAGE ON SCHEMA datadog TO datadog; diff --git a/postgres/tests/compose/resources/03_load_data.sh b/postgres/tests/compose/resources/03_load_data.sh index 637b4e4a0cdc9..c8dcc434bde1f 100755 --- a/postgres/tests/compose/resources/03_load_data.sh +++ b/postgres/tests/compose/resources/03_load_data.sh @@ -36,10 +36,13 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-EOSQL SELECT * FROM persons; SELECT * FROM persons; SELECT * FROM persons; + VACUUM ANALYZE persons; CREATE SCHEMA public2; CREATE TABLE public2.cities (city VARCHAR(255), country VARCHAR(255), PRIMARY KEY(city)); GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO bob; GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO blocking_bob; + + EOSQL # Create publication for logical replication tests diff --git a/postgres/tests/test_metadata.py b/postgres/tests/test_metadata.py index c85a2ccafb6a2..62acfcfe23c6e 100644 --- a/postgres/tests/test_metadata.py +++ b/postgres/tests/test_metadata.py @@ -10,7 +10,7 @@ from datadog_checks.base.utils.db.utils import DBMAsyncJob from .common import POSTGRES_VERSION -from .utils import run_one_check +from .utils import requires_over_13, run_one_check pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] @@ -404,6 +404,36 @@ def test_collect_schemas_interrupted(integration_check, dbm_instance, aggregator assert len(database_metadata[0]['schemas'][0]['tables']) == 1 +@requires_over_13 +def test_collect_sequences(integration_check, dbm_instance, aggregator): + dbm_instance["collect_schemas"] = {'enabled': True, 'collection_interval': 0.5} + dbm_instance['relations'] = [] + dbm_instance["database_autodiscovery"] = {"enabled": True, "include": ["datadog"]} + del dbm_instance['dbname'] + + check = integration_check(dbm_instance) + run_one_check(check, dbm_instance) + aggregator.assert_metric( + "postgresql.sequence.estimated_remaining", + value=None, + tags=[ + "column:{}".format('personid'), + "schema:{}".format('public'), + "table:{}".format('persons'), + "db:{}".format('datadog_test'), + "max_value:{}".format(2147483647), + "current_value:{}".format(2), + "database_hostname:stubbed.hostname", + "foo:bar", + "port:5432", + "postgresql_cluster_name:primary", + f'postgresql_version:{check.raw_version}', + f'system_identifier:{check.system_identifier}', + "replication_role:master", + ], + ) + + def assert_fields(keys: List[str], fields: List[str]): for field in fields: assert field in keys