Skip to content

Commit

Permalink
Fixes Engine.release method to release connection in any way
Browse files Browse the repository at this point in the history
  • Loading branch information
Pliner committed Dec 5, 2020
1 parent 7f8a846 commit 65ab4da
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 9 deletions.
5 changes: 0 additions & 5 deletions aiopg/sa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from ..connection import TIMEOUT
from ..utils import _PoolAcquireContextManager, _PoolContextManager
from .connection import SAConnection
from .exc import InvalidRequestError

try:
from sqlalchemy.dialects.postgresql.psycopg2 import (
Expand Down Expand Up @@ -169,10 +168,6 @@ async def _acquire(self):
return conn

def release(self, conn):
"""Revert back connection to pool."""
if conn.in_transaction:
raise InvalidRequestError("Cannot release a connection with "
"not finished transaction")
raw = conn.connection
fut = self._pool.release(raw)
return fut
Expand Down
73 changes: 73 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,76 @@ def warning():
@pytest.fixture
def log():
yield _AssertLogsContext


@pytest.fixture
def tcp_proxy(loop):
proxy = None

async def go(src_port, dst_port):
nonlocal proxy
proxy = TcpProxy(
dst_port=dst_port,
src_port=src_port,
)
await proxy.start()
return proxy
yield go
if proxy is not None:
loop.run_until_complete(proxy.disconnect())


class TcpProxy:
"""
TCP proxy. Allows simulating connection breaks in tests.
"""
MAX_BYTES = 1024

def __init__(self, *, src_port, dst_port):
self.src_host = '127.0.0.1'
self.src_port = src_port
self.dst_host = '127.0.0.1'
self.dst_port = dst_port
self.connections = set()

async def start(self):
return await asyncio.start_server(
self.handle_client,
host=self.src_host,
port=self.src_port,
)

async def disconnect(self):
while self.connections:
writer = self.connections.pop()
writer.close()
await writer.wait_closed()

@staticmethod
async def _pipe(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
while not reader.at_eof():
bytes_read = await reader.read(TcpProxy.MAX_BYTES)
writer.write(bytes_read)
finally:
writer.close()

async def handle_client(
self,
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
):
server_reader, server_writer = await asyncio.open_connection(
host=self.dst_host,
port=self.dst_port
)

self.connections.add(server_writer)
self.connections.add(client_writer)

await asyncio.wait([
self._pipe(server_reader, client_writer),
self._pipe(client_reader, server_writer),
])
2 changes: 1 addition & 1 deletion tests/test_async_await.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def test_pool_context_manager_timeout(pg_params, loop):
async with aiopg.create_pool(**pg_params, minsize=1,
maxsize=1) as pool:
cursor_ctx = await pool.cursor()
with pytest.warns(ResourceWarning):
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
with cursor_ctx as cursor:
hung_task = cursor.execute('SELECT pg_sleep(10000);')
# start task
Expand Down
28 changes: 25 additions & 3 deletions tests/test_sa_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

import psycopg2
import pytest
from psycopg2.extensions import parse_dsn
from sqlalchemy import Column, Integer, MetaData, String, Table
Expand Down Expand Up @@ -80,10 +81,10 @@ def test_not_context_manager(engine):
async def test_release_transacted(engine):
conn = await engine.acquire()
tr = await conn.begin()
with pytest.raises(sa.InvalidRequestError):
engine.release(conn)
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
await engine.release(conn)
del tr
await conn.close()
assert conn.closed


def test_timeout(engine):
Expand Down Expand Up @@ -147,3 +148,24 @@ async def test_terminate_with_acquired_connections(make_engine):
await engine.wait_closed()

assert conn.closed


async def test_release_disconnected_connection(
tcp_proxy, unused_port, pg_params, make_engine
):
server_port = pg_params["port"]
proxy_port = unused_port()

tcp_proxy = await tcp_proxy(proxy_port, server_port)
engine = await make_engine(port=proxy_port)

with pytest.raises(
psycopg2.InterfaceError, match='connection already closed'
):
with pytest.warns(ResourceWarning, match='Invalid transaction status'):
async with engine.acquire() as conn, conn.begin():
await conn.execute('SELECT 1;')
await tcp_proxy.disconnect()
await conn.execute('SELECT 1;')

assert engine.size == 0

0 comments on commit 65ab4da

Please sign in to comment.