-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
load testing research #212
Comments
Well, I managed to identify several problems.
The connection is not released in the `__aenter__` section of the transaction:
async def __aenter__(self):
if not asyncio_current_task(loop=self.loop):
raise RuntimeError("The transaction must run within a task")
await self.db.push_transaction_async()
if self.db.transaction_depth_async() == 1:
await _run_no_result_sql(self.db, 'BEGIN') # there is not try finally section!!
return self
If an error happens while executing the BEGIN query, the connection will stay acquired forever.
The pool is recreated when an error occurs during cursor creation:
async def cursor_async(self):
"""Acquire async cursor.
"""
await self.connect_async(loop=self._loop)
if self.transaction_depth_async() > 0:
conn = self.transaction_conn_async()
else:
conn = None
try:
return (await self._async_conn.cursor(conn=conn))
except:
await self.close_async() # no reason to do this!!
raise
There is no reason to kill all connections in the pool if one of them is broken. It leads to "connection/cursor already closed" errors.
In the close_async method we can't simply set _task_data = None:
async def close_async(self):
"""Close async connection.
"""
if self._async_wait:
await self._async_wait
if self._async_conn:
conn = self._async_conn
self._async_conn = None
self._async_wait = None
self._task_data = None
await conn.close()
We have to wait until all connections in _task_data are released by their transactions; otherwise we will have an ACID problem (#209).
Pool management needs to be improved (#196); asyncio.Lock should help with this.
The connection is not released in AsyncPostgresqlConnection/AsyncMySQLConnection:
async def cursor(self, conn=None, *args, **kwargs):
"""Get a cursor for the specified transaction connection
or acquire from the pool.
"""
in_transaction = conn is not None
if not conn:
conn = await self.acquire()
cursor = await conn.cursor(*args, **kwargs) # if we get an error here the connection will be never released!!
cursor.release = functools.partial(
self.release_cursor, cursor,
in_transaction=in_transaction)
return cursor |
Here is my attempt at writing a new database class:
class Transaction:
"""Asynchronous context manager (`async with`), similar to
`peewee.transaction()`. Will start new `asyncio` task for
transaction if not started already.
"""
def __init__(self, db, connection):
    """Bind the transaction to its database and connection.

    :param db: database object; its ``release_connection()`` is called
        with ``connection`` when the transaction exits
    :param connection: connection on which ``COMMIT`` / ``ROLLBACK``
        is issued on exit
    """
    self.connection = connection
    self.db = db
async def __aenter__(self):
    """Enter the ``async with`` block.

    No database work is done here; the transaction object itself is
    returned as the ``as`` target.
    """
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Finish the transaction and give the connection back.

    Issues ``ROLLBACK`` when the ``async with`` body raised, otherwise
    ``COMMIT``. If ``COMMIT`` itself fails, ``ROLLBACK`` is issued and
    the failure is re-raised. The connection is always handed to
    ``self.db.release_connection()``, whatever happened above.
    """
    try:
        async with self.connection.cursor() as cur:
            if exc_type:
                # The body failed: undo its work and stop here.
                await cur.execute("ROLLBACK")
                return
            try:
                await cur.execute("COMMIT")
            except:  # noqa: E722 - re-raised below after rolling back
                await cur.execute("ROLLBACK")
                raise
    finally:
        self.db.release_connection(self.connection)
def get_task_id():
    """Return ``id()`` of the task reported by ``asyncio_current_task()``.

    :raises Exception: when ``asyncio_current_task()`` yields no task
    """
    current = asyncio_current_task()
    if not current:
        raise Exception("no current_task")
    return id(current)
class PostgresqlDatabaseExperimental(peewee.PostgresqlDatabase):
def __init__(self, *args, **kwargs):
    """Initialise the base database class and the async bookkeeping.

    All positional and keyword arguments are forwarded unchanged to
    the parent constructor.
    """
    super().__init__(*args, **kwargs)
    # Sync queries stay forbidden until explicitly enabled.
    self._allow_sync = False
    # task id (see get_task_id()) -> connection held by that task
    self._connections_in_transaction = {}
    # Taken by get_pool() and close_async() around pool changes.
    self._async_lock = asyncio.Lock()
    # Created on first use by get_pool().
    self._pool = None
def set_allow_sync(self, value):
    """Switch synchronous queries on or off for this database.

    The :meth:`.allow_sync()` context manager does the same thing for
    the duration of a ``with`` block.

    :param value: new setting, stored as-is in ``_allow_sync``
    """
    self._allow_sync = value
@contextlib.contextmanager
def allow_sync(self):
    """Temporarily permit sync queries inside a ``with`` block.

    On exit the previous ``_allow_sync`` value is restored and
    ``self.close()`` is called; a ``self.Error`` raised by that call
    is ignored.

    Example::

        with database.allow_sync():
            PageBlock.create_table(True)
    """
    previous = self._allow_sync
    self._allow_sync = True
    try:
        yield
    finally:
        # Runs on both normal exit and when the body raised.
        self._allow_sync = previous
        try:
            self.close()
        except self.Error:
            pass  # already closed
def execute_sql(self, *args, **kwargs):
    """Run a synchronous SQL query; ``_allow_sync`` must be truthy.

    When ``_allow_sync`` is ``logging.ERROR`` or ``logging.WARNING``
    the query is still executed, but a message is logged at that level
    first. All arguments are forwarded to the parent ``execute_sql()``.

    :raises AssertionError: if sync queries are not allowed
    """
    # Raised explicitly instead of using ``assert`` so the guard is not
    # stripped when Python runs with ``-O``.
    if not self._allow_sync:
        raise AssertionError(
            "Error, sync query is not allowed! Call the `.set_allow_sync()` "
            "or use the `.allow_sync()` context manager.")
    if self._allow_sync in (logging.ERROR, logging.WARNING):
        # Lazy %-args: the message is only built if the record is emitted.
        logging.log(self._allow_sync,
                    "Error, sync query is not allowed: %s %s",
                    str(args), str(kwargs))
    return super().execute_sql(*args, **kwargs)
def release_connection(self, connection):
    """Give ``connection`` back and forget it for the current task.

    The connection is passed to ``self._pool.release()`` when a pool
    exists, and any ``_connections_in_transaction`` entry for the
    current task id is removed.

    :param connection: connection to release
    """
    pool = self._pool
    if pool:
        pool.release(connection)
    task_id = get_task_id()
    # pop() with a default is a no-op when the task holds nothing.
    self._connections_in_transaction.pop(task_id, None)
async def get_pool(self):
    """Return the connection pool, creating it on first use.

    Creation happens under ``_async_lock`` via ``aiopg.create_pool()``
    using ``self.database`` and ``self.connect_params``.
    """
    async with self._async_lock:
        if self._pool is not None:
            return self._pool
        self._pool = await aiopg.create_pool(
            database=self.database, **self.connect_params)
        return self._pool
async def close_async(self):
    """Terminate all pool connections.

    Under ``_async_lock``: if a pool exists it is terminated, awaited
    until closed, and then ``_pool`` is reset to ``None``. Does nothing
    when there is no pool.
    """
    async with self._async_lock:
        pool = self._pool
        if pool is None:
            return
        pool.terminate()
        await pool.wait_closed()
        self._pool = None
async def fetch_results(self, cursor, query):
    """Execute ``query`` on ``cursor`` and return its result.

    :param cursor: async cursor the query is executed on
    :param query: peewee query object
    :return: the fetched rows for a ``Select`` /
        ``ModelCompoundSelectQuery`` and for an ``Update`` that has a
        ``_returning`` clause; ``cursor.rowcount`` for everything else
    """
    with peewee.__exception_wrapper__:
        # query.sql() yields an (sql, params) pair. It must be unpacked:
        # passing the tuple itself would hand the cursor a tuple where
        # it expects the SQL string.
        sql, params = query.sql()
        await cursor.execute(sql, params or ())
        yields_rows = (
            isinstance(query, (peewee.Select, peewee.ModelCompoundSelectQuery))
            or (isinstance(query, peewee.Update) and query._returning)
        )
        if yields_rows:
            wrapper = AsyncQueryWrapper(cursor=cursor, query=query)
            return await wrapper.fetchall()
        return cursor.rowcount
def get_connection_in_transaction(self):
    """Return the connection recorded for the current task, if any.

    :return: the ``_connections_in_transaction`` entry for the current
        task id, or ``None`` when there is none
    """
    task_id = get_task_id()
    return self._connections_in_transaction.get(task_id)
async def async_get_connection(self):
    """Return a connection for the current task.

    The connection reported by ``get_connection_in_transaction()`` is
    reused when there is one; otherwise a connection is acquired from
    the pool returned by ``get_pool()``.
    """
    held = self.get_connection_in_transaction()
    if not held:
        pool = await self.get_pool()
        return await pool.acquire()
    return held
def is_task_in_transaction(self):
    """Tell whether the current task has an entry in
    ``_connections_in_transaction``.

    :return: ``True`` if the current task id is a key of the mapping,
        ``False`` otherwise
    """
    task_id = get_task_id()
    return task_id in self._connections_in_transaction
async def transaction(self):
    """Start a transaction for the current task.

    Acquires a connection, issues ``BEGIN`` on it and records the
    connection under the current task id in
    ``_connections_in_transaction``. From then on
    ``get_connection_in_transaction()`` finds it, so the task's queries
    reuse this connection until ``release_connection()`` removes the
    entry.

    :return: a ``Transaction`` bound to this database and connection
    :raises Exception: if the current task is already in a transaction
    """
    if self.is_task_in_transaction():
        raise Exception("already in transaction")
    connection = await self.async_get_connection()
    try:
        async with connection.cursor() as cursor:
            await cursor.execute("BEGIN")
    except:  # noqa: E722 - release on any failure, then re-raise
        self.release_connection(connection)
        raise
    # Register only once BEGIN succeeded. Without this entry the task is
    # never seen as "in transaction" and its queries would each acquire
    # a different pool connection.
    self._connections_in_transaction[get_task_id()] = connection
    return Transaction(self, connection)
async def async_execute(self, query):
connection = await self.async_get_connection()
try:
async with connection.cursor() as cursor:
return await self.fetch_results(cursor, query)
finally:
if not self.is_task_in_transaction():
self.release_connection(connection)
But I realized that this would be a lot of changes and that it is hard to make them in one iteration, so I decided to save the changes here and refactor the code in small parts. |
The fix is merged but needs additional testing. |
Using a patch that imitates network errors and running yandex-tank, I got the following errors:
'NoneType' object has no attribute 'cursor' #196
This happens because the async_wait future does not work properly. As we can see, the cursor_async function runs connect_async first, which should guarantee that we have conn (a connection). No transactions were used in the test, but conn is None because of a race condition.
Connection closed
cursor already closed
"Connection closed" and "cursor already closed" are raised because the pool has been closed in one task, but the other tasks do not know about it and still try to use dead connections.
The text was updated successfully, but these errors were encountered: