Rework aiopg.sa.result module with Cython. #664

Open · wants to merge 2 commits into base: master
2 changes: 1 addition & 1 deletion .travis.yml
@@ -23,7 +23,7 @@ jobs:
env:

install:
- pip install -U setuptools
- pip install -U setuptools cython
- python setup.py install
- pip install -Ur requirements.txt
- pip install codecov
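Installing Cython alongside setuptools is needed because aiopg/sa/result.py becomes a compiled .pyx module below. The matching setup.py change is not shown in this diff; a minimal sketch of the kind of build hook it implies (only the module path comes from this diff, the rest is illustrative):

    from setuptools import setup
    from Cython.Build import cythonize

    setup(
        name="aiopg",
        # Compile the reworked module to a C extension at build time.
        ext_modules=cythonize(
            "aiopg/sa/result.pyx",
            compiler_directives={"language_level": 3},
        ),
    )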
256 changes: 140 additions & 116 deletions aiopg/sa/result.py → aiopg/sa/result.pyx
@@ -1,69 +1,82 @@
import weakref
from collections.abc import Mapping, Sequence

#cython: language_level=3, boundscheck=False, wraparound=False, nonecheck=False
from cpython cimport Py_XINCREF, PyObject
from cpython.tuple cimport PyTuple_SET_ITEM, PyTuple_GET_ITEM, PyTuple_GetItem
from cpython.dict cimport PyDict_GetItem
from cpython.list cimport PyList_New, PyList_SET_ITEM
from sqlalchemy.sql import expression, sqltypes

from . import exc


class RowProxy(Mapping):
__slots__ = ('_result_proxy', '_row', '_processors', '_keymap')
cdef class RowProxy:
cdef ResultMetaData _result_metadata
cdef tuple _row
cdef dict _key_to_index

def __init__(self, result_proxy, row, processors, keymap):
def __cinit__(self, result_metadata, row, key_to_index):
"""RowProxy objects are constructed by ResultProxy objects."""
self._result_proxy = result_proxy
self._result_metadata = result_metadata
self._row = row
self._processors = processors
self._keymap = keymap
self._key_to_index = key_to_index

def __iter__(self):
return iter(self._result_proxy.keys)
return iter(self._result_metadata.keys)

def __len__(self):
return len(self._row)

def __getitem__(self, key):
try:
processor, obj, index = self._keymap[key]
except KeyError:
processor, obj, index = self._result_proxy._key_fallback(key)
# Do we need slicing at all? RowProxy now is Mapping not Sequence
# except TypeError:
# if isinstance(key, slice):
# l = []
# for processor, value in zip(self._processors[key],
# self._row[key]):
# if processor is None:
# l.append(value)
# else:
# l.append(processor(value))
# return tuple(l)
# else:
# raise
cdef object _getitem(self, key):
cdef PyObject* item
cdef object index
cdef tuple data

item = PyDict_GetItem(self._key_to_index, key)
if item is NULL:
fallback_key = self._result_metadata._key_fallback(key)
if fallback_key is None:
raise KeyError(key)
item = PyDict_GetItem(self._key_to_index, fallback_key)
if item is NULL:
raise KeyError(key)
index = <object>item
if index is None:
raise exc.InvalidRequestError(
"Ambiguous column name '%s' in result set! "
"try 'use_labels' option on select statement." % key)
if processor is not None:
return processor(self._row[index])
else:
return self._row[index]
"Ambiguous column name %s in result set! "
"try 'use_labels' option on select statement." % repr(key)
)
item = PyTuple_GetItem(self._row, index)
if item is NULL:
return None
return <object>item

def __getitem__(self, item):
cdef PyObject* result

def __getattr__(self, name):
if isinstance(item, int):
result = PyTuple_GetItem(self._row, item)
if result is NULL:
raise KeyError(item)
return <object>result
return self._getitem(item)

cdef object _getattr(self, str name):
try:
return self[name]
return self._getitem(name)
except KeyError as e:
raise AttributeError(e.args[0])

def __getattr__(self, str item):
return self._getattr(item)

def __contains__(self, key):
return self._result_proxy._has_key(self._row, key)
return self._result_metadata._has_key(self._row, key)

__hash__ = None

def __eq__(self, other):
if isinstance(other, RowProxy):
if hasattr(other, 'as_tuple'):
return self.as_tuple() == other.as_tuple()
elif isinstance(other, Sequence):
elif hasattr(other, 'index') and hasattr(other, 'count'):
return self.as_tuple() == other
else:
return NotImplemented
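For readers less familiar with the C-API calls in _getitem above: PyDict_GetItem returns NULL on a missing key instead of raising, PyTuple_GetItem returns a borrowed reference, and a stored index of None marks an ambiguous column name. A rough pure-Python equivalent of the lookup, illustrative only (LookupError stands in for exc.InvalidRequestError):

    _MISSING = object()

    def getitem(key_to_index, row, key, key_fallback):
        # key_to_index maps column name -> position (None if ambiguous);
        # key_fallback maps e.g. a SQLAlchemy Column object to its string key.
        index = key_to_index.get(key, _MISSING)
        if index is _MISSING:
            fallback_key = key_fallback(key)
            if fallback_key is None:
                raise KeyError(key)
            index = key_to_index.get(fallback_key, _MISSING)
            if index is _MISSING:
                raise KeyError(key)
        if index is None:
            raise LookupError(
                "Ambiguous column name %r in result set! "
                "try 'use_labels' option on select statement." % (key,))
        return row[index]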
@@ -77,20 +90,41 @@ def as_tuple(self):
def __repr__(self):
return repr(self.as_tuple())

def keys(self):
return list(self._result_metadata.keys)

def values(self):
return list(self._row)

def items(self):
return zip(self._result_metadata.keys, self._row)

class ResultMetaData(object):
def get(self, item, default=None):
try:
return self._getitem(item)
except KeyError:
return default
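Since a cdef class cannot inherit from a Python ABC such as collections.abc.Mapping, the mapping-style helpers (keys, values, items, get) are reimplemented by hand above, and call sites keep working unchanged. A hypothetical usage sketch (table and column names are illustrative):

    async def show_users(conn):
        result = await conn.execute("SELECT id, name FROM users")
        async for row in result:
            print(row["name"])                 # mapping-style access
            print(row.name)                    # attribute access via __getattr__
            print(row.get("nickname", "n/a"))  # get() with a default
            print(dict(row.items()))           # items() pairs keys with values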


cdef class ResultMetaData:
"""Handle cursor.description, applying additional info from an execution
context."""
cdef public list keys
cdef public dict _keymap
cdef public list _processors
cdef public dict _key_to_index

def __init__(self, result_proxy, cursor_description):
self._processors = processors = []

def __init__(self, ResultProxy result_proxy, cursor_description):
map_type, map_column_name = self.result_map(result_proxy._result_map)

# We do not strictly need to store the processor in the key mapping,
# though it is faster in the Python version (probably because of the
# saved attribute lookup self._processors)
self._keymap = keymap = {}
self._processors = processors = []
self._key_to_index = key_to_index = {}
self.keys = []
dialect = result_proxy.dialect

@@ -102,12 +136,11 @@ def __init__(self, result_proxy, cursor_description):
assert dialect.case_sensitive, \
"Doesn't support case insensitive database connection"

# high precedence key values.
primary_keymap = {}

assert not dialect.description_encoding, \
"psycopg in py3k should not use this"

ambiguous = []

for i, rec in enumerate(cursor_description):
colname = rec[0]
coltype = rec[1]
@@ -116,43 +149,29 @@ def __init__(self, result_proxy, cursor_description):
# if dialect.requires_name_normalize:
# colname = dialect.normalize_name(colname)

name, obj, type_ = (
map_column_name.get(colname, colname),
None,
map_type.get(colname, typemap.get(coltype, sqltypes.NULLTYPE))
name = str(map_column_name.get(colname, colname))
type_ = map_type.get(
colname,
typemap.get(coltype, sqltypes.NULLTYPE)
)

processor = type_._cached_result_processor(dialect, coltype)

processors.append(processor)
rec = (processor, obj, i)
rec = (processor, i)

# indexes as keys. This is only needed for the Python version of
# RowProxy (the C version uses a faster path for integer indexes).
primary_keymap[i] = rec

# populate primary keymap, looking for conflicts.
if primary_keymap.setdefault(name, rec) is not rec:
# place a record that doesn't have the "index" - this
# is interpreted later as an AmbiguousColumnError,
# but only when actually accessed. Columns
# colliding by name is not a problem if those names
# aren't used; integer access is always
# unambiguous.
primary_keymap[name] = rec = (None, obj, None)
if name in keymap:
ambiguous.append(name)
keymap[name] = rec
key_to_index[name] = i

self.keys.append(name)
if obj:
for o in obj:
keymap[o] = rec
# technically we should be doing this but we
# are saving on callcounts by not doing so.
# if keymap.setdefault(o, rec) is not rec:
# keymap[o] = (None, obj, None)

# overwrite keymap values with those of the
# high precedence keymap.
keymap.update(primary_keymap)

for name in ambiguous:
keymap[name] = (None, None)
key_to_index[name] = None

for processor, i in keymap.values():
if processor is not None:
processors.append((processor, i))
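The old three-element keymap records (processor, obj, index) are thus split into (processor, index) pairs plus the flat _key_to_index dict that RowProxy reads, and a name that occurs twice is mapped to None so the ambiguity error fires only on access. A worked toy example of that bookkeeping (type codes are illustrative):

    description = [("id", 23), ("name", 25), ("id", 23)]  # (colname, coltype)

    key_to_index = {}
    ambiguous = []
    for i, (colname, _coltype) in enumerate(description):
        if colname in key_to_index:
            ambiguous.append(colname)
        key_to_index[colname] = i
    for name in ambiguous:
        key_to_index[name] = None  # resolvable only by integer position

    assert key_to_index == {"id": None, "name": 1}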

def result_map(self, data_map):
data_map = data_map or {}
@@ -166,50 +185,30 @@ def result_map(self, data_map):

return map_type, map_column_name

def _key_fallback(self, key, raiseerr=True):
def _key_fallback(self, key):
map = self._keymap
result = None
if isinstance(key, str):
result = map.get(key)
# fallback for targeting a ColumnElement to a textual expression
# this is a rare use case which only occurs when matching text()
# or column('name') constructs to ColumnElements, or after a
# pickle/unpickle roundtrip
elif isinstance(key, expression.ColumnElement):
if isinstance(key, expression.ColumnElement):
if (key._label and key._label in map):
result = map[key._label]
result = key._label
elif (hasattr(key, 'key') and key.key in map):
# match is only on name.
result = map[key.key]
# search extra hard to make sure this
# isn't a column/label name overlap.
# this check isn't currently available if the row
# was unpickled.
if result is not None and result[1] is not None:
for obj in result[1]:
if key._compare_name_for_result(obj):
break
else:
result = None
if result is None:
if raiseerr:
raise exc.NoSuchColumnError(
"Could not locate column in row for column '%s'" %
expression._string_or_unprintable(key))
else:
return None
else:
map[key] = result
result = key.key

return result

def _has_key(self, row, key):
if key in self._keymap:
return True
else:
return self._key_fallback(key, False) is not None
return key in self._keymap or self._key_fallback(key) in self._keymap
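_key_fallback now only translates a SQLAlchemy ColumnElement into the string key that the maps were built with, preferring its label and falling back to its .key; the old processor-record juggling is gone. A hypothetical illustration, assuming a plain table definition:

    import sqlalchemy as sa

    metadata = sa.MetaData()
    users = sa.Table(
        "users", metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("name", sa.String),
    )

    # row[users.c.name] first misses the string-keyed _key_to_index dict;
    # _key_fallback then typically resolves the Column object to "name"
    # (its .key), and the lookup is retried with that string.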


class ResultProxy:
cdef class ResultProxy:
"""Wraps a DB-API cursor object to provide easier access to row columns.

Individual columns may be accessed by their integer position,
@@ -228,15 +227,20 @@ class ResultProxy:
data using sqlalchemy TypeEngine objects, which are referenced from
the originating SQL statement that produced this result set.
"""

def __init__(self, connection, cursor, dialect, result_map=None):
cdef object _dialect
cdef public object _result_map
cdef object _cursor
cdef object _connection
cdef object _rowcount
cdef object _metadata

def __cinit__(self, connection, cursor, dialect, result_map=None):
self._dialect = dialect
self._result_map = result_map
self._cursor = cursor
self._connection = connection
self._rowcount = cursor.rowcount
self._metadata = None
self._weak = None
self._init_metadata()

@property
@@ -290,10 +294,8 @@ def _init_metadata(self):
cursor_description = self.cursor.description
if cursor_description is not None:
self._metadata = ResultMetaData(self, cursor_description)
self._weak = weakref.ref(self, lambda wr: self.cursor.close())
else:
self.close()
self._weak = None

@property
def returns_rows(self):
@@ -333,7 +335,9 @@ def close(self):
self.cursor.close()
# allow consistent errors
self._cursor = None
self._weak = None

def __del__(self):
self.close()

def __aiter__(self):
return self
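The weakref-based cursor cleanup is replaced by an explicit __del__, with close() resetting _cursor so a second call is harmless. A minimal sketch of the pattern (not aiopg code):

    class CursorHolder:
        def __init__(self, cursor):
            self._cursor = cursor

        def close(self):
            if self._cursor is not None:
                self._cursor.close()
                self._cursor = None  # makes close() idempotent

        def __del__(self):
            # Runs on garbage collection; safe even if close() was
            # already called explicitly.
            self.close()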
@@ -353,13 +357,33 @@ def _non_result(self):
else:
raise exc.ResourceClosedError("This result object is closed.")

def _process_rows(self, rows):
process_row = RowProxy
cdef list _process_rows(self, list rows):
cdef int i
cdef list results

results = PyList_New(len(rows))
metadata = self._metadata
keymap = metadata._keymap
processors = metadata._processors
return [process_row(metadata, row, processors, keymap)
for row in rows]
key_to_index = metadata._key_to_index
processors = self._metadata._processors
i = 0
for row in rows:
for processor, index in processors:
value = <object>PyTuple_GET_ITEM(row, index)
if value is not None:
value = processor(value)
Py_XINCREF(<PyObject*>value)

PyTuple_SET_ITEM(row, index, value)
row_proxy = RowProxy(
metadata,
row,
key_to_index
)
Py_XINCREF(<PyObject*>row_proxy)
PyList_SET_ITEM(results, i, row_proxy)
i += 1

return results
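_process_rows preallocates the result list with PyList_New and mutates each row tuple in place via PyTuple_SET_ITEM, which is only legitimate because the tuples arrive fresh from the driver and are not shared; the Py_XINCREF calls compensate for those macros stealing references. A rough pure-Python equivalent, assuming the RowProxy defined above (illustrative only):

    def process_rows(rows, processors, metadata, key_to_index):
        # processors is the list of (processor, index) pairs built by
        # ResultMetaData.__init__.
        results = []
        for row in rows:
            values = list(row)
            for processor, index in processors:
                if values[index] is not None:
                    values[index] = processor(values[index])
            results.append(RowProxy(metadata, tuple(values), key_to_index))
        return results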

async def fetchall(self):
"""Fetch all rows, just like DB-API cursor.fetchall()."""