From 9fdf7b92b4a705fcffe259af70b80a0afaf7bed7 Mon Sep 17 00:00:00 2001 From: Alexey Firsov Date: Wed, 13 Mar 2019 21:25:37 +0200 Subject: [PATCH] refactor one connect one cursore (#538) --- Makefile | 4 +- aiopg/__init__.py | 8 + aiopg/connection.py | 80 +++++--- aiopg/cursor.py | 18 +- aiopg/pool.py | 24 ++- aiopg/sa/connection.py | 12 +- tests/conftest.py | 46 +++-- tests/pep492/test_async_await.py | 38 ++-- tests/pep492/test_sa_priority_name.py | 8 +- tests/test_connection.py | 38 +--- tests/test_cursor.py | 261 ++++++++++++-------------- tests/test_pool.py | 39 ++-- tests/test_sa_connection.py | 5 +- tests/test_sa_cursor.py | 94 ++++++++++ tests/test_sa_transaction.py | 62 +++--- tests/test_transaction.py | 39 ++-- 16 files changed, 447 insertions(+), 329 deletions(-) create mode 100644 tests/test_sa_cursor.py diff --git a/Makefile b/Makefile index a4e015bd..61c1971f 100644 --- a/Makefile +++ b/Makefile @@ -18,11 +18,11 @@ vtest: flake pytest tests cov cover coverage: flake - py.test --cov=aiopg --cov-report=html --cov-report=term tests + py.test -svvv -rs --cov=aiopg --cov-report=html --cov-report=term tests @echo "open file://`pwd`/htmlcov/index.html" cov-ci: flake - py.test -v --cov --cov-report=term tests --pg_tag all + py.test -svvv -rs --cov --cov-report=term tests --pg_tag all clean: find . -name __pycache__ |xargs rm -rf diff --git a/aiopg/__init__.py b/aiopg/__init__.py index 7577c81c..248fef2e 100644 --- a/aiopg/__init__.py +++ b/aiopg/__init__.py @@ -1,5 +1,6 @@ import re import sys +import warnings from collections import namedtuple from .connection import connect, Connection, TIMEOUT as DEFAULT_TIMEOUT @@ -7,6 +8,13 @@ from .pool import create_pool, Pool from .transaction import IsolationLevel, Transaction +warnings.filterwarnings( + 'always', '.*', + category=ResourceWarning, + module=r'aiopg(\.\w+)+', + append=False +) + __all__ = ('connect', 'create_pool', 'Connection', 'Cursor', 'Pool', 'version', 'version_info', 'DEFAULT_TIMEOUT', 'IsolationLevel', 'Transaction') diff --git a/aiopg/connection.py b/aiopg/connection.py index 37030fd4..f10b76a9 100755 --- a/aiopg/connection.py +++ b/aiopg/connection.py @@ -1,25 +1,27 @@ import asyncio import contextlib import errno +import platform import select import sys import traceback import warnings import weakref -import platform import psycopg2 -from psycopg2.extensions import ( - POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR) from psycopg2 import extras +from psycopg2.extensions import ( + POLL_OK, + POLL_READ, + POLL_WRITE, + POLL_ERROR, +) from .cursor import Cursor from .utils import _ContextManager, create_future - __all__ = ('connect',) - TIMEOUT = 60.0 # Windows specific error code, not in errno for some reason, and doesnt map @@ -118,7 +120,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs): self._cancelling = False self._cancellation_waiter = None self._echo = echo - self._conn_cursor = None + self._cursor_instance = None self._notifies = asyncio.Queue(loop=loop) self._weakref = weakref.ref(self) self._loop.add_reader(self._fileno, self._ready, self._weakref) @@ -198,7 +200,7 @@ def _fatal_error(self, message): self._loop.call_exception_handler({ 'message': message, 'connection': self, - }) + }) self.close() if self._waiter and not self._waiter.done(): self._waiter.set_exception(psycopg2.OperationalError(message)) @@ -273,28 +275,22 @@ def cursor(self, name=None, cursor_factory=None, NOTE: as of [TODO] any previously created created cursor from this connection will be closed """ - self.close_cursor() - self._last_usage = self._loop.time() - coro = self._cursor(name=name, cursor_factory=cursor_factory, + core = self._cursor(name=name, cursor_factory=cursor_factory, scrollable=scrollable, withhold=withhold, timeout=timeout) - return _ContextManager(coro) + return _ContextManager(core) - def cursor_created(self, cursor): - if self._conn_cursor and not self._conn_cursor.closed: - raise Exception("You can only have one cursor per connection") - - self._conn_cursor = cursor + async def _cursor(self, name=None, cursor_factory=None, + scrollable=None, withhold=False, timeout=None): - def cursor_closed(self, cursor): - if cursor != self._conn_cursor: - raise Exception("You can only have one cursor per connection") + if not self.closed_cursor: + warnings.warn(('You can only have one cursor per connection. ' + 'The cursor for connection will be closed forcibly' + ' {!r}.').format(self), ResourceWarning) - self._conn_cursor = None + self.free_cursor() - async def _cursor(self, name=None, cursor_factory=None, - scrollable=None, withhold=False, timeout=None): if timeout is None: timeout = self._timeout @@ -302,8 +298,8 @@ async def _cursor(self, name=None, cursor_factory=None, cursor_factory=cursor_factory, scrollable=scrollable, withhold=withhold) - cursor = Cursor(self, impl, timeout, self._echo) - return cursor + self._cursor_instance = Cursor(self, impl, timeout, self._echo) + return self._cursor_instance async def _cursor_impl(self, name=None, cursor_factory=None, scrollable=None, withhold=False): @@ -325,23 +321,30 @@ def _close(self): self._writing = False self._loop.remove_writer(self._fileno) - self.close_cursor() self._conn.close() + self.free_cursor() if self._waiter is not None and not self._waiter.done(): self._waiter.set_exception( psycopg2.OperationalError("Connection closed")) + @property + def closed_cursor(self): + if not self._cursor_instance: + return True + + return self._cursor_instance.closed + + def free_cursor(self): + if not self.closed_cursor: + self._cursor_instance.close() + def close(self): self._close() ret = create_future(self._loop) ret.set_result(None) return ret - def close_cursor(self): - if self._conn_cursor: - self._conn_cursor.close() - @property def closed(self): """Connection status. @@ -514,6 +517,25 @@ def echo(self): """Return echo mode status.""" return self._echo + def __repr__(self): + msg = ( + '<' + '{module_name}::{class_name} ' + 'isexecuting={isexecuting}, ' + 'closed={closed}, ' + 'echo={echo}, ' + 'cursor={cursor}' + '>' + ) + return msg.format( + module_name=type(self).__module__, + class_name=type(self).__name__, + echo=self.echo, + isexecuting=self._isexecuting(), + closed=bool(self.closed), + cursor=repr(self._cursor_instance) + ) + def __del__(self): try: _conn = self._conn diff --git a/aiopg/cursor.py b/aiopg/cursor.py index e615fa18..72446608 100644 --- a/aiopg/cursor.py +++ b/aiopg/cursor.py @@ -16,8 +16,6 @@ def __init__(self, conn, impl, timeout, echo): self._echo = echo self._transaction = Transaction(self, IsolationLevel.repeatable_read) - conn.cursor_created(self) - @property def echo(self): """Return echo mode status.""" @@ -52,7 +50,6 @@ def close(self): """Close the cursor now.""" if not self.closed: self._impl.close() - self._conn.cursor_closed(self) @property def closed(self): @@ -393,3 +390,18 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): self.close() return + + def __repr__(self): + msg = ( + '<' + '{module_name}::{class_name} ' + 'name={name}, ' + 'closed={closed}' + '>' + ) + return msg.format( + module_name=type(self).__module__, + class_name=type(self).__name__, + name=self.name, + closed=self.closed + ) diff --git a/aiopg/pool.py b/aiopg/pool.py index cb2a564e..fc91b1c6 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -3,15 +3,17 @@ import sys import warnings - from psycopg2.extensions import TRANSACTION_STATUS_IDLE from .connection import connect, TIMEOUT -from .log import logger -from .utils import (_PoolContextManager, _PoolConnectionContextManager, - _PoolCursorContextManager, _PoolAcquireContextManager, - ensure_future, create_future) - +from .utils import ( + _PoolContextManager, + _PoolConnectionContextManager, + _PoolCursorContextManager, + _PoolAcquireContextManager, + ensure_future, + create_future, +) PY_341 = sys.version_info >= (3, 4, 1) @@ -243,15 +245,17 @@ def release(self, conn): if not conn.closed: tran_status = conn._conn.get_transaction_status() if tran_status != TRANSACTION_STATUS_IDLE: - logger.warning( - "Invalid transaction status on released connection: %d", - tran_status) + warnings.warn( + ("Invalid transaction status on " + "released connection: {}").format(tran_status), + ResourceWarning + ) conn.close() return fut if self._closing: conn.close() else: - conn.close_cursor() # there may be weak-refs to these cursors + conn.free_cursor() self._free.append(conn) fut = ensure_future(self._wakeup(), loop=self._loop) return fut diff --git a/aiopg/sa/connection.py b/aiopg/sa/connection.py index 7ae957f9..b831aa52 100644 --- a/aiopg/sa/connection.py +++ b/aiopg/sa/connection.py @@ -16,6 +16,7 @@ def __init__(self, connection, engine): self._savepoint_seq = 0 self._engine = engine self._dialect = engine.dialect + self._cursor = None def execute(self, query, *multiparams, **params): """Executes a SQL query with optional parameters. @@ -57,8 +58,15 @@ def execute(self, query, *multiparams, **params): coro = self._execute(query, *multiparams, **params) return _SAConnectionContextManager(coro) + async def _get_cursor(self): + if self._cursor and not self._cursor.closed: + return self._cursor + + self._cursor = await self._connection.cursor() + return self._cursor + async def _execute(self, query, *multiparams, **params): - cursor = await self._connection.cursor() + cursor = await self._get_cursor() dp = _distill_params(multiparams, params) if len(dp) > 1: raise exc.ArgumentError("aiopg doesn't support executemany") @@ -104,6 +112,7 @@ async def _execute(self, query, *multiparams, **params): "and execution with parameters") post_processed_params = [compiled.construct_params()] result_map = None + await cursor.execute(str(compiled), post_processed_params[0]) else: raise exc.ArgumentError("sql statement should be str or " @@ -331,6 +340,7 @@ async def close(self): self._transaction = None # don't close underlying connection, it can be reused by pool # conn.close() + self._engine.release(self) self._connection = None self._engine = None diff --git a/tests/conftest.py b/tests/conftest.py index 7791f6f7..4dee55d2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,6 @@ import contextlib import gc import logging -import psycopg2 -import pytest import re import socket import sys @@ -12,12 +10,20 @@ import uuid import warnings - +import psycopg2 +import pytest from docker import APIClient import aiopg from aiopg import sa +warnings.filterwarnings( + 'error', '.*', + category=ResourceWarning, + module=r'aiopg(\.\w+)+', + append=False +) + @pytest.fixture(scope='session') def unused_port(): @@ -25,6 +31,7 @@ def f(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('127.0.0.1', 0)) return s.getsockname()[1] + return f @@ -183,11 +190,10 @@ def pg_params(pg_server): @pytest.fixture def make_connection(loop, pg_params): - conns = [] async def go(*, no_loop=False, **kwargs): - nonlocal conn + nonlocal conns params = pg_params.copy() params.update(kwargs) useloop = None if no_loop else loop @@ -221,42 +227,52 @@ async def go(*, no_loop=False, **kwargs): if pool is not None: pool.terminate() + loop.run_until_complete(pool.wait_closed()) @pytest.fixture def make_engine(loop, pg_params): - engine = None + engine = engine_use_loop = None async def go(*, use_loop=True, **kwargs): + nonlocal engine, engine_use_loop pg_params.update(kwargs) if use_loop: - engine = await sa.create_engine(loop=loop, **pg_params) + engine_use_loop = engine_use_loop or ( + await sa.create_engine(loop=loop, **pg_params) + ) + return engine_use_loop else: - engine = await sa.create_engine(**pg_params) - return engine + engine = engine or (await sa.create_engine(**pg_params)) + return engine yield go + if engine_use_loop is not None: + engine_use_loop.close() + loop.run_until_complete(engine_use_loop.wait_closed()) + if engine is not None: engine.close() loop.run_until_complete(engine.wait_closed()) @pytest.fixture -def make_sa_connection(make_engine): - conn = None +def make_sa_connection(make_engine, loop): + conns = [] engine = None async def go(*, use_loop=True, **kwargs): - nonlocal conn, engine - engine = await make_engine(use_loop=use_loop, **kwargs) + nonlocal conns, engine + engine = engine or (await make_engine(use_loop=use_loop, **kwargs)) conn = await engine.acquire() + conns.append(conn) return conn yield go - if conn is not None: - engine.release(conn) + for conn in conns: + loop.run_until_complete(conn.close()) class _AssertWarnsContext: diff --git a/tests/pep492/test_async_await.py b/tests/pep492/test_async_await.py index ff362118..5e2a72cb 100644 --- a/tests/pep492/test_async_await.py +++ b/tests/pep492/test_async_await.py @@ -1,6 +1,8 @@ import asyncio -import pytest + import psycopg2 +import pytest + import aiopg import aiopg.sa from aiopg.sa import SAConnection @@ -12,7 +14,7 @@ async def test_cursor_await(make_connection): cursor = await conn.cursor() await cursor.execute('SELECT 42;') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) cursor.close() @@ -21,7 +23,7 @@ async def test_connect_context_manager(loop, pg_params): cursor = await conn.cursor() await cursor.execute('SELECT 42') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) cursor.close() assert conn.closed @@ -33,7 +35,7 @@ async def test_connection_context_manager(make_connection): cursor = await conn.cursor() await cursor.execute('SELECT 42;') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) cursor.close() assert conn.closed @@ -44,20 +46,12 @@ async def test_cursor_create_with_context_manager(make_connection): async with conn.cursor() as cursor: await cursor.execute('SELECT 42;') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) assert not cursor.closed assert cursor.closed -async def test_two_cursor_create_with_context_manager(make_connection): - conn = await make_connection() - - async with conn.cursor() as cursor1, conn.cursor() as cursor2: - assert cursor1.closed - assert not cursor2.closed - - async def test_pool_context_manager_timeout(pg_params, loop): async with aiopg.create_pool(loop=loop, **pg_params, minsize=1, maxsize=1) as pool: @@ -75,7 +69,7 @@ async def test_pool_context_manager_timeout(pg_params, loop): with cursor_ctx as cursor: resp = await cursor.execute('SELECT 42;') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) assert cursor.closed assert pool.closed @@ -89,7 +83,7 @@ async def test_cursor_with_context_manager(make_connection): assert not cursor.closed async with cursor: resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) assert cursor.closed @@ -112,7 +106,7 @@ async def test_pool_context_manager(pg_params, loop): async with conn.cursor() as cursor: await cursor.execute('SELECT 42;') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) pool.release(conn) assert cursor.closed assert pool.closed @@ -124,7 +118,7 @@ async def test_create_pool_context_manager(pg_params, loop): async with conn.cursor() as cursor: await cursor.execute('SELECT 42;') resp = await cursor.fetchone() - assert resp == (42, ) + assert resp == (42,) assert cursor.closed assert conn.closed @@ -140,7 +134,7 @@ async def test_cursor_aiter(make_connection): await cursor.execute('SELECT generate_series(1, 5);') async for v in cursor: result.append(v) - assert result == [(1,), (2, ), (3, ), (4, ), (5, )] + assert result == [(1,), (2,), (3,), (4,), (5,)] cursor.close() assert conn.closed @@ -169,7 +163,7 @@ async def test_result_proxy_aiter(pg_params, loop): async with conn.execute(sql) as cursor: async for v in cursor: result.append(v) - assert result == [(1,), (2, ), (3, ), (4, ), (5, )] + assert result == [(1,), (2,), (3,), (4,), (5,)] assert cursor.closed assert conn.closed @@ -184,7 +178,7 @@ async def test_transaction_context_manager(pg_params, loop): async for v in cursor: result.append(v) assert tr.is_active - assert result == [(1,), (2, ), (3, ), (4, ), (5, )] + assert result == [(1,), (2,), (3,), (4,), (5,)] assert cursor.closed assert not tr.is_active @@ -243,7 +237,7 @@ async def test_transaction_context_manager_nested_commit(pg_params, loop): result.append(v) assert tr1.is_active assert tr2.is_active - assert result == [(1,), (2, ), (3, ), (4, ), (5, )] + assert result == [(1,), (2,), (3,), (4,), (5,)] assert cursor.closed assert not tr2.is_active @@ -267,5 +261,5 @@ async def test_sa_connection_execute(pg_params, loop): async with engine.acquire() as conn: async for value in conn.execute(sql): result.append(value) - assert result == [(1,), (2, ), (3, ), (4, ), (5, )] + assert result == [(1,), (2,), (3,), (4,), (5,)] assert conn.closed diff --git a/tests/pep492/test_sa_priority_name.py b/tests/pep492/test_sa_priority_name.py index 8d06e8bf..3a7c9daf 100644 --- a/tests/pep492/test_sa_priority_name.py +++ b/tests/pep492/test_sa_priority_name.py @@ -28,7 +28,7 @@ async def start(): async def test_priority_name(connect): await connect.execute(tbl.insert().values(id='test_id', name='test_name')) - row = await (await connect.execute(tbl.select())).fetchone() + row = await (await connect.execute(tbl.select())).first() assert row.name == 'test_name' assert row.id == 'test_id' @@ -39,7 +39,7 @@ async def test_priority_name_label(connect): [tbl.c.name.label('test_label_name'), tbl.c.id] ) query = query.select_from(tbl) - row = await (await connect.execute(query)).fetchone() + row = await (await connect.execute(query)).first() assert row.test_label_name == 'test_name' assert row.id == 'test_id' @@ -50,7 +50,7 @@ async def test_priority_name_and_label(connect): [tbl.c.name.label('test_label_name'), tbl.c.name, tbl.c.id] ) query = query.select_from(tbl) - row = await (await connect.execute(query)).fetchone() + row = await (await connect.execute(query)).first() assert row.test_label_name == 'test_name' assert row.name == 'test_name' assert row.id == 'test_id' @@ -60,7 +60,7 @@ async def test_priority_name_all_get(connect): await connect.execute(tbl.insert().values(id='test_id', name='test_name')) query = sa.select([tbl.c.name]) query = query.select_from(tbl) - row = await (await connect.execute(query)).fetchone() + row = await (await connect.execute(query)).first() assert row.name == 'test_name' assert row['name'] == 'test_name' assert row[0] == 'test_name' diff --git a/tests/test_connection.py b/tests/test_connection.py index b42ee7cd..814f2747 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -20,14 +20,8 @@ @pytest.fixture def connect(make_connection): - async def go(**kwargs): - conn = await make_connection(**kwargs) - conn2 = await make_connection(**kwargs) - cur = await conn2.cursor() - await cur.execute("DROP TABLE IF EXISTS foo") - await conn2.close() - return conn + return await make_connection(**kwargs) return go @@ -289,36 +283,6 @@ async def inner(): await task -async def test_cancelled_connection_is_usable_asap(connect, loop): - async def inner(future, cursor): - future.set_result(None) - await cursor.execute("SELECT pg_sleep(10)") - - fut = asyncio.Future(loop=loop) - conn = await connect() - cur = await conn.cursor() - task = ensure_future(inner(fut, cur), loop=loop) - await fut - await asyncio.sleep(0.1, loop=loop) - - task.cancel() - - delay = 0.001 - - for tick in range(100): - await asyncio.sleep(delay, loop=loop) - status = conn._conn.get_transaction_status() - if status == psycopg2.extensions.TRANSACTION_STATUS_IDLE: - cur = await conn.cursor() - await cur.execute("SELECT 1") - ret = await cur.fetchone() - assert (1,) == ret - break - delay *= 2 - else: - assert False, "Cancelled connection transaction status never got idle" - - async def test_cancelled_connection_is_not_usable_until_cancellation(connect, loop): async def inner(future, cursor): diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 0bb68f2d..d0e56f77 100644 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -4,72 +4,77 @@ import psycopg2 import psycopg2.tz import pytest + from aiopg.connection import TIMEOUT @pytest.fixture def connect(make_connection): - async def go(**kwargs): conn = await make_connection(**kwargs) - cur = await conn.cursor() - await cur.execute("DROP TABLE IF EXISTS tbl") - await cur.execute("CREATE TABLE tbl (id int, name varchar(255))") - for i in [(1, 'a'), (2, 'b'), (3, 'c')]: - await cur.execute("INSERT INTO tbl VALUES(%s, %s)", i) - await cur.execute("DROP TABLE IF EXISTS tbl2") - await cur.execute("""CREATE TABLE tbl2 - (id int, name varchar(255)) - WITH OIDS""") - await cur.execute("DROP FUNCTION IF EXISTS inc(val integer)") - await cur.execute("""CREATE FUNCTION inc(val integer) - RETURNS integer AS $$ - BEGIN - RETURN val + 1; - END; $$ - LANGUAGE PLPGSQL;""") + async with conn.cursor() as cur: + await cur.execute("DROP TABLE IF EXISTS tbl") + await cur.execute("CREATE TABLE tbl (id int, name varchar(255))") + for i in [(1, 'a'), (2, 'b'), (3, 'c')]: + await cur.execute("INSERT INTO tbl VALUES(%s, %s)", i) + await cur.execute("DROP TABLE IF EXISTS tbl2") + await cur.execute("""CREATE TABLE tbl2 + (id int, name varchar(255)) + WITH OIDS""") + await cur.execute("DROP FUNCTION IF EXISTS inc(val integer)") + await cur.execute("""CREATE FUNCTION inc(val integer) + RETURNS integer AS $$ + BEGIN + RETURN val + 1; + END; $$ + LANGUAGE PLPGSQL;""") return conn return go -async def test_description(connect): - conn = await connect() - cur = await conn.cursor() - assert cur.description is None - await cur.execute('SELECT * from tbl;') +@pytest.yield_fixture +def cursor(connect, loop): + async def go(): + return await (await connect()).cursor() - assert len(cur.description) == 2, \ - 'cursor.description describes too many columns' + cur = loop.run_until_complete(go()) + yield cur + cur.close() - assert len(cur.description[0]) == 7, \ - 'cursor.description[x] tuples must have 7 elements' - assert cur.description[0][0].lower() == 'id', \ - 'cursor.description[x][0] must return column name' +async def test_description(cursor): + async with cursor as cur: + assert cur.description is None + await cur.execute('SELECT * from tbl;') - assert cur.description[1][0].lower() == 'name', \ - 'cursor.description[x][0] must return column name' + assert len(cur.description) == 2, \ + 'cursor.description describes too many columns' - # Make sure self.description gets reset, cursor should be - # set to None in case of none resulting queries like DDL - await cur.execute('DROP TABLE IF EXISTS foobar;') - assert cur.description is None + assert len(cur.description[0]) == 7, \ + 'cursor.description[x] tuples must have 7 elements' + assert cur.description[0][0].lower() == 'id', \ + 'cursor.description[x][0] must return column name' -async def test_raw(connect): - conn = await connect() - cur = await conn.cursor() - assert cur._impl is cur.raw + assert cur.description[1][0].lower() == 'name', \ + 'cursor.description[x][0] must return column name' + # Make sure self.description gets reset, cursor should be + # set to None in case of none resulting queries like DDL + await cur.execute('DROP TABLE IF EXISTS foobar;') + assert cur.description is None -async def test_close(connect): - conn = await connect() - cur = await conn.cursor() - cur.close() - assert cur.closed + +async def test_raw(cursor): + assert cursor._impl is cursor.raw + + +async def test_close(cursor): + cursor.close() + assert cursor.closed with pytest.raises(psycopg2.InterfaceError): - await cur.execute('SELECT 1') + await cursor.execute('SELECT 1') async def test_close_twice(connect): @@ -89,153 +94,119 @@ async def test_connection(connect): assert cur.connection is conn -async def test_name(connect): - conn = await connect() - cur = await conn.cursor() - assert cur.name is None +async def test_name(cursor): + assert cursor.name is None -async def test_scrollable(connect): - conn = await connect() - cur = await conn.cursor() - assert cur.scrollable is None +async def test_scrollable(cursor): + assert cursor.scrollable is None with pytest.raises(psycopg2.ProgrammingError): - cur.scrollable = True + cursor.scrollable = True -async def test_withhold(connect): - conn = await connect() - cur = await conn.cursor() - assert not cur.withhold +async def test_withhold(cursor): + assert not cursor.withhold with pytest.raises(psycopg2.ProgrammingError): - cur.withhold = True - assert not cur.withhold + cursor.withhold = True + assert not cursor.withhold -async def test_execute(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT 1') - ret = await cur.fetchone() +async def test_execute(cursor): + await cursor.execute('SELECT 1') + ret = await cursor.fetchone() assert (1,) == ret -async def test_executemany(connect): - conn = await connect() - cur = await conn.cursor() +async def test_executemany(cursor): with pytest.raises(psycopg2.ProgrammingError): - await cur.executemany('SELECT %s', ['1', '2']) + await cursor.executemany('SELECT %s', ['1', '2']) -async def test_mogrify(connect): - conn = await connect() - cur = await conn.cursor() - ret = await cur.mogrify('SELECT %s', ['1']) +async def test_mogrify(cursor): + ret = await cursor.mogrify('SELECT %s', ['1']) assert b"SELECT '1'" == ret -async def test_setinputsizes(connect): - conn = await connect() - cur = await conn.cursor() - await cur.setinputsizes(10) +async def test_setinputsizes(cursor): + await cursor.setinputsizes(10) -async def test_fetchmany(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT * from tbl;') - ret = await cur.fetchmany() +async def test_fetchmany(cursor): + await cursor.execute('SELECT * from tbl;') + ret = await cursor.fetchmany() assert [(1, 'a')] == ret - await cur.execute('SELECT * from tbl;') - ret = await cur.fetchmany(2) + await cursor.execute('SELECT * from tbl;') + ret = await cursor.fetchmany(2) assert [(1, 'a'), (2, 'b')] == ret -async def test_fetchall(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT * from tbl;') - ret = await cur.fetchall() +async def test_fetchall(cursor): + await cursor.execute('SELECT * from tbl;') + ret = await cursor.fetchall() assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret -async def test_scroll(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT * from tbl;') - await cur.scroll(1) - ret = await cur.fetchone() +async def test_scroll(cursor): + await cursor.execute('SELECT * from tbl;') + await cursor.scroll(1) + ret = await cursor.fetchone() assert (2, 'b') == ret -async def test_arraysize(connect): - conn = await connect() - cur = await conn.cursor() - assert 1 == cur.arraysize +async def test_arraysize(cursor): + assert 1 == cursor.arraysize - cur.arraysize = 10 - assert 10 == cur.arraysize + cursor.arraysize = 10 + assert 10 == cursor.arraysize -async def test_itersize(connect): - conn = await connect() - cur = await conn.cursor() - assert 2000 == cur.itersize +async def test_itersize(cursor): + assert 2000 == cursor.itersize - cur.itersize = 10 - assert 10 == cur.itersize + cursor.itersize = 10 + assert 10 == cursor.itersize -async def test_rows(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT * from tbl') - assert 3 == cur.rowcount - assert 0 == cur.rownumber - await cur.fetchone() - assert 1 == cur.rownumber +async def test_rows(cursor): + await cursor.execute('SELECT * from tbl') + assert 3 == cursor.rowcount + assert 0 == cursor.rownumber + await cursor.fetchone() + assert 1 == cursor.rownumber - assert 0 == cur.lastrowid - await cur.execute('INSERT INTO tbl2 VALUES (%s, %s)', - (4, 'd')) - assert 0 != cur.lastrowid + assert 0 == cursor.lastrowid + await cursor.execute( + 'INSERT INTO tbl2 VALUES (%s, %s)', + (4, 'd') + ) + assert 0 != cursor.lastrowid -async def test_query(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT 1') - assert b'SELECT 1' == cur.query +async def test_query(cursor): + await cursor.execute('SELECT 1') + assert b'SELECT 1' == cursor.query -async def test_statusmessage(connect): - conn = await connect() - cur = await conn.cursor() - await cur.execute('SELECT 1') - assert 'SELECT 1' == cur.statusmessage +async def test_statusmessage(cursor): + await cursor.execute('SELECT 1') + assert 'SELECT 1' == cursor.statusmessage -async def test_tzinfo_factory(connect): - conn = await connect() - cur = await conn.cursor() - assert psycopg2.tz.FixedOffsetTimezone is cur.tzinfo_factory +async def test_tzinfo_factory(cursor): + assert psycopg2.tz.FixedOffsetTimezone is cursor.tzinfo_factory - cur.tzinfo_factory = psycopg2.tz.LocalTimezone - assert psycopg2.tz.LocalTimezone is cur.tzinfo_factory + cursor.tzinfo_factory = psycopg2.tz.LocalTimezone + assert psycopg2.tz.LocalTimezone is cursor.tzinfo_factory -async def test_nextset(connect): - conn = await connect() - cur = await conn.cursor() +async def test_nextset(cursor): with pytest.raises(psycopg2.NotSupportedError): - await cur.nextset() + await cursor.nextset() -async def test_setoutputsize(connect): - conn = await connect() - cur = await conn.cursor() - await cur.setoutputsize(4, 1) +async def test_setoutputsize(cursor): + await cursor.setoutputsize(4, 1) async def test_copy_family(connect): @@ -337,8 +308,12 @@ async def test_iter(connect): conn = await connect() cur = await conn.cursor() await cur.execute("SELECT * FROM tbl") + rows = [] + async for r in cur: + rows.append(r) + data = [(1, 'a'), (2, 'b'), (3, 'c')] - for item, tst in zip(cur, data): + for item, tst in zip(rows, data): assert item == tst diff --git a/tests/test_pool.py b/tests/test_pool.py index 5aa9a987..06b10f92 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -197,14 +197,16 @@ async def test_release_with_invalid_status(create_pool): await cur.execute('BEGIN') cur.close() - with mock.patch("aiopg.pool.logger") as m_log: + with mock.patch("aiopg.pool.warnings") as m_log: pool.release(conn) assert 9 == pool.freesize assert not pool._used assert conn.closed - m_log.warning.assert_called_with( - "Invalid transaction status on released connection: %d", - TRANSACTION_STATUS_INTRANS) + m_log.warn.assert_called_with( + "Invalid transaction status on " + "released connection: {}".format(TRANSACTION_STATUS_INTRANS), + ResourceWarning + ) async def test_default_event_loop(create_pool, loop): @@ -232,14 +234,16 @@ async def test_release_with_invalid_status_wait_release(create_pool): await cur.execute('BEGIN') cur.close() - with mock.patch("aiopg.pool.logger") as m_log: + with mock.patch("aiopg.pool.warnings") as m_log: await pool.release(conn) assert 9 == pool.freesize assert not pool._used assert conn.closed - m_log.warning.assert_called_with( - "Invalid transaction status on released connection: %d", - TRANSACTION_STATUS_INTRANS) + m_log.warn.assert_called_with( + "Invalid transaction status on " + "released connection: {}".format(TRANSACTION_STATUS_INTRANS), + ResourceWarning + ) async def test__fill_free(create_pool, loop): @@ -496,14 +500,15 @@ async def sleep(conn): with (await pool) as conn: with pytest.raises(asyncio.TimeoutError): await sleep(conn) + conn.close() assert 0 == pool.freesize assert 0 == pool.size with (await pool) as conn: - cur = await conn.cursor() - await cur.execute('SELECT 1;') - val = await cur.fetchone() - assert (1,) == val + async with conn.cursor() as cur: + await cur.execute('SELECT 1;') + val = await cur.fetchone() + assert (1,) == val async def test_drop_connection_if_timedout(make_connection, @@ -548,11 +553,11 @@ async def test_pool_on_connect(create_pool): async def cb(connection): nonlocal called - cur = await connection.cursor() - await cur.execute('SELECT 1') - data = await cur.fetchall() - assert [(1,)] == data - called = True + async with connection.cursor() as cur: + await cur.execute('SELECT 1') + data = await cur.fetchall() + assert [(1,)] == data + called = True pool = await create_pool(on_connect=cb) diff --git a/tests/test_sa_connection.py b/tests/test_sa_connection.py index 36053462..1ba7793d 100644 --- a/tests/test_sa_connection.py +++ b/tests/test_sa_connection.py @@ -6,7 +6,7 @@ sa = pytest.importorskip("aiopg.sa") # noqa -from sqlalchemy import MetaData, Table, Column, Integer, String +from sqlalchemy import MetaData, Table, Column, Integer, String, select, func from sqlalchemy.schema import DropTable, CreateTable import psycopg2 @@ -122,7 +122,8 @@ async def test_execute_sa_insert_positional_params(connect): async def test_scalar(connect): conn = await connect() - res = await conn.scalar(tbl.count()) + tbl.count + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1, res diff --git a/tests/test_sa_cursor.py b/tests/test_sa_cursor.py new file mode 100644 index 00000000..74ef6188 --- /dev/null +++ b/tests/test_sa_cursor.py @@ -0,0 +1,94 @@ +import warnings + +import pytest +import sqlalchemy as sa + +meta = sa.MetaData() +tbl = sa.Table( + 'sa_tbl5', meta, + sa.Column('ID', sa.String, primary_key=True, key='id'), + sa.Column('Name', sa.String(255), key='name'), +) + + +@pytest.fixture +def connect(make_sa_connection, loop): + async def start(): + conn = await make_sa_connection() + await conn.execute('DROP TABLE IF EXISTS sa_tbl5') + await conn.execute( + 'CREATE TABLE sa_tbl5 (' + '"ID" VARCHAR(255) NOT NULL, ' + '"Name" VARCHAR(255), ' + 'PRIMARY KEY ("ID"))' + ) + + await conn.execute( + tbl.insert().values(id='test1', name='test_name')) + await conn.execute( + tbl.insert().values(id='test2', name='test_name')) + await conn.execute( + tbl.insert().values(id='test3', name='test_name')) + + return conn + + return loop.run_until_complete(start()) + + +async def test_insert(connect): + await connect.execute(tbl.insert().values(id='test-4', name='test_name')) + await connect.execute(tbl.insert().values(id='test-5', name='test_name')) + assert 5 == len(await (await connect.execute(tbl.select())).fetchall()) + + +async def test_insert_make_engine(make_engine, connect): + engine = await make_engine() + async with engine.acquire() as conn: + assert conn._cursor is None + + await conn.execute(tbl.insert().values(id='test-4', name='test_name')) + assert conn._cursor.closed is True + + resp = await conn.execute(tbl.select()) + assert resp.cursor.closed is False + assert conn._cursor.closed is False + + await conn.execute(tbl.insert().values(id='test-5', name='test_name')) + assert conn._cursor.closed is True + + resp = await conn.execute(tbl.select()) + assert resp.cursor.closed is False + + assert conn._cursor.closed is True + + assert conn.closed == 0 + + assert 5 == len(await (await connect.execute(tbl.select())).fetchall()) + + +async def test_two_cursor_create_context_manager(make_connection): + conn = await make_connection() + + error_ms = ( + 'You can only have one cursor per connection. ' + 'The cursor for connection will be closed forcibly' + ' {!r}.' + ) + + with warnings.catch_warnings(record=True) as wars: + warnings.simplefilter("always") + async with conn.cursor() as cur: + error_ms = error_ms.format(conn) + assert cur.closed is False + + async with conn.cursor() as cur2: + assert cur.closed is True + assert cur2.closed is False + + assert len(wars) == 1 + war = wars.pop() + assert issubclass(war.category, ResourceWarning) + assert str(war.message) == error_ms + assert cur2.closed is True + + assert cur.closed is True diff --git a/tests/test_sa_transaction.py b/tests/test_sa_transaction.py index 3e3c422f..88ed7ffb 100644 --- a/tests/test_sa_transaction.py +++ b/tests/test_sa_transaction.py @@ -3,7 +3,7 @@ import pytest sa = pytest.importorskip("aiopg.sa") # noqa -from sqlalchemy import MetaData, Table, Column, Integer, String +from sqlalchemy import MetaData, Table, Column, Integer, String, select, func meta = MetaData() tbl = Table('sa_tbl2', meta, @@ -48,12 +48,12 @@ async def go(**kwargs): async def test_without_transactions(connect): conn1 = await connect() conn2 = await connect() - res1 = await conn1.scalar(tbl.count()) + res1 = await conn1.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await conn2.execute(tbl.delete()) - res2 = await conn1.scalar(tbl.count()) + res2 = await conn1.scalar(select([func.count()]).select_from(tbl)) assert 0 == res2 @@ -71,14 +71,14 @@ async def test_root_transaction(connect): assert tr.is_active await conn1.execute(tbl.delete()) - res1 = await conn2.scalar(tbl.count()) + res1 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await tr.commit() assert not tr.is_active assert not conn1.in_transaction - res2 = await conn2.scalar(tbl.count()) + res2 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 0 == res2 @@ -90,13 +90,13 @@ async def test_root_transaction_rollback(connect): assert tr.is_active await conn1.execute(tbl.delete()) - res1 = await conn2.scalar(tbl.count()) + res1 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await tr.rollback() assert not tr.is_active - res2 = await conn2.scalar(tbl.count()) + res2 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res2 @@ -108,13 +108,13 @@ async def test_root_transaction_close(connect): assert tr.is_active await conn1.execute(tbl.delete()) - res1 = await conn2.scalar(tbl.count()) + res1 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await tr.close() assert not tr.is_active - res2 = await conn2.scalar(tbl.count()) + res2 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res2 @@ -170,12 +170,12 @@ async def test_rollback_on_connection_close(connect): tr = await conn1.begin() await conn1.execute(tbl.delete()) - res1 = await conn2.scalar(tbl.count()) + res1 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await conn1.close() - res2 = await conn2.scalar(tbl.count()) + res2 = await conn2.scalar(select([func.count()]).select_from(tbl)) assert 1 == res2 del tr @@ -191,7 +191,7 @@ async def test_inner_transaction_rollback(connect): assert not tr2.is_active assert not tr1.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1 == res @@ -207,7 +207,7 @@ async def test_inner_transaction_close(connect): assert tr1.is_active await tr1.commit() - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res @@ -223,14 +223,14 @@ async def test_nested_transaction_commit(connect): assert not tr2.is_active assert tr1.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res await tr1.commit() assert not tr2.is_active assert not tr1.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res @@ -248,7 +248,7 @@ async def test_nested_transaction_commit_twice(connect): assert not tr2.is_active assert tr1.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res await tr1.close() @@ -266,14 +266,14 @@ async def test_nested_transaction_rollback(connect): assert not tr2.is_active assert tr1.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1 == res await tr1.commit() assert not tr2.is_active assert not tr1.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1 == res @@ -292,7 +292,7 @@ async def test_nested_transaction_rollback_twice(connect): assert tr1.is_active await tr1.commit() - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1 == res @@ -307,7 +307,7 @@ async def test_twophase_transaction_commit(xa_connect): await tr.commit() assert not tr.is_active - res = await conn.scalar(tbl.count()) + res = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res @@ -332,7 +332,7 @@ async def test_transactions_sequence(xa_connect): tr1 = await conn.begin() assert tr1 is conn._transaction await conn.execute(tbl.insert().values(name='a')) - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await tr1.commit() @@ -341,7 +341,7 @@ async def test_transactions_sequence(xa_connect): tr2 = await conn.begin() assert tr2 is conn._transaction await conn.execute(tbl.insert().values(name='b')) - res2 = await conn.scalar(tbl.count()) + res2 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res2 await tr2.rollback() @@ -350,7 +350,7 @@ async def test_transactions_sequence(xa_connect): tr3 = await conn.begin() assert tr3 is conn._transaction await conn.execute(tbl.insert().values(name='b')) - res3 = await conn.scalar(tbl.count()) + res3 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res3 await tr3.commit() @@ -364,50 +364,50 @@ async def test_transaction_mode(connect): tr1 = await conn.begin(isolation_level='SERIALIZABLE') await conn.execute(tbl.insert().values(name='a')) - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 1 == res1 await tr1.commit() tr2 = await conn.begin(isolation_level='REPEATABLE READ') await conn.execute(tbl.insert().values(name='b')) - res2 = await conn.scalar(tbl.count()) + res2 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 2 == res2 await tr2.commit() tr3 = await conn.begin(isolation_level='READ UNCOMMITTED') await conn.execute(tbl.insert().values(name='c')) - res3 = await conn.scalar(tbl.count()) + res3 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 3 == res3 await tr3.commit() tr4 = await conn.begin(readonly=True) assert tr4 is conn._transaction - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 3 == res1 await tr4.commit() tr5 = await conn.begin(isolation_level='READ UNCOMMITTED', readonly=True) - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 3 == res1 await tr5.commit() tr6 = await conn.begin(deferrable=True) await conn.execute(tbl.insert().values(name='f')) - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 4 == res1 await tr6.commit() tr7 = await conn.begin(isolation_level='REPEATABLE READ', deferrable=True) await conn.execute(tbl.insert().values(name='g')) - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 5 == res1 await tr7.commit() tr8 = await conn.begin(isolation_level='SERIALIZABLE', readonly=True, deferrable=True) assert tr8 is conn._transaction - res1 = await conn.scalar(tbl.count()) + res1 = await conn.scalar(select([func.count()]).select_from(tbl)) assert 5 == res1 await tr8.commit() diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 4e7c2e2a..1be68c4d 100644 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -1,5 +1,6 @@ import psycopg2 import pytest + from aiopg import IsolationLevel, Transaction from aiopg.transaction import IsolationCompiler @@ -8,7 +9,7 @@ def engine(make_engine, loop): async def start(): engine = await make_engine() - with (await engine) as cur: + async with engine.acquire() as cur: await cur.execute("DROP TABLE IF EXISTS tbl") await cur.execute("CREATE TABLE tbl (id int, " "name varchar(255))") @@ -27,7 +28,7 @@ async def start(): ]) async def test_transaction_oldstyle(engine, isolation_level, readonly, deferrable): - with (await engine) as cur: + async with engine.acquire() as cur: tr = Transaction(cur, isolation_level, readonly=readonly, deferrable=deferrable) await tr.begin() @@ -44,7 +45,11 @@ async def test_transaction_oldstyle(engine, isolation_level, readonly, async def two_begin(cur): tr = Transaction(cur, IsolationLevel.read_committed) await tr.begin() - await tr.begin() + try: + await tr.begin() + except psycopg2.ProgrammingError as e: + await tr.rollback() + raise e async def two_commit(cur): @@ -74,8 +79,11 @@ async def e_release_savepoint(cur): async def two_rollback_savepoint(cur): tr = Transaction(cur, IsolationLevel.read_committed) await tr.begin() - await tr.release_savepoint() - await tr.commit() + try: + await tr.release_savepoint() + except psycopg2.ProgrammingError as e: + await tr.commit() + raise e async def e_savepoint(cur): @@ -87,8 +95,12 @@ async def e_commit_savepoint(cur): tr = Transaction(cur, IsolationLevel.read_committed) await tr.begin() await tr.savepoint() - await tr.savepoint() - await tr.commit() + try: + await tr.savepoint() + except psycopg2.ProgrammingError as e: + await tr.rollback_savepoint() + await tr.commit() + raise e @pytest.mark.parametrize('fn', [ @@ -98,7 +110,7 @@ async def e_commit_savepoint(cur): ]) async def test_transaction_fail_oldstyle(engine, fn): with pytest.raises(psycopg2.ProgrammingError): - with (await engine) as cur: + async with engine.acquire() as cur: await fn(cur) @@ -119,11 +131,12 @@ def begin(self): async def test_transaction_finalization_warning(engine, monkeypatch): - with (await engine) as cur: + async with engine.acquire() as cur: tr = Transaction(cur, IsolationLevel.read_committed) def valid(x, _): assert x in [ + 'Invalid transaction status on released connection: 2', 'You have not closed transaction {!r}'.format(tr), 'You have not closed savepoint {!r}'.format(tr) ] @@ -134,18 +147,18 @@ def valid(x, _): async def test_transaction_readonly_insert_oldstyle(engine): - with (await engine) as cur: + async with engine.acquire() as cur: tr = Transaction(cur, IsolationLevel.serializable, readonly=True) await tr.begin() with pytest.raises(psycopg2.InternalError): await cur.execute("insert into tbl values(1, 'data')") - await tr.rollback() + await tr.rollback() async def test_transaction_readonly_oldstyle(engine): - with (await engine) as cur: + async with engine.acquire() as cur: tr = Transaction(cur, IsolationLevel.serializable, readonly=True) await tr.begin() @@ -158,7 +171,7 @@ async def test_transaction_readonly_oldstyle(engine): async def test_transaction_point_oldstyle(engine): - with (await engine) as cur: + async with engine.acquire() as cur: tr = Transaction(cur, IsolationLevel.read_committed) await tr.begin()