Skip to content

Commit

Permalink
refactor one connect one cursore (#538)
Browse files Browse the repository at this point in the history
  • Loading branch information
vir-mir authored Mar 13, 2019
1 parent 2e05b0d commit 9fdf7b9
Show file tree
Hide file tree
Showing 16 changed files with 447 additions and 329 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ vtest: flake
pytest tests

cov cover coverage: flake
py.test --cov=aiopg --cov-report=html --cov-report=term tests
py.test -svvv -rs --cov=aiopg --cov-report=html --cov-report=term tests
@echo "open file://`pwd`/htmlcov/index.html"

cov-ci: flake
py.test -v --cov --cov-report=term tests --pg_tag all
py.test -svvv -rs --cov --cov-report=term tests --pg_tag all

clean:
find . -name __pycache__ |xargs rm -rf
Expand Down
8 changes: 8 additions & 0 deletions aiopg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import re
import sys
import warnings
from collections import namedtuple

from .connection import connect, Connection, TIMEOUT as DEFAULT_TIMEOUT
from .cursor import Cursor
from .pool import create_pool, Pool
from .transaction import IsolationLevel, Transaction

warnings.filterwarnings(
'always', '.*',
category=ResourceWarning,
module=r'aiopg(\.\w+)+',
append=False
)

__all__ = ('connect', 'create_pool', 'Connection', 'Cursor', 'Pool',
'version', 'version_info', 'DEFAULT_TIMEOUT', 'IsolationLevel',
'Transaction')
Expand Down
80 changes: 51 additions & 29 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import asyncio
import contextlib
import errno
import platform
import select
import sys
import traceback
import warnings
import weakref
import platform

import psycopg2
from psycopg2.extensions import (
POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR)
from psycopg2 import extras
from psycopg2.extensions import (
POLL_OK,
POLL_READ,
POLL_WRITE,
POLL_ERROR,
)

from .cursor import Cursor
from .utils import _ContextManager, create_future


__all__ = ('connect',)


TIMEOUT = 60.0

# Windows specific error code, not in errno for some reason, and doesnt map
Expand Down Expand Up @@ -118,7 +120,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
self._cancelling = False
self._cancellation_waiter = None
self._echo = echo
self._conn_cursor = None
self._cursor_instance = None
self._notifies = asyncio.Queue(loop=loop)
self._weakref = weakref.ref(self)
self._loop.add_reader(self._fileno, self._ready, self._weakref)
Expand Down Expand Up @@ -198,7 +200,7 @@ def _fatal_error(self, message):
self._loop.call_exception_handler({
'message': message,
'connection': self,
})
})
self.close()
if self._waiter and not self._waiter.done():
self._waiter.set_exception(psycopg2.OperationalError(message))
Expand Down Expand Up @@ -273,37 +275,31 @@ def cursor(self, name=None, cursor_factory=None,
NOTE: as of [TODO] any previously created created cursor from this
connection will be closed
"""
self.close_cursor()

self._last_usage = self._loop.time()
coro = self._cursor(name=name, cursor_factory=cursor_factory,
core = self._cursor(name=name, cursor_factory=cursor_factory,
scrollable=scrollable, withhold=withhold,
timeout=timeout)
return _ContextManager(coro)
return _ContextManager(core)

def cursor_created(self, cursor):
if self._conn_cursor and not self._conn_cursor.closed:
raise Exception("You can only have one cursor per connection")

self._conn_cursor = cursor
async def _cursor(self, name=None, cursor_factory=None,
scrollable=None, withhold=False, timeout=None):

def cursor_closed(self, cursor):
if cursor != self._conn_cursor:
raise Exception("You can only have one cursor per connection")
if not self.closed_cursor:
warnings.warn(('You can only have one cursor per connection. '
'The cursor for connection will be closed forcibly'
' {!r}.').format(self), ResourceWarning)

self._conn_cursor = None
self.free_cursor()

async def _cursor(self, name=None, cursor_factory=None,
scrollable=None, withhold=False, timeout=None):
if timeout is None:
timeout = self._timeout

impl = await self._cursor_impl(name=name,
cursor_factory=cursor_factory,
scrollable=scrollable,
withhold=withhold)
cursor = Cursor(self, impl, timeout, self._echo)
return cursor
self._cursor_instance = Cursor(self, impl, timeout, self._echo)
return self._cursor_instance

async def _cursor_impl(self, name=None, cursor_factory=None,
scrollable=None, withhold=False):
Expand All @@ -325,23 +321,30 @@ def _close(self):
self._writing = False
self._loop.remove_writer(self._fileno)

self.close_cursor()
self._conn.close()
self.free_cursor()

if self._waiter is not None and not self._waiter.done():
self._waiter.set_exception(
psycopg2.OperationalError("Connection closed"))

@property
def closed_cursor(self):
if not self._cursor_instance:
return True

return self._cursor_instance.closed

def free_cursor(self):
if not self.closed_cursor:
self._cursor_instance.close()

def close(self):
self._close()
ret = create_future(self._loop)
ret.set_result(None)
return ret

def close_cursor(self):
if self._conn_cursor:
self._conn_cursor.close()

@property
def closed(self):
"""Connection status.
Expand Down Expand Up @@ -514,6 +517,25 @@ def echo(self):
"""Return echo mode status."""
return self._echo

def __repr__(self):
msg = (
'<'
'{module_name}::{class_name} '
'isexecuting={isexecuting}, '
'closed={closed}, '
'echo={echo}, '
'cursor={cursor}'
'>'
)
return msg.format(
module_name=type(self).__module__,
class_name=type(self).__name__,
echo=self.echo,
isexecuting=self._isexecuting(),
closed=bool(self.closed),
cursor=repr(self._cursor_instance)
)

def __del__(self):
try:
_conn = self._conn
Expand Down
18 changes: 15 additions & 3 deletions aiopg/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ def __init__(self, conn, impl, timeout, echo):
self._echo = echo
self._transaction = Transaction(self, IsolationLevel.repeatable_read)

conn.cursor_created(self)

@property
def echo(self):
"""Return echo mode status."""
Expand Down Expand Up @@ -52,7 +50,6 @@ def close(self):
"""Close the cursor now."""
if not self.closed:
self._impl.close()
self._conn.cursor_closed(self)

@property
def closed(self):
Expand Down Expand Up @@ -393,3 +390,18 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.close()
return

def __repr__(self):
msg = (
'<'
'{module_name}::{class_name} '
'name={name}, '
'closed={closed}'
'>'
)
return msg.format(
module_name=type(self).__module__,
class_name=type(self).__name__,
name=self.name,
closed=self.closed
)
24 changes: 14 additions & 10 deletions aiopg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import sys
import warnings


from psycopg2.extensions import TRANSACTION_STATUS_IDLE

from .connection import connect, TIMEOUT
from .log import logger
from .utils import (_PoolContextManager, _PoolConnectionContextManager,
_PoolCursorContextManager, _PoolAcquireContextManager,
ensure_future, create_future)

from .utils import (
_PoolContextManager,
_PoolConnectionContextManager,
_PoolCursorContextManager,
_PoolAcquireContextManager,
ensure_future,
create_future,
)

PY_341 = sys.version_info >= (3, 4, 1)

Expand Down Expand Up @@ -243,15 +245,17 @@ def release(self, conn):
if not conn.closed:
tran_status = conn._conn.get_transaction_status()
if tran_status != TRANSACTION_STATUS_IDLE:
logger.warning(
"Invalid transaction status on released connection: %d",
tran_status)
warnings.warn(
("Invalid transaction status on "
"released connection: {}").format(tran_status),
ResourceWarning
)
conn.close()
return fut
if self._closing:
conn.close()
else:
conn.close_cursor() # there may be weak-refs to these cursors
conn.free_cursor()
self._free.append(conn)
fut = ensure_future(self._wakeup(), loop=self._loop)
return fut
Expand Down
12 changes: 11 additions & 1 deletion aiopg/sa/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self, connection, engine):
self._savepoint_seq = 0
self._engine = engine
self._dialect = engine.dialect
self._cursor = None

def execute(self, query, *multiparams, **params):
"""Executes a SQL query with optional parameters.
Expand Down Expand Up @@ -57,8 +58,15 @@ def execute(self, query, *multiparams, **params):
coro = self._execute(query, *multiparams, **params)
return _SAConnectionContextManager(coro)

async def _get_cursor(self):
if self._cursor and not self._cursor.closed:
return self._cursor

self._cursor = await self._connection.cursor()
return self._cursor

async def _execute(self, query, *multiparams, **params):
cursor = await self._connection.cursor()
cursor = await self._get_cursor()
dp = _distill_params(multiparams, params)
if len(dp) > 1:
raise exc.ArgumentError("aiopg doesn't support executemany")
Expand Down Expand Up @@ -104,6 +112,7 @@ async def _execute(self, query, *multiparams, **params):
"and execution with parameters")
post_processed_params = [compiled.construct_params()]
result_map = None

await cursor.execute(str(compiled), post_processed_params[0])
else:
raise exc.ArgumentError("sql statement should be str or "
Expand Down Expand Up @@ -331,6 +340,7 @@ async def close(self):
self._transaction = None
# don't close underlying connection, it can be reused by pool
# conn.close()

self._engine.release(self)
self._connection = None
self._engine = None
Expand Down
Loading

0 comments on commit 9fdf7b9

Please sign in to comment.