Skip to content

Commit

Permalink
Bugfix: Really delete message hashes (#967)
Browse files Browse the repository at this point in the history
  • Loading branch information
squeaky-pl authored Nov 4, 2024
1 parent 0a425da commit bce77be
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 15 deletions.
8 changes: 8 additions & 0 deletions inbox/mailsync/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from inbox.models.folder import Folder
from inbox.models.message import MessageCategory
from inbox.models.session import session_scope
from inbox.models.util import delete_message_hashes
from inbox.util.concurrency import retry_with_logging
from inbox.util.debug import bind_context
from inbox.util.itert import chunk
Expand Down Expand Up @@ -90,6 +91,8 @@ def _run_impl(self):
interruptible_threading.sleep(self.message_ttl.total_seconds())

def check(self, current_time):
dangling_sha256s = set()

with session_scope(self.namespace_id) as db_session:
dangling_messages = (
db_session.query(Message)
Expand Down Expand Up @@ -138,6 +141,9 @@ def check(self, current_time):
# Also need to explicitly delete, so that message shows up in
# db_session.deleted.
db_session.delete(message)

dangling_sha256s.add(message.data_sha256)

if not thread.messages:
# We don't eagerly delete empty Threads because there's a
# race condition between deleting a Thread and creating a
Expand All @@ -164,6 +170,8 @@ def check(self, current_time):
# transaction.
db_session.commit()

delete_message_hashes(self.namespace_id, self.account_id, dangling_sha256s)

def gc_deleted_categories(self):
# Delete categories which have been deleted on the backend.
# Go through all the categories and check if there are messages
Expand Down
48 changes: 36 additions & 12 deletions inbox/models/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import math
import time
from collections import OrderedDict
from collections.abc import Iterable
from typing import Optional

import limitlion
Expand Down Expand Up @@ -341,18 +342,7 @@ def _batch_delete(
message_ids = [m[0] for m in messages]
message_hashes = [m[1] for m in messages]

with session_scope(account_id) as db_session:
existing_hashes = list(
db_session.query(Message.data_sha256)
.filter(Message.data_sha256.in_(message_hashes))
.filter(Message.namespace_id != id_)
.distinct()
)
existing_hashes = [h[0] for h in existing_hashes]

remove_hashes = set(message_hashes) - set(existing_hashes)
if dry_run is False:
delete_from_blockstore(*list(remove_hashes))
delete_message_hashes(id_, account_id, message_hashes, dry_run=dry_run)

with session_scope(account_id) as db_session:
message_query = db_session.query(Message).filter(
Expand Down Expand Up @@ -454,3 +444,37 @@ def purge_transactions(
)
except Exception as e:
log.critical("Exception encountered during deletion", exception=e)


def delete_message_hashes(
    namespace_id: int,
    account_id: int,
    message_hashes: Iterable[str],
    dry_run: bool = False,
) -> None:
    """
    Delete message blobs from the blockstore.

    Args:
        namespace_id: The namespace_id of the messages.
        account_id: The account_id of the messages.
        message_hashes: The data_sha256 hashes of the messages.
        dry_run: If True, don't actually delete the data.
    """
    # Materialize the iterable once up front: message_hashes may be a
    # one-shot generator, which is (a) always truthy even when empty, so
    # the early return below would be skipped, and (b) exhausted by the
    # first use — the IN filter would consume it and the set difference
    # afterwards would silently see nothing.
    message_hashes = set(message_hashes)
    if not message_hashes:
        return

    # First check if the messages still exist in another namespace.
    # If they do, we don't want to delete the data they share.
    with session_scope(account_id) as db_session:
        existing_hashes = {
            data_sha256
            for data_sha256, in db_session.query(Message.data_sha256)
            .filter(Message.data_sha256.in_(message_hashes))
            .filter(Message.namespace_id != namespace_id)
            .distinct()
        }

    # Only remove blobs that no other namespace still references.
    remove_hashes = message_hashes - existing_hashes
    if dry_run is False:
        delete_from_blockstore(*remove_hashes)
13 changes: 10 additions & 3 deletions inbox/util/blockstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from inbox.config import config
from inbox.logging import get_logger
from inbox.util.itert import chunk
from inbox.util.stats import statsd_client

log = get_logger()
Expand Down Expand Up @@ -332,9 +333,15 @@ def _delete_from_s3_bucket(
# Boto pools connections at the class level
bucket = get_s3_bucket(bucket_name)

bucket.delete_objects(
Delete={"Objects": [{"Key": key} for key in data_sha256_hashes], "Quiet": True}
)
# As per https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_objects.html
# we can only delete 1000 objects at a time.
for data_sha256_hashes_chunk in chunk(data_sha256_hashes, 1000):
bucket.delete_objects(
Delete={
"Objects": [{"Key": key} for key in data_sha256_hashes_chunk],
"Quiet": True,
}
)

end = time.time()
latency_millis = (end - start) * 1000
Expand Down

0 comments on commit bce77be

Please sign in to comment.