From 420ee855134e3bf336940807237d841aa64356cb Mon Sep 17 00:00:00 2001 From: Andrei Pashkin Date: Wed, 18 Mar 2020 12:46:38 +0300 Subject: [PATCH] Rework aiopg.sa.result module with Cython. --- .travis.yml | 2 +- aiopg/sa/{result.py => result.pyx} | 239 +++++++++++++++-------------- setup.py | 16 +- 3 files changed, 137 insertions(+), 120 deletions(-) rename aiopg/sa/{result.py => result.pyx} (67%) diff --git a/.travis.yml b/.travis.yml index 99dada51..a95faa2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,7 +23,7 @@ jobs: env: install: -- pip install -U setuptools +- pip install -U setuptools cython - python setup.py install - pip install -Ur requirements.txt - pip install codecov diff --git a/aiopg/sa/result.py b/aiopg/sa/result.pyx similarity index 67% rename from aiopg/sa/result.py rename to aiopg/sa/result.pyx index d58bf7b6..a6d8846d 100644 --- a/aiopg/sa/result.py +++ b/aiopg/sa/result.pyx @@ -1,69 +1,84 @@ -import weakref -from collections.abc import Mapping, Sequence - +#cython: language_level=3, boundscheck=False, wraparound=False, nonecheck=False +from cpython cimport Py_XINCREF, PyObject +from cpython.tuple cimport PyTuple_SET_ITEM, PyTuple_GET_ITEM, PyTuple_GetItem +from cpython.dict cimport PyDict_GetItem +from cpython.list cimport PyList_New, PyList_SET_ITEM from sqlalchemy.sql import expression, sqltypes from . import exc -class RowProxy(Mapping): - __slots__ = ('_result_proxy', '_row', '_processors', '_keymap') +cdef inline object _raise_on_ambigous_column(key): + raise exc.InvalidRequestError( + "Ambiguous column name %s in result set! " + "try 'use_labels' option on select statement." % repr(key) + ) + - def __init__(self, result_proxy, row, processors, keymap): +cdef class RowProxy: + cdef ResultMetaData _result_metadata + cdef tuple _row + cdef dict _key_to_index + + def __cinit__(self, result_metadata, row, key_to_index): """RowProxy objects are constructed by ResultProxy objects.""" - self._result_proxy = result_proxy + self._result_metadata = result_metadata self._row = row - self._processors = processors - self._keymap = keymap + self._key_to_index = key_to_index def __iter__(self): - return iter(self._result_proxy.keys) + return iter(self._result_metadata.keys) def __len__(self): return len(self._row) - def __getitem__(self, key): - try: - processor, obj, index = self._keymap[key] - except KeyError: - processor, obj, index = self._result_proxy._key_fallback(key) - # Do we need slicing at all? RowProxy now is Mapping not Sequence - # except TypeError: - # if isinstance(key, slice): - # l = [] - # for processor, value in zip(self._processors[key], - # self._row[key]): - # if processor is None: - # l.append(value) - # else: - # l.append(processor(value)) - # return tuple(l) - # else: - # raise - if index is None: - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in result set! " - "try 'use_labels' option on select statement." % key) - if processor is not None: - return processor(self._row[index]) - else: - return self._row[index] + cdef object _getitem(self, key): + cdef PyObject* item + cdef int index + cdef tuple data + + item = PyDict_GetItem(self._key_to_index, key) + + if item is NULL: + fallback_key = self._result_metadata._key_fallback(key) + if fallback_key is None: + _raise_on_ambigous_column(key) + item = PyDict_GetItem(self._key_to_index, fallback_key) + if item is NULL: + _raise_on_ambigous_column(key) + + index = item - def __getattr__(self, name): + return PyTuple_GET_ITEM(self._row, index) + + def __getitem__(self, item): + cdef PyObject* result + + if isinstance(item, int): + result = PyTuple_GetItem(self._row, item) + if result is NULL: + raise KeyError(item) + return result + return self._getitem(item) + + cdef object _getattr(self, str name): try: - return self[name] + return self._getitem(name) except KeyError as e: raise AttributeError(e.args[0]) + def __getattr__(self, str item): + return self._getattr(item) + def __contains__(self, key): - return self._result_proxy._has_key(self._row, key) + return self._result_metadata._has_key(self._row, key) __hash__ = None def __eq__(self, other): - if isinstance(other, RowProxy): + if hasattr(other, 'as_tuple'): return self.as_tuple() == other.as_tuple() - elif isinstance(other, Sequence): + elif hasattr(other, 'index') and hasattr(other, 'count'): return self.as_tuple() == other else: return NotImplemented @@ -71,26 +86,30 @@ def __eq__(self, other): def __ne__(self, other): return not self == other - def as_tuple(self): + cdef tuple as_tuple(self): return tuple(self[k] for k in self) def __repr__(self): return repr(self.as_tuple()) -class ResultMetaData(object): +cdef class ResultMetaData: """Handle cursor.description, applying additional info from an execution context.""" + cdef public list keys + cdef public dict _keymap + cdef public list _processors + cdef public dict _key_to_index - def __init__(self, result_proxy, cursor_description): - self._processors = processors = [] - + def __init__(self, ResultProxy result_proxy, cursor_description): map_type, map_column_name = self.result_map(result_proxy._result_map) # We do not strictly need to store the processor in the key mapping, # though it is faster in the Python version (probably because of the # saved attribute lookup self._processors) self._keymap = keymap = {} + self._processors = processors = [] + self._key_to_index = key_to_index = {} self.keys = [] dialect = result_proxy.dialect @@ -102,12 +121,11 @@ def __init__(self, result_proxy, cursor_description): assert dialect.case_sensitive, \ "Doesn't support case insensitive database connection" - # high precedence key values. - primary_keymap = {} - assert not dialect.description_encoding, \ "psycopg in py3k should not use this" + ambiguous = [] + for i, rec in enumerate(cursor_description): colname = rec[0] coltype = rec[1] @@ -116,43 +134,27 @@ def __init__(self, result_proxy, cursor_description): # if dialect.requires_name_normalize: # colname = dialect.normalize_name(colname) - name, obj, type_ = ( - map_column_name.get(colname, colname), - None, - map_type.get(colname, typemap.get(coltype, sqltypes.NULLTYPE)) + name = str(map_column_name.get(colname, colname)) + type_ = map_type.get( + colname, + typemap.get(coltype, sqltypes.NULLTYPE) ) - processor = type_._cached_result_processor(dialect, coltype) + if processor: + processors.append((processor, i)) - processors.append(processor) - rec = (processor, obj, i) - - # indexes as keys. This is only needed for the Python version of - # RowProxy (the C version uses a faster path for integer indexes). - primary_keymap[i] = rec + rec = (processor, i) - # populate primary keymap, looking for conflicts. - if primary_keymap.setdefault(name, rec) is not rec: - # place a record that doesn't have the "index" - this - # is interpreted later as an AmbiguousColumnError, - # but only when actually accessed. Columns - # colliding by name is not a problem if those names - # aren't used; integer access is always - # unambiguous. - primary_keymap[name] = rec = (None, obj, None) + if name in keymap: + ambiguous.append(name) + keymap[name] = rec + key_to_index[name] = i self.keys.append(name) - if obj: - for o in obj: - keymap[o] = rec - # technically we should be doing this but we - # are saving on callcounts by not doing so. - # if keymap.setdefault(o, rec) is not rec: - # keymap[o] = (None, obj, None) - - # overwrite keymap values with those of the - # high precedence keymap. - keymap.update(primary_keymap) + + for name in ambiguous: + keymap.pop(name) + key_to_index.pop(name) def result_map(self, data_map): data_map = data_map or {} @@ -166,50 +168,32 @@ def result_map(self, data_map): return map_type, map_column_name - def _key_fallback(self, key, raiseerr=True): + def _key_fallback(self, key): map = self._keymap result = None if isinstance(key, str): - result = map.get(key) + result = key # fallback for targeting a ColumnElement to a textual expression # this is a rare use case which only occurs when matching text() # or colummn('name') constructs to ColumnElements, or after a # pickle/unpickle roundtrip elif isinstance(key, expression.ColumnElement): if (key._label and key._label in map): - result = map[key._label] + result = key._label elif (hasattr(key, 'key') and key.key in map): # match is only on name. - result = map[key.key] - # search extra hard to make sure this - # isn't a column/label name overlap. - # this check isn't currently available if the row - # was unpickled. - if result is not None and result[1] is not None: - for obj in result[1]: - if key._compare_name_for_result(obj): - break - else: - result = None - if result is None: - if raiseerr: - raise exc.NoSuchColumnError( - "Could not locate column in row for column '%s'" % - expression._string_or_unprintable(key)) - else: - return None - else: - map[key] = result + result = key.key + return result def _has_key(self, row, key): if key in self._keymap: return True else: - return self._key_fallback(key, False) is not None + return self._key_fallback(key) is not None -class ResultProxy: +cdef class ResultProxy: """Wraps a DB-API cursor object to provide easier access to row columns. Individual columns may be accessed by their integer position, @@ -228,15 +212,20 @@ class ResultProxy: data using sqlalchemy TypeEngine objects, which are referenced from the originating SQL statement that produced this result set. """ - - def __init__(self, connection, cursor, dialect, result_map=None): + cdef object _dialect + cdef public object _result_map + cdef object _cursor + cdef object _connection + cdef object _rowcount + cdef object _metadata + + def __cinit__(self, connection, cursor, dialect, result_map=None): self._dialect = dialect self._result_map = result_map self._cursor = cursor self._connection = connection self._rowcount = cursor.rowcount self._metadata = None - self._weak = None self._init_metadata() @property @@ -290,10 +279,8 @@ def _init_metadata(self): cursor_description = self.cursor.description if cursor_description is not None: self._metadata = ResultMetaData(self, cursor_description) - self._weak = weakref.ref(self, lambda wr: self.cursor.close()) else: self.close() - self._weak = None @property def returns_rows(self): @@ -333,7 +320,9 @@ def close(self): self.cursor.close() # allow consistent errors self._cursor = None - self._weak = None + + def __del__(self): + self.close() def __aiter__(self): return self @@ -353,13 +342,29 @@ def _non_result(self): else: raise exc.ResourceClosedError("This result object is closed.") - def _process_rows(self, rows): - process_row = RowProxy + cdef list _process_rows(self, list rows): + cdef int i + cdef list results + + results = PyList_New(len(rows)) metadata = self._metadata - keymap = metadata._keymap - processors = metadata._processors - return [process_row(metadata, row, processors, keymap) - for row in rows] + key_to_index = metadata._key_to_index + processors = self._metadata._processors + i = 0 + for row in rows: + for processor, index in processors: + value = processor(PyTuple_GET_ITEM(row, index)) + PyTuple_SET_ITEM(row, index, value) + row_proxy = RowProxy( + metadata, + row, + key_to_index + ) + Py_XINCREF(row_proxy) + check = PyList_SET_ITEM(results, i, row_proxy) + i += 1 + + return results async def fetchall(self): """Fetch all rows, just like DB-API cursor.fetchall().""" diff --git a/setup.py b/setup.py index c4e41fe4..a981e690 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,10 @@ import os import re -from setuptools import setup, find_packages +from setuptools import setup, find_packages, Extension +from Cython.Build import cythonize + +CFLAGS = ['-O3'] install_requires = ['psycopg2-binary>=2.7.0'] extras_require = {'sa': ['sqlalchemy[postgresql_psycopg2binary]>=1.1']} @@ -50,6 +53,14 @@ def read_changelog(path='CHANGES.txt'): 'Framework :: AsyncIO', ] +EXTENSIONS = [ + Extension( + "aiopg.sa.result", + sources=["aiopg/sa/result.pyx"], + extra_compile_args=CFLAGS + ) +] + setup( name='aiopg', version=read_version(), @@ -76,5 +87,6 @@ def read_changelog(path='CHANGES.txt'): packages=find_packages(), install_requires=install_requires, extras_require=extras_require, - include_package_data=True + include_package_data=True, + ext_modules=cythonize(EXTENSIONS) )