Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added extras execute_batch for executemany #632

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions aiopg/extras.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
def _paginate(seq, page_size):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import itertools 


def _paginate(seq, page_size):
    it = iter(seq)
    page = list(itertools.islice(it, 0, page_size))
    while page:
        yield page
        page = list(itertools.islice(it, 0, page_size))

I think it is better solution

Copy link

@andrey-berenda andrey-berenda Jan 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this solution you can use iterators in _paginate
for example
_paginate(range(1, 8), 3)

"""Consume an iterable and return it in chunks.

Every chunk is at most `page_size`. Never return an empty chunk.
"""
page = []
count = len(seq)
it = iter(seq)
for s in range(count + 1):
try:
for i in range(page_size):
page.append(next(it))
yield page
page = []
except StopIteration:
if page:
yield page
return


async def execute_batch(cur, sql, argslist, page_size=100):
r"""Execute groups of statements in fewer server roundtrips.

Execute *sql* several times, against all parameters set (sequences or
mappings) found in *argslist*.

The function is semantically similar to

.. parsed-literal::

*cur*\.\ `~cursor.executemany`\ (\ *sql*\ , *argslist*\ )

but has a different implementation: Psycopg will join the statements into
fewer multi-statement commands, each one containing at most *page_size*
statements, resulting in a reduced number of server roundtrips.

After the execution of the function the `cursor.rowcount` property will
**not** contain a total result.

"""
for page in _paginate(argslist, page_size=page_size):
sqls = [cur.mogrify(sql, args) for args in page]
await cur.execute(b";".join(sqls))
54 changes: 54 additions & 0 deletions tests/test_extras.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest

from aiopg.extras import _paginate, execute_batch


@pytest.fixture
def connect(make_connection):
async def go(**kwargs):
conn = await make_connection(**kwargs)
async with conn.cursor() as cur:
await cur.execute("DROP TABLE IF EXISTS tbl_extras")
await cur.execute("CREATE TABLE tbl_extras (id int)")
return conn

return go


@pytest.fixture
def cursor(connect, loop):
async def go():
return await (await connect()).cursor()

cur = loop.run_until_complete(go())
yield cur
cur.close()


def test__paginate():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pytest.mark.parametrize(
    'data, expected_pages',
    (
            (
                    (1, 2, 3, 4, 5, 6, 7), [
                        [1, 2, 3],
                        [4, 5, 6],
                        [7],
                    ]
            ),
            (
                    (1, 2, 3, 4, 5, 6), [
                        [1, 2, 3],
                        [4, 5, 6],
                    ]
            ),
    )
)
def test__paginate(data, expected_pages):
    for page, expected_page in itertools.zip_longest(
            _paginate(data, page_size=3),
            expected_pages,
    ):
        assert page == expected_page

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

data = [
[1, 2, 3],
[4, 5, 6],
[7],
]
for index, val in enumerate(_paginate((1, 2, 3, 4, 5, 6, 7), page_size=3)):
assert data[index] == list(val)


def test__paginate_even():
data = [
[1, 2, 3],
[4, 5, 6],
]
for index, val in enumerate(_paginate((1, 2, 3, 4, 5, 6), page_size=3)):
assert data[index] == list(val)


async def test_execute_batch(cursor):
args = [(1,), (2,), (3,), (4,)]
sql = 'insert into tbl_extras values(%s)'
await execute_batch(cursor, sql, argslist=args, page_size=3)

await cursor.execute('SELECT * from tbl_extras')
ret = await cursor.fetchall()
assert list(ret) == args