diff --git a/changelog.d/16112.misc b/changelog.d/16112.misc new file mode 100644 index 000000000000..05a58c1348a8 --- /dev/null +++ b/changelog.d/16112.misc @@ -0,0 +1 @@ +Rename pagination and purge locks and add comments to explain why they exist and how they work. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a90d99c4d676..f9915e5a3f05 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -63,7 +63,7 @@ ) from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME +from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( make_deferred_yieldable, @@ -1245,7 +1245,7 @@ async def _process_incoming_pdus_in_room_inner( # while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME` # lock. async with self._worker_lock_handler.acquire_read_write_lock( - DELETE_ROOM_LOCK_NAME, room_id, write=False + NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): await self._federation_event_handler.on_receive_pdu( origin, event diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d485f21e49f1..a74db1dccffa 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -53,7 +53,7 @@ from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler -from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME +from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -1034,7 +1034,7 @@ async def create_and_send_nonmember_event( ) async with self._worker_lock_handler.acquire_read_write_lock( - DELETE_ROOM_LOCK_NAME, room_id, write=False + NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): return await self._create_and_send_nonmember_event_locked( requester=requester, @@ -1978,7 +1978,7 @@ async def _send_dummy_events_to_fill_extremities(self) -> None: for room_id in room_ids: async with self._worker_lock_handler.acquire_read_write_lock( - DELETE_ROOM_LOCK_NAME, room_id, write=False + NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): dummy_event_sent = await self._send_dummy_event_for_room(room_id) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index da3465847092..1be6ebc6d92e 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -24,6 +24,7 @@ from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig from synapse.handlers.room import ShutdownRoomResponse +from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin @@ -46,9 +47,10 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 -PURGE_HISTORY_LOCK_NAME = "purge_history_lock" - -DELETE_ROOM_LOCK_NAME = "delete_room_lock" +# This is used to avoid purging a room several time at the same moment, +# and also paginating during a purge. Pagination can trigger backfill, +# which would create old events locally, and would potentially clash with the room delete. +PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock" @attr.s(slots=True, auto_attribs=True) @@ -363,7 +365,7 @@ async def _purge_history( self._purges_in_progress_by_room.add(room_id) try: async with self._worker_locks.acquire_read_write_lock( - PURGE_HISTORY_LOCK_NAME, room_id, write=True + PURGE_PAGINATION_LOCK_NAME, room_id, write=True ): await self._storage_controllers.purge_events.purge_history( room_id, token, delete_local_events @@ -421,7 +423,10 @@ async def purge_room(self, room_id: str, force: bool = False) -> None: force: set true to skip checking for joined users. """ async with self._worker_locks.acquire_multi_read_write_lock( - [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)], + [ + (PURGE_PAGINATION_LOCK_NAME, room_id), + (NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id), + ], write=True, ): # first check that we have no users in this room @@ -483,7 +488,7 @@ async def get_messages( room_token = from_token.room_key async with self._worker_locks.acquire_read_write_lock( - PURGE_HISTORY_LOCK_NAME, room_id, write=False + PURGE_PAGINATION_LOCK_NAME, room_id, write=False ): (membership, member_event_id) = (None, None) if not use_admin_priviledge: @@ -761,7 +766,7 @@ async def _shutdown_and_purge_room( self._purges_in_progress_by_room.add(room_id) try: async with self._worker_locks.acquire_read_write_lock( - PURGE_HISTORY_LOCK_NAME, room_id, write=True + PURGE_PAGINATION_LOCK_NAME, room_id, write=True ): self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN self._delete_by_id[ diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e3cdf2bc6169..1d8d4a72e7a2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -39,7 +39,7 @@ from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler -from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME +from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging import opentracing from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process @@ -621,7 +621,7 @@ async def update_membership( async with self.member_as_limiter.queue(as_id): async with self.member_linearizer.queue(key): async with self._worker_lock_handler.acquire_read_write_lock( - DELETE_ROOM_LOCK_NAME, room_id, write=False + NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): with opentracing.start_active_span("update_membership_locked"): result = await self.update_membership_locked( diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index 72df773a8623..58efe7116b70 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -42,7 +42,11 @@ from synapse.server import HomeServer -DELETE_ROOM_LOCK_NAME = "delete_room_lock" +# This lock is used to avoid creating an event while we are purging the room. +# We take a read lock when creating an event, and a write one when purging a room. +# This is because it is fine to create several events concurrently, since referenced events +# will not disappear under our feet as long as we don't delete the room. +NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock" class WorkerLocksHandler: diff --git a/synapse/rest/client/room_upgrade_rest_servlet.py b/synapse/rest/client/room_upgrade_rest_servlet.py index 4a5d9e13e736..b1f6b5d1b7ec 100644 --- a/synapse/rest/client/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/room_upgrade_rest_servlet.py @@ -17,7 +17,7 @@ from synapse.api.errors import Codes, ShadowBanError, SynapseError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME +from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -81,7 +81,7 @@ async def on_POST( try: async with self._worker_lock_handler.acquire_read_write_lock( - DELETE_ROOM_LOCK_NAME, room_id, write=False + NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): new_room_id = await self._room_creation_handler.upgrade_room( requester, room_id, new_version diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 35cd1089d6c9..abd1d149dbc5 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -45,7 +45,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME +from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.opentracing import ( SynapseTags, @@ -357,7 +357,7 @@ async def _process_event_persist_queue_task( # it. We might already have taken out the lock, but since this is just a # "read" lock its inherently reentrant. async with self.hs.get_worker_locks_handler().acquire_read_write_lock( - DELETE_ROOM_LOCK_NAME, room_id, write=False + NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False ): if isinstance(task, _PersistEventsTask): return await self._persist_event_batch(room_id, task)