diff --git a/changelog.d/16061.misc b/changelog.d/16061.misc new file mode 100644 index 000000000000..37928b670f1e --- /dev/null +++ b/changelog.d/16061.misc @@ -0,0 +1 @@ +Fix database performance of read/write worker locks. diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 1680bf6168f9..54d40e7a3ab0 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -26,7 +26,6 @@ LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.engines import PostgresEngine from synapse.util import Clock from synapse.util.stringutils import random_string @@ -96,6 +95,10 @@ def __init__( self._acquiring_locks: Set[Tuple[str, str]] = set() + self._clock.looping_call( + self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0 + ) + @wrap_as_background_process("LockStore._on_shutdown") async def _on_shutdown(self) -> None: """Called when the server is shutting down""" @@ -216,6 +219,7 @@ async def try_acquire_read_write_lock( lock_name, lock_key, write, + db_autocommit=True, ) except self.database_engine.module.IntegrityError: return None @@ -233,61 +237,22 @@ def _try_acquire_read_write_lock_txn( # `worker_read_write_locks` and seeing if that fails any # constraints. If it doesn't then we have acquired the lock, # otherwise we haven't. - # - # Before that though we clear the table of any stale locks. now = self._clock.time_msec() token = random_string(6) - delete_sql = """ - DELETE FROM worker_read_write_locks - WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; - """ - - insert_sql = """ - INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?, ?) - """ - - if isinstance(self.database_engine, PostgresEngine): - # For Postgres we can send these queries at the same time. - txn.execute( - delete_sql + ";" + insert_sql, - ( - # DELETE args - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - # UPSERT args - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) - else: - # For SQLite these need to be two queries. - txn.execute( - delete_sql, - ( - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - ), - ) - txn.execute( - insert_sql, - ( - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) + self.db_pool.simple_insert_txn( + txn, + table="worker_read_write_locks", + values={ + "lock_name": lock_name, + "lock_key": lock_key, + "write_lock": write, + "instance_name": self._instance_name, + "token": token, + "last_renewed_ts": now, + }, + ) lock = Lock( self._reactor, @@ -351,6 +316,24 @@ def _try_acquire_multi_read_write_lock_txn( return locks + @wrap_as_background_process("_reap_stale_read_write_locks") + async def _reap_stale_read_write_locks(self) -> None: + delete_sql = """ + DELETE FROM worker_read_write_locks + WHERE last_renewed_ts < ? + """ + + def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None: + txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,)) + if txn.rowcount: + logger.info("Reaped %d stale locks", txn.rowcount) + + await self.db_pool.runInteraction( + "_reap_stale_read_write_locks", + reap_stale_read_write_locks_txn, + db_autocommit=True, + ) + class Lock: """An async context manager that manages an acquired lock, ensuring it is diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 383da83dfb2a..f541f1d6be1e 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -12,13 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. + from twisted.internet import defer, reactor from twisted.internet.base import ReactorBase from twisted.internet.defer import Deferred from twisted.test.proto_helpers import MemoryReactor from synapse.server import HomeServer -from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS +from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS from synapse.util import Clock from tests import unittest @@ -380,8 +381,8 @@ def test_maintain_lock(self) -> None: self.get_success(lock.__aenter__()) # Wait for ages with the lock, we should not be able to get the lock. - self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000) - self.pump() + for _ in range(0, 10): + self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000)) lock2 = self.get_success( self.store.try_acquire_read_write_lock("name", "key", write=True)