Skip to content

Commit

Permalink
Upload multiple filters
Browse files Browse the repository at this point in the history
Remove unecessary and redundant code

Fix ordering of cache/stash + increase validity of tests

Upload multiple filters

More logs + correct handling of attachment_type

Verify cron passes correct args to task

TMP: Ignore soft blocks

Add waffle switch

Fix invalid class reference

Update to correct waffle switch

Update to fix the test

reafactoring
  • Loading branch information
KevinMind committed Nov 13, 2024
1 parent ce66a9f commit 227f07e
Show file tree
Hide file tree
Showing 11 changed files with 509 additions and 214 deletions.
115 changes: 59 additions & 56 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import List

import waffle
from django_statsd.clients import statsd
Expand All @@ -13,7 +14,7 @@

from .mlbf import MLBF
from .models import Block, BlocklistSubmission, BlockType
from .tasks import cleanup_old_files, process_blocklistsubmission, upload_filter
from .tasks import process_blocklistsubmission, upload_filter
from .utils import datetime_to_ts


Expand All @@ -28,8 +29,8 @@ def get_last_generation_time():
return get_config(MLBF_TIME_CONFIG_KEY, None, json_value=True)


def get_base_generation_time():
return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True)
def get_base_generation_time(block_type: BlockType):
return get_config(MLBF_BASE_ID_CONFIG_KEY(block_type), None, json_value=True)


def get_blocklist_last_modified_time():
Expand Down Expand Up @@ -64,76 +65,78 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
# An add-on version/file from after this time can't be reliably asserted -
# there may be false positives or false negatives.
# https://github.com/mozilla/addons-server/issues/13695
generation_time = get_generation_time()
# This timestamp represents the last time the MLBF was generated and uploaded.
# It could have been a base filter or a stash.
last_generation_time = get_last_generation_time()
# This timestamp represents the point in time when
# the base filter was generated and uploaded.
base_generation_time = get_base_generation_time()

mlbf = MLBF.generate_from_db(generation_time)

base_filter = (
MLBF.load_from_storage(base_generation_time)
if base_generation_time is not None
else None
mlbf = MLBF.generate_from_db(get_generation_time())
previous_filter = MLBF.load_from_storage(
# This timestamp represents the last time the MLBF was generated and uploaded.
# It could have been a base filter or a stash.
get_last_generation_time()
)

previous_filter = (
# Only load previoous filter if there is a timestamp to use
# and that timestamp is not the same as the base_filter
MLBF.load_from_storage(last_generation_time)
if last_generation_time is not None
and (base_filter is None or base_filter.created_at != last_generation_time)
else base_filter
)
base_filters_to_update: List[BlockType] = []
update_stash = False

# Determine which base filters need to be re uploaded
# and whether the stash needs to be updated
for block_type in BlockType:
# This prevents us from updating a stash or filter based on new soft blocks
# until we are ready to enable soft blocking.
if block_type == BlockType.SOFT_BLOCKED and not waffle.switch_is_active(
'enable-soft-blocking'
):
continue

base_filter = MLBF.load_from_storage(get_base_generation_time(block_type))

should_update_filter = (
force_base
or base_filter is None
or mlbf.blocks_changed_since_previous(block_type, base_filter)
> BASE_REPLACE_THRESHOLD
)

changes_count = mlbf.blocks_changed_since_previous(
BlockType.BLOCKED, previous_filter
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_changed', changes_count
)
need_update = (
force_base
or base_filter is None
or (
previous_filter is not None
and previous_filter.created_at < get_blocklist_last_modified_time()
should_update_stash = (
mlbf.blocks_changed_since_previous(
block_type, previous_filter or base_filter
)
> 0
)
or changes_count > 0
)
if not need_update:

# add this block type to the list of filters to be re-uploaded
if should_update_filter:
base_filters_to_update.append(block_type)
# only update the stash if we should AND if
# we aren't already reuploading the filter for this block type
elif should_update_stash:
update_stash = True

skip_update = len(base_filters_to_update) == 0 and not update_stash
if skip_update:
log.info('No new/modified/deleted Blocks in database; skipping MLBF generation')
return
return mlbf.delete()

statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_count',
len(mlbf.data.blocked_items),
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.soft_blocked_count',
len(mlbf.data.soft_blocked_items),
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.not_blocked_count',
len(mlbf.data.not_blocked_items),
)

make_base_filter = (
force_base
or base_filter is None
or previous_filter is None
or mlbf.blocks_changed_since_previous(BlockType.BLOCKED, base_filter)
> BASE_REPLACE_THRESHOLD
)

if make_base_filter:
mlbf.generate_and_write_filter()
else:
mlbf.generate_and_write_stash(previous_filter)
mlbf.generate_and_write_stash(previous_filter) if update_stash else None

upload_filter.delay(generation_time, is_base=make_base_filter)
for block_type in base_filters_to_update:
mlbf.generate_and_write_filter(block_type)

if base_filter:
cleanup_old_files.delay(base_filter_id=base_filter.created_at)
upload_filter.delay(
mlbf.created_at,
filter_list=[key.name for key in base_filters_to_update],
upload_stash=update_stash,
)


def process_blocklistsubmissions():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import olympia.core.logger
from olympia.blocklist.mlbf import MLBF
from olympia.blocklist.models import BlockType


log = olympia.core.logger.getLogger('z.amo.blocklist')
Expand All @@ -29,6 +30,11 @@ def add_arguments(self, parser):
'the database',
default=None,
)
parser.add_argument(
'--block-type',
help='Block type to export',
default=None,
)

def load_json(self, json_path):
with open(json_path) as json_file:
Expand All @@ -38,6 +44,7 @@ def load_json(self, json_path):
def handle(self, *args, **options):
log.debug('Exporting blocklist to file')
mlbf = MLBF.generate_from_db(options.get('id'))
block_type = BlockType[options.get('block_type')]

if options.get('block_guids_input'):
mlbf.blocked_items = list(
Expand All @@ -52,4 +59,4 @@ def handle(self, *args, **options):
)
)

mlbf.generate_and_write_filter()
mlbf.generate_and_write_filter(block_type)
33 changes: 24 additions & 9 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class BaseMLBFLoader:
def __init__(self, storage: SafeStorage):
self.storage = storage

def data_type_key(self, key: MLBFDataType) -> str:
@classmethod
def data_type_key(cls, key: MLBFDataType) -> str:
return key.name.lower()

@cached_property
Expand Down Expand Up @@ -208,32 +209,46 @@ def hash_filter_inputs(cls, input_list):
for (guid, version) in input_list
]

@property
def filter_path(self):
return self.storage.path('filter')
def filter_path(self, block_type: BlockType):
return self.storage.path(f'filter-{BaseMLBFLoader.data_type_key(block_type)}')

@property
def stash_path(self):
return self.storage.path('stash.json')

def generate_and_write_filter(self):
def delete(self):
if self.storage.exists(self.storage.base_location):
self.storage.rm_stored_dir(self.storage.base_location)
log.info(f'Deleted {self.storage.base_location}')

def generate_and_write_filter(self, block_type: BlockType):
stats = {}

blocked = self.data[block_type]

not_blocked = [
self.data[_block_type]
for _block_type in BlockType
if _block_type != block_type
]

bloomfilter = generate_mlbf(
stats=stats,
blocked=self.data.blocked_items,
not_blocked=self.data.not_blocked_items,
blocked=blocked,
not_blocked=not_blocked,
)

# write bloomfilter
mlbf_path = self.filter_path
mlbf_path = self.filter_path(block_type)
with self.storage.open(mlbf_path, 'wb') as filter_file:
log.info(f'Writing to file {mlbf_path}')
bloomfilter.tofile(filter_file)
stats['mlbf_filesize'] = os.stat(mlbf_path).st_size

log.info(json.dumps(stats))

return bloomfilter

def generate_diffs(
self, previous_mlbf: 'MLBF' = None
) -> Dict[BlockType, Tuple[List[str], List[str], int]]:
Expand Down Expand Up @@ -277,7 +292,7 @@ def generate_and_write_stash(self, previous_mlbf: 'MLBF' = None):
'unblocked': blocked_removed,
}

if waffle.switch_is_active('mlbf-soft-blocks-enabled'):
if waffle.switch_is_active('enable-soft-blocking'):
soft_blocked_added, soft_blocked_removed, _ = diffs[BlockType.SOFT_BLOCKED]
stash_json['softblocked'] = soft_blocked_added
stash_json['unblocked'] = [
Expand Down
80 changes: 60 additions & 20 deletions src/olympia/blocklist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import re
from datetime import datetime, timedelta
from typing import List

from django.conf import settings
from django.contrib.admin.models import CHANGE, LogEntry
Expand All @@ -21,10 +22,10 @@
REMOTE_SETTINGS_COLLECTION_MLBF,
)
from olympia.lib.remote_settings import RemoteSettings
from olympia.zadmin.models import set_config
from olympia.zadmin.models import get_config, set_config

from .mlbf import MLBF
from .models import BlocklistSubmission
from .models import BlocklistSubmission, BlockType
from .utils import (
datetime_to_ts,
)
Expand All @@ -35,7 +36,15 @@
bracket_open_regex = re.compile(r'(?<!\\){')
bracket_close_regex = re.compile(r'(?<!\\)}')

BLOCKLIST_RECORD_MLBF_BASE = 'bloomfilter-base'

def BLOCKLIST_RECORD_MLBF_BASE(block_type: BlockType):
match block_type:
case BlockType.SOFT_BLOCKED:
return 'softblocks-bloomfilter-base'
case BlockType.BLOCKED:
return 'blocked-bloomfilter-base'
case _:
raise ValueError(f'Unknown block type: {block_type}')


@task
Expand Down Expand Up @@ -88,28 +97,57 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, is_base=True):
def upload_filter(generation_time, filter_list=None, upload_stash=False):
# We cannot send enum values to tasks so we serialize them as strings
# and deserialize them here back to the enum values.
filter_list: List[BlockType] = (
[] if filter_list is None else [BlockType[filter] for filter in filter_list]
)
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False
)
mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True)
is_base = len(filter_list) > 0
oldest_base_filter_id = None

if is_base:
# clear the collection for the base - we want to be the only filter
server.delete_all_records()
statsd.incr('blocklist.tasks.upload_filter.reset_collection')
# Then the bloomfilter
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE,
}
with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
for block_type in filter_list:
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE(block_type),
}
with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
base_filter_id = get_config(
MLBF_BASE_ID_CONFIG_KEY(block_type), json_value=True
)
if base_filter_id is not None:
if oldest_base_filter_id is None:
oldest_base_filter_id = base_filter_id
else:
oldest_base_filter_id = min(oldest_base_filter_id, base_filter_id)

statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')
else:

if oldest_base_filter_id is not None:
for record in server.records():
record_time = (
record['stash_time']
if 'stash_time' in record
else record['generation_time']
)
if record_time < oldest_base_filter_id:
server.delete_record(record['id'])

cleanup_old_files.delay(base_filter_id=oldest_base_filter_id)
statsd.incr('blocklist.tasks.upload_filter.reset_collection')

# It is possible to upload a stash and a filter in the same task
if upload_stash:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
Expand All @@ -123,8 +161,10 @@ def upload_filter(generation_time, is_base=True):

server.complete_session()
set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True)
if is_base:
set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True)
for block_type in filter_list:
set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True
)


@task
Expand Down
Loading

0 comments on commit 227f07e

Please sign in to comment.