Skip to content

Commit

Permalink
Introduce connection pool (patroni#2829)
Browse files Browse the repository at this point in the history
Make it hold connection kwargs for local connections and all `NamedConnection` objects use them automatically.

Also get rid of redundant `ConfigHandler.local_connect_kwargs`.

On top of that we will introduce a dedicated connection for the REST API thread.
  • Loading branch information
CyberDem0n authored Aug 24, 2023
1 parent 3333e78 commit 89d794f
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 62 deletions.
15 changes: 6 additions & 9 deletions patroni/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .callback_executor import CallbackAction, CallbackExecutor
from .cancellable import CancellableSubprocess
from .config import ConfigHandler, mtime
from .connection import Connection, get_connection_cursor
from .connection import ConnectionPool, get_connection_cursor
from .citus import CitusHandler
from .misc import parse_history, parse_lsn, postgres_major_version_to_int
from .postmaster import PostmasterProcess
Expand Down Expand Up @@ -79,7 +79,8 @@ def __init__(self, config: Dict[str, Any]) -> None:
self.set_state('stopped')

self._pending_restart = False
self._connection = Connection()
self.connection_pool = ConnectionPool()
self._connection = self.connection_pool.get('heartbeat')
self.citus_handler = CitusHandler(self, config.get('citus'))
self.config = ConfigHandler(self, config)
self.config.check_directories()
Expand Down Expand Up @@ -277,7 +278,7 @@ def pg_isready(self) -> str:
:returns: 'ok' if PostgreSQL is up, 'reject' if starting up, 'no_response' if not up."""

r = self.config.local_connect_kwargs
r = self.connection_pool.conn_kwargs
cmd = [self.pgcommand('pg_isready'), '-p', r['port'], '-d', self._database]

# Host is not set if we are connecting via default unix socket
Expand Down Expand Up @@ -328,10 +329,6 @@ def server_version(self) -> int:
def connection(self) -> Union['connection3', 'Connection3[Any]']:
return self._connection.get()

def set_connection_kwargs(self, kwargs: Dict[str, Any]) -> None:
self._connection.set_conn_kwargs(kwargs.copy())
self.citus_handler.set_conn_kwargs(kwargs.copy())

def _query(self, sql: str, *params: Any) -> List[Tuple[Any, ...]]:
"""Execute *sql* query with *params* and optionally return results.
Expand Down Expand Up @@ -694,7 +691,7 @@ def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] =
# the former node, otherwise, we might get a stalled one
# after kill -9, which would report incorrect data to
# patroni.
self._connection.close()
self.connection_pool.close()

if self.is_running():
logger.error('Cannot start PostgreSQL because one is already running.')
Expand Down Expand Up @@ -765,7 +762,7 @@ def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] =
def checkpoint(self, connect_kwargs: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None) -> Optional[str]:
check_not_is_in_recovery = connect_kwargs is not None
connect_kwargs = connect_kwargs or self.config.local_connect_kwargs
connect_kwargs = connect_kwargs or self.connection_pool.conn_kwargs
for p in ['connect_timeout', 'options']:
connect_kwargs.pop(p, None)
if timeout:
Expand Down
2 changes: 1 addition & 1 deletion patroni/postgresql/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def call_post_bootstrap(self, config: Dict[str, Any]) -> bool:
"""
cmd = config.get('post_bootstrap') or config.get('post_init')
if cmd:
r = self._postgresql.config.local_connect_kwargs
r = self._postgresql.connection_pool.conn_kwargs
connstring = self._postgresql.config.format_dsn(r, True)
if 'host' not in r:
# https://www.postgresql.org/docs/current/static/libpq-pgpass.html
Expand Down
16 changes: 6 additions & 10 deletions patroni/postgresql/citus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Union, Tuple, TYPE_CHECKING

from .connection import Connection
from ..dcs import CITUS_COORDINATOR_GROUP_ID, Cluster
from ..psycopg import connect, quote_ident

Expand Down Expand Up @@ -71,7 +70,10 @@ def __init__(self, postgresql: 'Postgresql', config: Optional[Dict[str, Union[st
self.daemon = True
self._postgresql = postgresql
self._config = config
self._connection = Connection()
if config:
self._connection = postgresql.connection_pool.get(
'citus', {'dbname': config['database'],
'options': '-c statement_timeout=0 -c idle_in_transaction_session_timeout=0'})
self._pg_dist_node: Dict[int, PgDistNode] = {} # Cache of pg_dist_node: {groupid: PgDistNode()}
self._tasks: List[PgDistNode] = [] # Requests to change pg_dist_node, every task is a `PgDistNode`
self._in_flight: Optional[PgDistNode] = None # Reference to the `PgDistNode` being changed in a transaction
Expand All @@ -91,12 +93,6 @@ def is_coordinator(self) -> bool:
def is_worker(self) -> bool:
return self.is_enabled() and not self.is_coordinator()

def set_conn_kwargs(self, kwargs: Dict[str, Any]) -> None:
if isinstance(self._config, dict): # self.is_enabled():
kwargs.update({'dbname': self._config['database'],
'options': '-c statement_timeout=0 -c idle_in_transaction_session_timeout=0'})
self._connection.set_conn_kwargs(kwargs)

def schedule_cache_rebuild(self) -> None:
with self._condition:
self._schedule_load_pg_dist_node = True
Expand Down Expand Up @@ -359,8 +355,8 @@ def bootstrap(self) -> None:
if not isinstance(self._config, dict): # self.is_enabled()
return

conn_kwargs = self._postgresql.config.local_connect_kwargs
conn_kwargs['options'] = '-c synchronous_commit=local -c statement_timeout=0'
conn_kwargs = {**self._postgresql.connection_pool.conn_kwargs,
'options': '-c synchronous_commit=local -c statement_timeout=0'}
if self._config['database'] != self._postgresql.database:
conn = connect(**conn_kwargs)
try:
Expand Down
59 changes: 40 additions & 19 deletions patroni/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,24 +942,32 @@ def _get_tcp_local_address(self) -> str:
return 'localhost' # connection via localhost is preferred
return listen_addresses[0].strip() # can't use localhost, take first address from listen_addresses

@property
def local_connect_kwargs(self) -> Dict[str, Any]:
ret = self._local_address.copy()
# add all of the other connection settings that are available
ret.update(self._superuser)
# if the "username" parameter is present, it actually needs to be "user"
# for connecting to PostgreSQL
if 'username' in self._superuser:
ret['user'] = self._superuser['username']
del ret['username']
# ensure certain Patroni configurations are available
ret.update({'dbname': self._postgresql.database,
'fallback_application_name': 'Patroni',
'connect_timeout': 3,
'options': '-c statement_timeout=2000'})
return ret

def resolve_connection_addresses(self) -> None:
"""Calculates and sets local and remote connection urls and options.
This method sets:
* :attr:`Postgresql.connection_string <patroni.postgresql.Postgresql.connection_string>` attribute, which
is later written to the member key in DCS as ``conn_url``.
* :attr:`ConfigHandler.local_replication_address` attribute, which is used for replication connections to
local postgres.
* :attr:`ConnectionPool.conn_kwargs <patroni.postgresql.connection.ConnectionPool.conn_kwargs>` attribute,
which is used for superuser connections to local postgres.
.. note::
If there is a valid directory in ``postgresql.parameters.unix_socket_directories`` in the Patroni
configuration and ``postgresql.use_unix_socket`` and/or ``postgresql.use_unix_socket_repl``
are set to ``True``, we respectively use unix sockets for superuser and replication connections
to local postgres.
If there is a requirement to use unix sockets, but nothing is set in the
``postgresql.parameters.unix_socket_directories``, we omit a ``host`` in connection parameters relying
on the ability of ``libpq`` to connect via some default unix socket directory.
If unix sockets are not requested we "switch" to TCP, preferring to use ``localhost`` if it is possible
to deduce that Postgres is listening on a local interface address.
Otherwise we just use the first address specified in the ``listen_addresses`` GUC.
"""
port = self._server_parameters['port']
tcp_local_address = self._get_tcp_local_address()
netloc = self._config.get('connect_address') or tcp_local_address + ':' + port
Expand All @@ -972,12 +980,25 @@ def resolve_connection_addresses(self) -> None:

tcp_local_address = {'host': tcp_local_address, 'port': port}

self._local_address = unix_local_address if self._config.get('use_unix_socket') else tcp_local_address
self.local_replication_address = unix_local_address\
if self._config.get('use_unix_socket_repl') else tcp_local_address

self._postgresql.connection_string = uri('postgres', netloc, self._postgresql.database)
self._postgresql.set_connection_kwargs(self.local_connect_kwargs)

local_address = unix_local_address if self._config.get('use_unix_socket') else tcp_local_address
local_conn_kwargs = {
**local_address,
**self._superuser,
'dbname': self._postgresql.database,
'fallback_application_name': 'Patroni',
'connect_timeout': 3,
'options': '-c statement_timeout=2000'
}
# if the "username" parameter is present, it actually needs to be "user" for connecting to PostgreSQL
if 'username' in local_conn_kwargs:
local_conn_kwargs['user'] = local_conn_kwargs.pop('username')
# "notify" connection_pool about the "new" local connection address
self._postgresql.connection_pool.conn_kwargs = local_conn_kwargs

def _get_pg_settings(
self, names: Collection[str]
Expand Down
101 changes: 84 additions & 17 deletions patroni/postgresql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from contextlib import contextmanager
from threading import Lock
from typing import Any, Dict, Iterator, List, Union, Tuple, TYPE_CHECKING
from typing import Any, Dict, Iterator, List, Optional, Union, Tuple, TYPE_CHECKING
if TYPE_CHECKING: # pragma: no cover
from psycopg import Connection as Connection3, Cursor
from psycopg import Connection, Cursor
from psycopg2 import connection, cursor

from .. import psycopg
Expand All @@ -13,27 +13,34 @@
logger = logging.getLogger(__name__)


class Connection:
"""Helper class to manage connections from Patroni to PostgreSQL.
class NamedConnection:
"""Helper class to manage ``psycopg`` connections from Patroni to PostgreSQL.
:ivar server_version: PostgreSQL version in integer format where we are connected to.
"""

server_version: int

def __init__(self) -> None:
"""Create an instance of :class:`Connection` class."""
def __init__(self, pool: 'ConnectionPool', name: str, kwargs_override: Optional[Dict[str, Any]]) -> None:
"""Create an instance of :class:`NamedConnection` class.
:param pool: reference to a :class:`ConnectionPool` object.
:param name: name of the connection.
:param kwargs_override: :class:`dict` object with connection parameters that should be
different from default values provided by connection *pool*.
"""
self._pool = pool
self._name = name
self._kwargs_override = kwargs_override or {}
self._lock = Lock() # used to make sure that only one connection to postgres is established
self._connection = None

def set_conn_kwargs(self, conn_kwargs: Dict[str, Any]) -> None:
"""Set connection parameters, like user, password, host, port and so on.
:param conn_kwargs: connection parameters as a dictionary.
"""
self._conn_kwargs = conn_kwargs
@property
def _conn_kwargs(self) -> Dict[str, Any]:
"""Connection parameters for this :class:`NamedConnection`."""
return {**self._pool.conn_kwargs, **self._kwargs_override, 'application_name': f'Patroni {self._name}'}

def get(self) -> Union['connection', 'Connection3[Any]']:
def get(self) -> Union['connection', 'Connection[Any]']:
"""Get ``psycopg``/``psycopg2`` connection object.
.. note::
Expand All @@ -43,7 +50,7 @@ def get(self) -> Union['connection', 'Connection3[Any]']:
"""
with self._lock:
if not self._connection or self._connection.closed != 0:
logger.info("establishing a new patroni connection to postgres")
logger.info("establishing a new patroni %s connection to postgres", self._name)
self._connection = psycopg.connect(**self._conn_kwargs)
self.server_version = getattr(self._connection, 'server_version', 0)
return self._connection
Expand Down Expand Up @@ -76,12 +83,72 @@ def query(self, sql: str, *params: Any) -> List[Tuple[Any, ...]]:
raise exc
raise PostgresConnectionException('connection problems') from exc

def close(self) -> None:
"""Close the psycopg connection to postgres."""
def close(self, silent: bool = False) -> bool:
"""Close the psycopg connection to postgres.
:param silent: whether the method should not write logs.
:returns: ``True`` if ``psycopg`` connection was closed, ``False`` otherwise.
"""
ret = False
if self._connection and self._connection.closed == 0:
self._connection.close()
logger.info("closed patroni connection to postgres")
if not silent:
logger.info("closed patroni %s connection to postgres", self._name)
ret = True
self._connection = None
return ret


class ConnectionPool:
    """Helper class to manage named connections from Patroni to PostgreSQL.

    The instance keeps named :class:`NamedConnection` objects and parameters that must be used for new connections.
    """

    def __init__(self) -> None:
        """Create an instance of :class:`ConnectionPool` class."""
        # A single lock guards both the registry of named connections and the shared connection parameters.
        self._lock = Lock()
        self._connections: Dict[str, NamedConnection] = {}
        self._conn_kwargs: Dict[str, Any] = {}

    @property
    def conn_kwargs(self) -> Dict[str, Any]:
        """Connection parameters that must be used for new ``psycopg`` connections.

        .. note::
            A copy is returned, so callers may freely modify the result without affecting the pool.
        """
        with self._lock:
            return self._conn_kwargs.copy()

    @conn_kwargs.setter
    def conn_kwargs(self, value: Dict[str, Any]) -> None:
        """Set new connection parameters.

        :param value: :class:`dict` object with connection parameters.
        """
        with self._lock:
            self._conn_kwargs = value

    def get(self, name: str, kwargs_override: Optional[Dict[str, Any]] = None) -> 'NamedConnection':
        """Get a new named :class:`NamedConnection` object from the pool.

        .. note::
            Creates a new :class:`NamedConnection` object if it doesn't yet exist in the pool.
            If a connection with the given *name* is already registered it is returned as is and
            *kwargs_override* is ignored.

        :param name: name of the connection.
        :param kwargs_override: :class:`dict` object with connection parameters that should be
                                different from default values provided by :attr:`conn_kwargs`.

        :returns: :class:`NamedConnection` object.
        """
        with self._lock:
            if name not in self._connections:
                self._connections[name] = NamedConnection(self, name, kwargs_override)
            return self._connections[name]

    def close(self) -> None:
        """Close all named connections from Patroni to PostgreSQL registered in the pool.

        A single summary message is logged if at least one connection was actually closed.
        """
        with self._lock:
            # Materialize the results first: passing a generator straight to any() would stop at the first
            # connection reporting that it was closed and leave all remaining connections open.
            closed = [conn.close(True) for conn in self._connections.values()]
            if any(closed):
                logger.info("closed patroni connections to postgres")


@contextmanager
Expand Down
3 changes: 1 addition & 2 deletions patroni/postgresql/slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,7 @@ def get_local_connection_cursor(self, **kwargs: Any) -> Iterator[Union['cursor',
:yields: connection cursor object, note implementation varies depending on version of :mod:`psycopg`.
"""
conn_kwargs = self._postgresql.config.local_connect_kwargs
conn_kwargs.update(kwargs)
conn_kwargs = {**self._postgresql.connection_pool.conn_kwargs, **kwargs}
with get_connection_cursor(**conn_kwargs) as cur:
yield cur

Expand Down
4 changes: 2 additions & 2 deletions tests/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,15 @@ def test_call_post_bootstrap(self, mock_cancellable_subprocess_call):
self.assertFalse(self.b.call_post_bootstrap({'post_init': '/bin/false'}))

mock_cancellable_subprocess_call.return_value = 0
self.p.config.superuser.pop('username')
self.p.connection_pool._conn_kwargs.pop('user')
self.assertTrue(self.b.call_post_bootstrap({'post_init': '/bin/false'}))
mock_cancellable_subprocess_call.assert_called()
args, kwargs = mock_cancellable_subprocess_call.call_args
self.assertTrue('PGPASSFILE' in kwargs['env'])
self.assertEqual(args[0], ['/bin/false', 'dbname=postgres host=127.0.0.2 port=5432'])

mock_cancellable_subprocess_call.reset_mock()
self.p.config._local_address.pop('host')
self.p.connection_pool._conn_kwargs.pop('host')
self.assertTrue(self.b.call_post_bootstrap({'post_init': '/bin/false'}))
mock_cancellable_subprocess_call.assert_called()
self.assertEqual(mock_cancellable_subprocess_call.call_args[0][0], ['/bin/false', 'dbname=postgres port=5432'])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_citus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestCitus(BaseTestPostgresql):
def setUp(self):
super(TestCitus, self).setUp()
self.c = self.p.citus_handler
self.c.set_conn_kwargs({'host': 'localhost', 'dbname': 'postgres'})
self.p.connection_pool.conn_kwargs = {'host': 'localhost', 'dbname': 'postgres'}
self.cluster = get_cluster_initialized_with_leader()
self.cluster.workers[1] = self.cluster

Expand Down
5 changes: 4 additions & 1 deletion tests/test_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,10 @@ def test_resolve_connection_addresses(self):
self.assertEqual(self.p.config.local_replication_address, {'host': '/tmp', 'port': '5432'})
self.p.config._server_parameters.pop('unix_socket_directories')
self.p.config.resolve_connection_addresses()
self.assertEqual(self.p.config._local_address, {'port': '5432'})
self.assertEqual(self.p.connection_pool.conn_kwargs, {'connect_timeout': 3, 'dbname': 'postgres',
'fallback_application_name': 'Patroni',
'options': '-c statement_timeout=2000',
'password': 'test', 'port': '5432', 'user': 'foo'})

@patch.object(Postgresql, '_version_file_exists', Mock(return_value=True))
def test_get_major_version(self):
Expand Down

0 comments on commit 89d794f

Please sign in to comment.