Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix rare bug that broke looping calls (#16210)
Browse files Browse the repository at this point in the history
* Fix rare bug that broke looping calls

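Looping calls interact with the reactor, which is only safe to do from the
main (reactor) thread, but `Lock` may be created or dropped on a non-main
thread. Defer those reactor interactions to the main thread instead.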

Introduced in v1.90.0 / #15791.

* Newsfile
  • Loading branch information
erikjohnston authored Aug 30, 2023
1 parent 05d8245 commit a2e0d4c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
1 change: 1 addition & 0 deletions changelog.d/16210.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix rare bug that broke looping calls, which could lead to e.g. linearly increasing memory usage. Introduced in v1.90.0.
36 changes: 22 additions & 14 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary

from twisted.internet.interfaces import IReactorCore
from twisted.internet.task import LoopingCall

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
Expand All @@ -26,6 +26,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -358,7 +359,7 @@ class Lock:

def __init__(
self,
reactor: IReactorCore,
reactor: ISynapseReactor,
clock: Clock,
store: LockStore,
read_write: bool,
Expand All @@ -377,19 +378,25 @@ def __init__(

self._table = "worker_read_write_locks" if read_write else "worker_locks"

self._looping_call = clock.looping_call(
# We might be called from a non-main thread, so we defer setting up the
# looping call.
self._looping_call: Optional[LoopingCall] = None
reactor.callFromThread(self._setup_looping_call)

self._dropped = False

def _setup_looping_call(self) -> None:
self._looping_call = self._clock.looping_call(
self._renew,
_RENEWAL_INTERVAL_MS,
store,
clock,
read_write,
lock_name,
lock_key,
token,
self._store,
self._clock,
self._read_write,
self._lock_name,
self._lock_key,
self._token,
)

self._dropped = False

@staticmethod
@wrap_as_background_process("Lock._renew")
async def _renew(
Expand Down Expand Up @@ -459,7 +466,7 @@ async def release(self) -> None:
if self._dropped:
return

if self._looping_call.running:
if self._looping_call and self._looping_call.running:
self._looping_call.stop()

await self._store.db_pool.simple_delete(
Expand All @@ -486,8 +493,9 @@ def __del__(self) -> None:
# We should not be dropped without the lock being released (unless
# we're shutting down), but if we are then let's at least stop
# renewing the lock.
if self._looping_call.running:
self._looping_call.stop()
if self._looping_call and self._looping_call.running:
# We might be called from a non-main thread.
self._reactor.callFromThread(self._looping_call.stop)

if self._reactor.running:
logger.error(
Expand Down
2 changes: 2 additions & 0 deletions tests/storage/databases/main/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def test_timeout_lock(self) -> None:

# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
assert lock._looping_call
lock._looping_call.stop()

# Wait for the lock to timeout.
Expand Down Expand Up @@ -403,6 +404,7 @@ def test_timeout_lock(self) -> None:

# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
assert lock._looping_call
lock._looping_call.stop()

# Wait for the lock to timeout.
Expand Down

0 comments on commit a2e0d4c

Please sign in to comment.