Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Engine.release method to release connection in any way #756

Merged
merged 4 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions aiopg/sa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from ..connection import TIMEOUT
from ..utils import _PoolAcquireContextManager, _PoolContextManager
from .connection import SAConnection
from .exc import InvalidRequestError

try:
from sqlalchemy.dialects.postgresql.psycopg2 import (
Expand Down Expand Up @@ -169,10 +168,6 @@ async def _acquire(self):
return conn

def release(self, conn):
"""Revert back connection to pool."""
if conn.in_transaction:
raise InvalidRequestError("Cannot release a connection with "
"not finished transaction")
raw = conn.connection
fut = self._pool.release(raw)
return fut
Expand Down
74 changes: 74 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,77 @@ def warning():
@pytest.fixture
def log():
yield _AssertLogsContext


@pytest.fixture
def tcp_proxy(loop):
proxy = None

async def go(src_port, dst_port):
nonlocal proxy
proxy = TcpProxy(
dst_port=dst_port,
src_port=src_port,
)
await proxy.start()
return proxy
yield go
if proxy is not None:
loop.run_until_complete(proxy.disconnect())


class TcpProxy:
"""
TCP proxy. Allows simulating connection breaks in tests.
"""
MAX_BYTES = 1024

def __init__(self, *, src_port, dst_port):
self.src_host = '127.0.0.1'
self.src_port = src_port
self.dst_host = '127.0.0.1'
self.dst_port = dst_port
self.connections = set()

async def start(self):
return await asyncio.start_server(
self.handle_client,
host=self.src_host,
port=self.src_port,
)

async def disconnect(self):
while self.connections:
writer = self.connections.pop()
writer.close()
if hasattr(writer, "wait_closed"):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In python 3.6 there is no such a method

await writer.wait_closed()

@staticmethod
async def _pipe(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
while not reader.at_eof():
bytes_read = await reader.read(TcpProxy.MAX_BYTES)
writer.write(bytes_read)
finally:
writer.close()

async def handle_client(
self,
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
):
server_reader, server_writer = await asyncio.open_connection(
host=self.dst_host,
port=self.dst_port
)

self.connections.add(server_writer)
self.connections.add(client_writer)

await asyncio.wait([
self._pipe(server_reader, client_writer),
self._pipe(client_reader, server_writer),
])
2 changes: 1 addition & 1 deletion tests/test_async_await.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def test_pool_context_manager_timeout(pg_params, loop):
async with aiopg.create_pool(**pg_params, minsize=1,
maxsize=1) as pool:
cursor_ctx = await pool.cursor()
with pytest.warns(ResourceWarning):
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
with cursor_ctx as cursor:
hung_task = cursor.execute('SELECT pg_sleep(10000);')
# start task
Expand Down
50 changes: 47 additions & 3 deletions tests/test_sa_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

import psycopg2
import pytest
from psycopg2.extensions import parse_dsn
from sqlalchemy import Column, Integer, MetaData, String, Table
Expand Down Expand Up @@ -80,10 +81,10 @@ def test_not_context_manager(engine):
async def test_release_transacted(engine):
conn = await engine.acquire()
tr = await conn.begin()
with pytest.raises(sa.InvalidRequestError):
engine.release(conn)
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
await engine.release(conn)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to raise an exception here instead of a warning. Looks like a user's bug to release a connection in the middle of transaction.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added this comment to the issue as well.

del tr
await conn.close()
assert conn.closed


def test_timeout(engine):
Expand Down Expand Up @@ -147,3 +148,46 @@ async def test_terminate_with_acquired_connections(make_engine):
await engine.wait_closed()

assert conn.closed


async def test_release_after_connection_disconnected_before_select(
tcp_proxy, unused_port, pg_params, make_engine
):
server_port = pg_params["port"]
proxy_port = unused_port()

tcp_proxy = await tcp_proxy(proxy_port, server_port)
engine = await make_engine(port=proxy_port)

with pytest.raises(
(psycopg2.InterfaceError, psycopg2.OperationalError)
):
with pytest.warns(ResourceWarning, match='Invalid transaction status'):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to spam users with both warning and exception? Maybe it is better just to raise an exception? Connection reset is not a user's fault, so I think that this warning doesn't help them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing this.

I'm a newbie in the aiopg codebase and don't know the reasons why these warnings are reported in such a way.

@asvetlov Could you share your opinion please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial thought was preventing a programming error and signal this situation early.
I agree that the connection releasing can be done for network reasons, there no need for extra checks.
I see two options:

  1. Drop exception in 'release' as PR does, https://github.com/aio-libs/aiopg/blob/master/aiopg/pool.py#L242-L248 warning also can be dropped. It can lead to reports about unfinished transactions by next acquired connection.
    Satisfactory but confusing.
  2. Raise an exception only if the underlying socket is not closed but swallow the transaction status for disconnected peers. It prevents false positives from network errors but can report a programming error fast. Connection for the unclosed transaction should be dropped immediately, the connection is in an undefined incorrect state and cannot be recovered. Perhaps a warning generated in the pool can be replaced with a strict error also.

The 1) is easier, 2) is harder to implement but maybe more correct.
I don't want to decide what to do; I have no time to contribute anyway. Please choose what you want.

Copy link
Member Author

@Pliner Pliner Dec 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, got you, thanks.

I've opened a new issue to fix it a bit later, because it looks like merging of the PR with incorrect warnings in quite rare case is much better than suffering from broken pool.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

async with engine.acquire() as conn, conn.begin():
await conn.execute('SELECT 1;')
await tcp_proxy.disconnect()
await conn.execute('SELECT 1;')

assert engine.size == 0


async def test_release_after_connection_disconnected_before_begin(
tcp_proxy, unused_port, pg_params, make_engine
):
server_port = pg_params["port"]
proxy_port = unused_port()

tcp_proxy = await tcp_proxy(proxy_port, server_port)
engine = await make_engine(port=proxy_port)

with pytest.raises(
(psycopg2.InterfaceError, psycopg2.OperationalError)
):
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
async with engine.acquire() as conn:
await conn.execute('SELECT 1;')
await tcp_proxy.disconnect()
async with conn.begin():
pytest.fail("Should not be here")

assert engine.size == 0