From 89d794facc3e1fa728ccfbe6ac0323d668c8e021 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Thu, 24 Aug 2023 16:13:22 +0200 Subject: [PATCH] Introduce connection pool (#2829) Make it hold connection kwargs for local connections and all `NamedConnection` objects use them automatically. Also get rid of redundant `ConfigHandler.local_connect_kwargs`. On top of that we will introduce a dedicated connection for the REST API thread. --- patroni/postgresql/__init__.py | 15 ++--- patroni/postgresql/bootstrap.py | 2 +- patroni/postgresql/citus.py | 16 ++--- patroni/postgresql/config.py | 59 ++++++++++++------ patroni/postgresql/connection.py | 101 +++++++++++++++++++++++++------ patroni/postgresql/slots.py | 3 +- tests/test_bootstrap.py | 4 +- tests/test_citus.py | 2 +- tests/test_postgresql.py | 5 +- 9 files changed, 145 insertions(+), 62 deletions(-) diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 9b1991a0b..a37e15e12 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -18,7 +18,7 @@ from .callback_executor import CallbackAction, CallbackExecutor from .cancellable import CancellableSubprocess from .config import ConfigHandler, mtime -from .connection import Connection, get_connection_cursor +from .connection import ConnectionPool, get_connection_cursor from .citus import CitusHandler from .misc import parse_history, parse_lsn, postgres_major_version_to_int from .postmaster import PostmasterProcess @@ -79,7 +79,8 @@ def __init__(self, config: Dict[str, Any]) -> None: self.set_state('stopped') self._pending_restart = False - self._connection = Connection() + self.connection_pool = ConnectionPool() + self._connection = self.connection_pool.get('heartbeat') self.citus_handler = CitusHandler(self, config.get('citus')) self.config = ConfigHandler(self, config) self.config.check_directories() @@ -277,7 +278,7 @@ def pg_isready(self) -> str: :returns: 'ok' if PostgreSQL is up, 'reject' if starting up, 'no_resopnse' if not up.""" - r = self.config.local_connect_kwargs + r = self.connection_pool.conn_kwargs cmd = [self.pgcommand('pg_isready'), '-p', r['port'], '-d', self._database] # Host is not set if we are connecting via default unix socket @@ -328,10 +329,6 @@ def server_version(self) -> int: def connection(self) -> Union['connection3', 'Connection3[Any]']: return self._connection.get() - def set_connection_kwargs(self, kwargs: Dict[str, Any]) -> None: - self._connection.set_conn_kwargs(kwargs.copy()) - self.citus_handler.set_conn_kwargs(kwargs.copy()) - def _query(self, sql: str, *params: Any) -> List[Tuple[Any, ...]]: """Execute *sql* query with *params* and optionally return results. @@ -694,7 +691,7 @@ def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = # the former node, otherwise, we might get a stalled one # after kill -9, which would report incorrect data to # patroni. - self._connection.close() + self.connection_pool.close() if self.is_running(): logger.error('Cannot start PostgreSQL because one is already running.') @@ -765,7 +762,7 @@ def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = def checkpoint(self, connect_kwargs: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> Optional[str]: check_not_is_in_recovery = connect_kwargs is not None - connect_kwargs = connect_kwargs or self.config.local_connect_kwargs + connect_kwargs = connect_kwargs or self.connection_pool.conn_kwargs for p in ['connect_timeout', 'options']: connect_kwargs.pop(p, None) if timeout: diff --git a/patroni/postgresql/bootstrap.py b/patroni/postgresql/bootstrap.py index 6e25012b0..26025e43a 100644 --- a/patroni/postgresql/bootstrap.py +++ b/patroni/postgresql/bootstrap.py @@ -176,7 +176,7 @@ def call_post_bootstrap(self, config: Dict[str, Any]) -> bool: """ cmd = config.get('post_bootstrap') or config.get('post_init') if cmd: - r = self._postgresql.config.local_connect_kwargs + r = self._postgresql.connection_pool.conn_kwargs connstring = self._postgresql.config.format_dsn(r, True) if 'host' not in r: # https://www.postgresql.org/docs/current/static/libpq-pgpass.html diff --git a/patroni/postgresql/citus.py b/patroni/postgresql/citus.py index f659e3255..09f77f2b0 100644 --- a/patroni/postgresql/citus.py +++ b/patroni/postgresql/citus.py @@ -6,7 +6,6 @@ from urllib.parse import urlparse from typing import Any, Dict, List, Optional, Union, Tuple, TYPE_CHECKING -from .connection import Connection from ..dcs import CITUS_COORDINATOR_GROUP_ID, Cluster from ..psycopg import connect, quote_ident @@ -71,7 +70,10 @@ def __init__(self, postgresql: 'Postgresql', config: Optional[Dict[str, Union[st self.daemon = True self._postgresql = postgresql self._config = config - self._connection = Connection() + if config: + self._connection = postgresql.connection_pool.get( + 'citus', {'dbname': config['database'], + 'options': '-c statement_timeout=0 -c idle_in_transaction_session_timeout=0'}) self._pg_dist_node: Dict[int, PgDistNode] = {} # Cache of pg_dist_node: {groupid: PgDistNode()} self._tasks: List[PgDistNode] = [] # Requests to change pg_dist_node, every task is a `PgDistNode` self._in_flight: Optional[PgDistNode] = None # Reference to the `PgDistNode` being changed in a transaction @@ -91,12 +93,6 @@ def is_coordinator(self) -> bool: def is_worker(self) -> bool: return self.is_enabled() and not self.is_coordinator() - def set_conn_kwargs(self, kwargs: Dict[str, Any]) -> None: - if isinstance(self._config, dict): # self.is_enabled(): - kwargs.update({'dbname': self._config['database'], - 'options': '-c statement_timeout=0 -c idle_in_transaction_session_timeout=0'}) - self._connection.set_conn_kwargs(kwargs) - def schedule_cache_rebuild(self) -> None: with self._condition: self._schedule_load_pg_dist_node = True @@ -359,8 +355,8 @@ def bootstrap(self) -> None: if not isinstance(self._config, dict): # self.is_enabled() return - conn_kwargs = self._postgresql.config.local_connect_kwargs - conn_kwargs['options'] = '-c synchronous_commit=local -c statement_timeout=0' + conn_kwargs = {**self._postgresql.connection_pool.conn_kwargs, + 'options': '-c synchronous_commit=local -c statement_timeout=0'} if self._config['database'] != self._postgresql.database: conn = connect(**conn_kwargs) try: diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index 51bbf9af8..79141256e 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -942,24 +942,32 @@ def _get_tcp_local_address(self) -> str: return 'localhost' # connection via localhost is preferred return listen_addresses[0].strip() # can't use localhost, take first address from listen_addresses - @property - def local_connect_kwargs(self) -> Dict[str, Any]: - ret = self._local_address.copy() - # add all of the other connection settings that are available - ret.update(self._superuser) - # if the "username" parameter is present, it actually needs to be "user" - # for connecting to PostgreSQL - if 'username' in self._superuser: - ret['user'] = self._superuser['username'] - del ret['username'] - # ensure certain Patroni configurations are available - ret.update({'dbname': self._postgresql.database, - 'fallback_application_name': 'Patroni', - 'connect_timeout': 3, - 'options': '-c statement_timeout=2000'}) - return ret - def resolve_connection_addresses(self) -> None: + """Calculates and sets local and remote connection urls and options. + + This method sets: + * :attr:`Postgresql.connection_string ` attribute, which + is later written to the member key in DCS as ``conn_url``. + * :attr:`ConfigHandler.local_replication_address` attribute, which is used for replication connections to + local postgres. + * :attr:`ConnectionPool.conn_kwargs ` attribute, + which is used for superuser connections to local postgres. + + .. note:: + If there is a valid directory in ``postgresql.parameters.unix_socket_directories`` in the Patroni + configuration and ``postgresql.use_unix_socket`` and/or ``postgresql.use_unix_socket_repl`` + are set to ``True``, we respectively use unix sockets for superuser and replication connections + to local postgres. + + If there is a requirement to use unix sockets, but nothing is set in the + ``postgresql.parameters.unix_socket_directories``, we omit a ``host`` in connection parameters relying + on the ability of ``libpq`` to connect via some default unix socket directory. + + If unix sockets are not requested we "switch" to TCP, prefering to use ``localhost`` if it is possible + to deduce that Postgres is listening on a local interface address. + + Otherwise we just used the first address specified in the ``listen_addresses`` GUC. + """ port = self._server_parameters['port'] tcp_local_address = self._get_tcp_local_address() netloc = self._config.get('connect_address') or tcp_local_address + ':' + port @@ -972,12 +980,25 @@ def resolve_connection_addresses(self) -> None: tcp_local_address = {'host': tcp_local_address, 'port': port} - self._local_address = unix_local_address if self._config.get('use_unix_socket') else tcp_local_address self.local_replication_address = unix_local_address\ if self._config.get('use_unix_socket_repl') else tcp_local_address self._postgresql.connection_string = uri('postgres', netloc, self._postgresql.database) - self._postgresql.set_connection_kwargs(self.local_connect_kwargs) + + local_address = unix_local_address if self._config.get('use_unix_socket') else tcp_local_address + local_conn_kwargs = { + **local_address, + **self._superuser, + 'dbname': self._postgresql.database, + 'fallback_application_name': 'Patroni', + 'connect_timeout': 3, + 'options': '-c statement_timeout=2000' + } + # if the "username" parameter is present, it actually needs to be "user" for connecting to PostgreSQL + if 'username' in local_conn_kwargs: + local_conn_kwargs['user'] = local_conn_kwargs.pop('username') + # "notify" connection_pool about the "new" local connection address + self._postgresql.connection_pool.conn_kwargs = local_conn_kwargs def _get_pg_settings( self, names: Collection[str] diff --git a/patroni/postgresql/connection.py b/patroni/postgresql/connection.py index 5bb97a1ff..2a50dbb5b 100644 --- a/patroni/postgresql/connection.py +++ b/patroni/postgresql/connection.py @@ -2,9 +2,9 @@ from contextlib import contextmanager from threading import Lock -from typing import Any, Dict, Iterator, List, Union, Tuple, TYPE_CHECKING +from typing import Any, Dict, Iterator, List, Optional, Union, Tuple, TYPE_CHECKING if TYPE_CHECKING: # pragma: no cover - from psycopg import Connection as Connection3, Cursor + from psycopg import Connection, Cursor from psycopg2 import connection, cursor from .. import psycopg @@ -13,27 +13,34 @@ logger = logging.getLogger(__name__) -class Connection: - """Helper class to manage connections from Patroni to PostgreSQL. +class NamedConnection: + """Helper class to manage ``psycopg`` connections from Patroni to PostgreSQL. :ivar server_version: PostgreSQL version in integer format where we are connected to. """ server_version: int - def __init__(self) -> None: - """Create an instance of :class:`Connection` class.""" + def __init__(self, pool: 'ConnectionPool', name: str, kwargs_override: Optional[Dict[str, Any]]) -> None: + """Create an instance of :class:`NamedConnection` class. + + :param pool: reference to a :class:`ConnectionPool` object. + :param name: name of the connection. + :param kwargs_override: :class:`dict` object with connection parameters that should be + different from default values provided by connection *pool*. + """ + self._pool = pool + self._name = name + self._kwargs_override = kwargs_override or {} self._lock = Lock() # used to make sure that only one connection to postgres is established self._connection = None - def set_conn_kwargs(self, conn_kwargs: Dict[str, Any]) -> None: - """Set connection parameters, like user, password, host, port and so on. - - :param conn_kwargs: connection parameters as a dictionary. - """ - self._conn_kwargs = conn_kwargs + @property + def _conn_kwargs(self) -> Dict[str, Any]: + """Connection parameters for this :class:`NamedConnection`.""" + return {**self._pool.conn_kwargs, **self._kwargs_override, 'application_name': f'Patroni {self._name}'} - def get(self) -> Union['connection', 'Connection3[Any]']: + def get(self) -> Union['connection', 'Connection[Any]']: """Get ``psycopg``/``psycopg2`` connection object. .. note:: @@ -43,7 +50,7 @@ def get(self) -> Union['connection', 'Connection3[Any]']: """ with self._lock: if not self._connection or self._connection.closed != 0: - logger.info("establishing a new patroni connection to postgres") + logger.info("establishing a new patroni %s connection to postgres", self._name) self._connection = psycopg.connect(**self._conn_kwargs) self.server_version = getattr(self._connection, 'server_version', 0) return self._connection @@ -76,12 +83,72 @@ def query(self, sql: str, *params: Any) -> List[Tuple[Any, ...]]: raise exc raise PostgresConnectionException('connection problems') from exc - def close(self) -> None: - """Close the psycopg connection to postgres.""" + def close(self, silent: bool = False) -> bool: + """Close the psycopg connection to postgres. + + :param silent: whether the method should not write logs. + + :returns: ``True`` if ``psycopg`` connection was closed, ``False`` otherwise.`` + """ + ret = False if self._connection and self._connection.closed == 0: self._connection.close() - logger.info("closed patroni connection to postgres") + if not silent: + logger.info("closed patroni %s connection to postgres", self._name) + ret = True self._connection = None + return ret + + +class ConnectionPool: + """Helper class to manage named connections from Patroni to PostgreSQL. + + The instance keeps named :class:`NamedConnection` objects and parameters that must be used for new connections. + """ + + def __init__(self) -> None: + """Create an instance of :class:`ConnectionPool` class.""" + self._lock = Lock() + self._connections: Dict[str, NamedConnection] = {} + self._conn_kwargs: Dict[str, Any] = {} + + @property + def conn_kwargs(self) -> Dict[str, Any]: + """Connection parameters that must be used for new ``psycopg`` connections.""" + with self._lock: + return self._conn_kwargs.copy() + + @conn_kwargs.setter + def conn_kwargs(self, value: Dict[str, Any]) -> None: + """Set new connection parameters. + + :param value: :class:`dict` object with connection parameters. + """ + with self._lock: + self._conn_kwargs = value + + def get(self, name: str, kwargs_override: Optional[Dict[str, Any]] = None) -> NamedConnection: + """Get a new named :class:`NamedConnection` object from the pool. + + .. note:: + Creates a new :class:`NamedConnection` object if it doesn't yet exist in the pool. + + :param name: name of the connection. + :param kwargs_override: :class:`dict` object with connection parameters that should be + different from default values provided by :attr:`conn_kwargs`. + + :returns: :class:`NamedConnection` object. + """ + with self._lock: + if name not in self._connections: + self._connections[name] = NamedConnection(self, name, kwargs_override) + return self._connections[name] + + def close(self) -> None: + """Close all named connections from Patroni to PostgreSQL registered in the pool.""" + with self._lock: + if any(conn.close(True) for conn in self._connections.values()): + logger.info("closed patroni connections to postgres") @contextmanager diff --git a/patroni/postgresql/slots.py b/patroni/postgresql/slots.py index e4543c711..51a8fc5ad 100644 --- a/patroni/postgresql/slots.py +++ b/patroni/postgresql/slots.py @@ -384,8 +384,7 @@ def get_local_connection_cursor(self, **kwargs: Any) -> Iterator[Union['cursor', :yields: connection cursor object, note implementation varies depending on version of :mod:`psycopg`. """ - conn_kwargs = self._postgresql.config.local_connect_kwargs - conn_kwargs.update(kwargs) + conn_kwargs = {**self._postgresql.connection_pool.conn_kwargs, **kwargs} with get_connection_cursor(**conn_kwargs) as cur: yield cur diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 9f98fecbc..c922fcae4 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -250,7 +250,7 @@ def test_call_post_bootstrap(self, mock_cancellable_subprocess_call): self.assertFalse(self.b.call_post_bootstrap({'post_init': '/bin/false'})) mock_cancellable_subprocess_call.return_value = 0 - self.p.config.superuser.pop('username') + self.p.connection_pool._conn_kwargs.pop('user') self.assertTrue(self.b.call_post_bootstrap({'post_init': '/bin/false'})) mock_cancellable_subprocess_call.assert_called() args, kwargs = mock_cancellable_subprocess_call.call_args @@ -258,7 +258,7 @@ def test_call_post_bootstrap(self, mock_cancellable_subprocess_call): self.assertEqual(args[0], ['/bin/false', 'dbname=postgres host=127.0.0.2 port=5432']) mock_cancellable_subprocess_call.reset_mock() - self.p.config._local_address.pop('host') + self.p.connection_pool._conn_kwargs.pop('host') self.assertTrue(self.b.call_post_bootstrap({'post_init': '/bin/false'})) mock_cancellable_subprocess_call.assert_called() self.assertEqual(mock_cancellable_subprocess_call.call_args[0][0], ['/bin/false', 'dbname=postgres port=5432']) diff --git a/tests/test_citus.py b/tests/test_citus.py index 7c2d63bbc..9849a069a 100644 --- a/tests/test_citus.py +++ b/tests/test_citus.py @@ -13,7 +13,7 @@ class TestCitus(BaseTestPostgresql): def setUp(self): super(TestCitus, self).setUp() self.c = self.p.citus_handler - self.c.set_conn_kwargs({'host': 'localhost', 'dbname': 'postgres'}) + self.p.connection_pool.conn_kwargs = {'host': 'localhost', 'dbname': 'postgres'} self.cluster = get_cluster_initialized_with_leader() self.cluster.workers[1] = self.cluster diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index 510c2a90b..29b25ee47 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -577,7 +577,10 @@ def test_resolve_connection_addresses(self): self.assertEqual(self.p.config.local_replication_address, {'host': '/tmp', 'port': '5432'}) self.p.config._server_parameters.pop('unix_socket_directories') self.p.config.resolve_connection_addresses() - self.assertEqual(self.p.config._local_address, {'port': '5432'}) + self.assertEqual(self.p.connection_pool.conn_kwargs, {'connect_timeout': 3, 'dbname': 'postgres', + 'fallback_application_name': 'Patroni', + 'options': '-c statement_timeout=2000', + 'password': 'test', 'port': '5432', 'user': 'foo'}) @patch.object(Postgresql, '_version_file_exists', Mock(return_value=True)) def test_get_major_version(self):