Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for "async with cursor_context()" #265

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f83c890
add support for "async with cursor_context()"
thehesiod Jan 26, 2017
c850e2b
fix pep warnings
thehesiod Jan 26, 2017
0ffd312
pep
thehesiod Jan 26, 2017
157c580
merge old style with new style
thehesiod Feb 17, 2017
e0d69bd
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Feb 17, 2017
0660956
add new testcase and remove unneeded statements
thehesiod Feb 17, 2017
8af3e6d
pep
thehesiod Feb 17, 2017
fd42c6c
use init trick from asyncpg
thehesiod Feb 17, 2017
4949621
bugfix
thehesiod Feb 17, 2017
bebe1b3
well that took awhile to figure out
thehesiod Feb 17, 2017
857abfd
pep
thehesiod Feb 17, 2017
e8a78b2
fix for py3.4
thehesiod Feb 17, 2017
bfbd60e
revise unittests
thehesiod Feb 17, 2017
623c368
arr
thehesiod Feb 17, 2017
4770379
refactor to use _PoolConnectionContextManager
thehesiod Feb 18, 2017
452e10b
remove unused code
thehesiod Feb 18, 2017
5da64ed
pep
thehesiod Feb 18, 2017
3897640
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Jul 4, 2017
375c6bf
simplify + update deps
thehesiod Jul 4, 2017
dabca4e
remove unused var
thehesiod Jul 4, 2017
cc05d18
attempt switching to the latest docker
thehesiod Jul 4, 2017
c8b506d
another attempt
thehesiod Jul 4, 2017
1e964ff
revert docker client upgrade
thehesiod Jul 4, 2017
1ca9b5f
revert docker client upgrade changes
thehesiod Jul 4, 2017
2321a82
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Jul 4, 2017
0a57593
add back missing change
thehesiod Jul 4, 2017
a5cec12
revert as this was later refactored
thehesiod Jul 4, 2017
a30fa02
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Sep 12, 2017
bdc39c3
add changes
thehesiod Sep 12, 2017
945b8fa
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Jan 4, 2018
dd94ae4
update from master
thehesiod Mar 15, 2018
351bcd8
fix merge
thehesiod Mar 15, 2018
60e9573
simplification
thehesiod Mar 15, 2018
256f7b8
add missing import
thehesiod Mar 17, 2018
02ca4e1
pep fix
thehesiod Mar 17, 2018
cbd93c5
integration fix
thehesiod Mar 17, 2018
4e52944
more merge fixes
thehesiod Mar 17, 2018
f613762
PEP fix
thehesiod Mar 17, 2018
3e4d0ba
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Sep 19, 2018
55b4b79
Merge remote-tracking branch 'aio-libs/master' into async_cursor
thehesiod Oct 16, 2018
1133c6a
fix merge bug
thehesiod Oct 16, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions aiopg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,12 @@ def release(self, conn):
fut = ensure_future(self._wakeup(), loop=self._loop)
return fut

@asyncio.coroutine
def cursor(self, name=None, cursor_factory=None,
scrollable=None, withhold=False, *, timeout=None):
"""XXX"""
conn = yield from self.acquire()
cur = yield from conn.cursor(name=name, cursor_factory=cursor_factory,
scrollable=scrollable, withhold=withhold,
timeout=timeout)
return _PoolCursorContextManager(self, conn, cur)
def cursor(self, name=None, cursor_factory=None, scrollable=None,
withhold=False, *, timeout=None):
cursor_kwargs = dict(name=name, cursor_factory=cursor_factory,
scrollable=scrollable, withhold=withhold,
timeout=timeout)
return _PoolCursorContextManager(self, cursor_kwargs)

def __enter__(self):
raise RuntimeError(
Expand Down
80 changes: 70 additions & 10 deletions aiopg/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import sys


PY_35 = sys.version_info >= (3, 5)
PY_352 = sys.version_info >= (3, 5, 2)

Expand Down Expand Up @@ -125,7 +124,7 @@ class _PoolAcquireContextManager(_ContextManager):
__slots__ = ('_coro', '_conn', '_pool')

def __init__(self, coro, pool):
self._coro = coro
super().__init__(coro)
self._conn = None
self._pool = pool

Expand Down Expand Up @@ -163,6 +162,10 @@ def __init__(self, pool, conn):
self._pool = pool
self._conn = conn

@property
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def conn(self):
return self._conn

def __enter__(self):
assert self._conn
return self._conn
Expand Down Expand Up @@ -193,37 +196,94 @@ def __aexit__(self, exc_type, exc_val, exc_tb):
class _PoolCursorContextManager:
"""Context manager.

This enables the following idiom for acquiring and releasing a
This enables the following idioms for acquiring and releasing a
cursor around a block:

with (yield from pool.cursor()) as cur:
yield from cur.execute("SELECT 1")

async with pool.cursor() as cur:
yield from cur.execute("SELECT 1")

while failing loudly when accidentally using:

with pool:
<block>
"""

__slots__ = ('_pool', '_conn', '_cur')
__slots__ = ('_pool', '_cursor_kwargs', '_cur')

def __init__(self, pool, conn, cur):
def __init__(self, pool, cursor_kwargs=None):
self._pool = pool
self._conn = conn
self._cur = cur
self._cursor_kwargs = cursor_kwargs
self._cur = None

def __enter__(self):
return self._cur

def __exit__(self, *args):
try:
self._cur.close()
self._pool.release(self._conn)
self._pool.__exit__(*args)
finally:
self._pool = None
self._conn = None
self._cur = None

@asyncio.coroutine
def _init_cursor(self, with_aenter):
assert not self._cur

if with_aenter:
conn = None
else:
conn = yield from self._pool.acquire()

# self._pool now morphs into a _PoolConnectionContextManager
self._pool = _PoolConnectionContextManager(self._pool, conn)

if with_aenter:
# this will create the connection
yield from self._pool.__aenter__()
self._cur = yield from self._pool.conn.cursor(
**self._cursor_kwargs)

return self._cur
else:
self._cur = yield from self._pool.conn.cursor(
**self._cursor_kwargs)
return self

@asyncio.coroutine
def __iter__(self):
# This will get hit if you use "yield from pool.cursor()"
result = yield from self._init_cursor(False)
return result

def __await__(self):
# This will get hit directly if you "await pool.cursor()"
# this is using a trick similar to the one here:
# https://magicstack.github.io/asyncpg/current/_modules/asyncpg/pool.html
# however since `self._init()` is an "asyncio.coroutine" we can't use
# just return self._init().__await__() as that returns a generator
# without an "__await__" attribute and we can't return a coroutine from
# here
value = yield from self._init_cursor(False)
return value

if PY_35:
@asyncio.coroutine
def __aenter__(self):
value = yield from self._init_cursor(True)
return value

@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
try:
yield from self._cur.__aexit__(exc_type, exc_val, exc_tb)
self._cur = None
finally:
yield from self._pool.__aexit__(exc_type, exc_val, exc_tb)
self._pool = None


if not PY_35:
try:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ pytest-sugar==0.8.0
pytest-timeout==1.2.0
sphinxcontrib-asyncio==0.2.0
sqlalchemy==1.1.11
psycopg2==2.6.2
psycopg2==2.7.1
38 changes: 20 additions & 18 deletions tests/pep492/test_async_await.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
import pytest
import aiopg
import aiopg.sa
from aiopg.sa import SAConnection


@asyncio.coroutine
async def test_cursor_await(make_connection):
conn = await make_connection()

Expand All @@ -16,7 +14,6 @@ async def test_cursor_await(make_connection):
cursor.close()


@asyncio.coroutine
async def test_connect_context_manager(loop, pg_params):
async with aiopg.connect(loop=loop, **pg_params) as conn:
cursor = await conn.cursor()
Expand All @@ -27,7 +24,26 @@ async def test_connect_context_manager(loop, pg_params):
assert conn.closed


@asyncio.coroutine
async def test_pool_cursor_context_manager(loop, pg_params):
async with aiopg.create_pool(loop=loop, **pg_params) as pool:
async with pool.cursor() as cursor:
await cursor.execute('SELECT 42')
resp = await cursor.fetchone()
assert resp == (42, )
assert cursor.closed
assert pool.closed


async def test_pool_cursor_await_context_manager(loop, pg_params):
async with aiopg.create_pool(loop=loop, **pg_params) as pool:
with (await pool.cursor()) as cursor:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the yield from form be tested also?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is tested in test_close_running_cursor

await cursor.execute('SELECT 42')
resp = await cursor.fetchone()
assert resp == (42, )
assert cursor.closed
assert pool.closed


async def test_connection_context_manager(make_connection):
conn = await make_connection()
assert not conn.closed
Expand All @@ -40,7 +56,6 @@ async def test_connection_context_manager(make_connection):
assert conn.closed


@asyncio.coroutine
async def test_cursor_create_with_context_manager(make_connection):
conn = await make_connection()

Expand All @@ -53,7 +68,6 @@ async def test_cursor_create_with_context_manager(make_connection):
assert cursor.closed


@asyncio.coroutine
async def test_cursor_with_context_manager(make_connection):
conn = await make_connection()
cursor = await conn.cursor()
Expand All @@ -66,7 +80,6 @@ async def test_cursor_with_context_manager(make_connection):
assert cursor.closed


@asyncio.coroutine
async def test_cursor_lightweight(make_connection):
conn = await make_connection()
cursor = await conn.cursor()
Expand All @@ -78,7 +91,6 @@ async def test_cursor_lightweight(make_connection):
assert cursor.closed


@asyncio.coroutine
async def test_pool_context_manager(pg_params, loop):
pool = await aiopg.create_pool(loop=loop, **pg_params)

Expand All @@ -93,7 +105,6 @@ async def test_pool_context_manager(pg_params, loop):
assert pool.closed


@asyncio.coroutine
async def test_create_pool_context_manager(pg_params, loop):
async with aiopg.create_pool(loop=loop, **pg_params) as pool:
async with pool.acquire() as conn:
Expand All @@ -107,7 +118,6 @@ async def test_create_pool_context_manager(pg_params, loop):
assert pool.closed


@asyncio.coroutine
async def test_cursor_aiter(make_connection):
result = []
conn = await make_connection()
Expand All @@ -122,7 +132,6 @@ async def test_cursor_aiter(make_connection):
assert conn.closed


@asyncio.coroutine
async def test_engine_context_manager(pg_params, loop):
engine = await aiopg.sa.create_engine(loop=loop, **pg_params)
async with engine:
Expand All @@ -132,15 +141,13 @@ async def test_engine_context_manager(pg_params, loop):
assert engine.closed


@asyncio.coroutine
async def test_create_engine_context_manager(pg_params, loop):
async with aiopg.sa.create_engine(loop=loop, **pg_params) as engine:
async with engine.acquire() as conn:
assert isinstance(conn, SAConnection)
assert engine.closed


@asyncio.coroutine
async def test_result_proxy_aiter(pg_params, loop):
sql = 'SELECT generate_series(1, 5);'
result = []
Expand All @@ -154,7 +161,6 @@ async def test_result_proxy_aiter(pg_params, loop):
assert conn.closed


@asyncio.coroutine
async def test_transaction_context_manager(pg_params, loop):
sql = 'SELECT generate_series(1, 5);'
result = []
Expand All @@ -181,7 +187,6 @@ async def test_transaction_context_manager(pg_params, loop):
assert conn.closed


@asyncio.coroutine
async def test_transaction_context_manager_error(pg_params, loop):
async with aiopg.sa.create_engine(loop=loop, **pg_params) as engine:
async with engine.acquire() as conn:
Expand All @@ -194,7 +199,6 @@ async def test_transaction_context_manager_error(pg_params, loop):
assert conn.closed


@asyncio.coroutine
async def test_transaction_context_manager_commit_once(pg_params, loop):
async with aiopg.sa.create_engine(loop=loop, **pg_params) as engine:
async with engine.acquire() as conn:
Expand All @@ -214,7 +218,6 @@ async def test_transaction_context_manager_commit_once(pg_params, loop):
assert conn.closed


@asyncio.coroutine
async def test_transaction_context_manager_nested_commit(pg_params, loop):
sql = 'SELECT generate_series(1, 5);'
result = []
Expand Down Expand Up @@ -244,7 +247,6 @@ async def test_transaction_context_manager_nested_commit(pg_params, loop):
assert conn.closed


@asyncio.coroutine
async def test_sa_connection_execute(pg_params, loop):
sql = 'SELECT generate_series(1, 5);'
result = []
Expand Down
27 changes: 17 additions & 10 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,17 @@ def test_set_session(connect):
@asyncio.coroutine
def test_dsn(connect, pg_params):
conn = yield from connect()

pg_params = pg_params.copy()
pg_params['password'] = 'x' * len(pg_params['password'])
assert 'dbname' in conn.dsn
assert 'user' in conn.dsn
assert 'password' in conn.dsn
assert 'host' in conn.dsn
assert 'port' in conn.dsn
pg_params['dbname'] = pg_params['database']
del pg_params['database']

pg_params['port'] = str(pg_params['port'])

# dictionary keys are unsorted so we need this hack
dsn_params = dict([tpl.split('=') for tpl in conn.dsn.split(' ')])
assert dsn_params == pg_params


@asyncio.coroutine
Expand Down Expand Up @@ -209,11 +214,12 @@ def test_autocommit(connect):
def test_isolation_level(connect):
conn = yield from connect()

assert 0 == conn.isolation_level
assert psycopg2.extensions.ISOLATION_LEVEL_DEFAULT == conn.isolation_level
with pytest.raises(psycopg2.ProgrammingError):
yield from conn.set_isolation_level(1)
yield from conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)

assert 0 == conn.isolation_level
assert psycopg2.extensions.ISOLATION_LEVEL_DEFAULT == conn.isolation_level


@asyncio.coroutine
Expand Down Expand Up @@ -496,7 +502,7 @@ def test_connect_to_unsupported_port(unused_port, loop, pg_params):
pg_params['port'] = port

with pytest.raises(psycopg2.OperationalError):
yield from aiopg.connect(loop=loop, **pg_params)
yield from aiopg.connect(loop=loop, timeout=3, **pg_params)


@asyncio.coroutine
Expand Down Expand Up @@ -621,7 +627,7 @@ def test_close_cursor_on_timeout_error(connect):
@asyncio.coroutine
def test_issue_111_crash_on_connect_error(loop):
import aiopg.connection
with pytest.raises(psycopg2.OperationalError):
with pytest.raises(psycopg2.ProgrammingError):
yield from aiopg.connection.connect('baddsn:1', loop=loop)


Expand Down Expand Up @@ -683,6 +689,7 @@ def test_connection_on_server_restart(connect, pg_server, docker):
yield from cur.execute('SELECT 1')
ret = yield from cur.fetchone()
assert (1,) == ret

docker.restart(container=pg_server['Id'])

with pytest.raises(psycopg2.OperationalError):
Expand Down
Loading