From 1c5160b50fdf09174b6aff7056568c00a33ed9ba Mon Sep 17 00:00:00 2001 From: Kevin Meinhardt Date: Thu, 7 Nov 2024 12:14:19 +0100 Subject: [PATCH] Upload multiple filters --- src/olympia/blocklist/cron.py | 89 +++++++++++-------- .../management/commands/export_blocklist.py | 9 +- src/olympia/blocklist/mlbf.py | 21 +++-- src/olympia/blocklist/tasks.py | 58 +++++++----- src/olympia/blocklist/tests/test_commands.py | 9 +- src/olympia/blocklist/tests/test_cron.py | 13 +-- src/olympia/constants/blocklist.py | 9 +- 7 files changed, 135 insertions(+), 73 deletions(-) diff --git a/src/olympia/blocklist/cron.py b/src/olympia/blocklist/cron.py index 29399e83c68f..a77ab802ea7d 100644 --- a/src/olympia/blocklist/cron.py +++ b/src/olympia/blocklist/cron.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import List import waffle from django_statsd.clients import statsd @@ -28,8 +29,8 @@ def get_last_generation_time(): return get_config(MLBF_TIME_CONFIG_KEY, None, json_value=True) -def get_base_generation_time(): - return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True) +def get_base_generation_time(block_type: BlockType): + return get_config(MLBF_BASE_ID_CONFIG_KEY(block_type), None, json_value=True) def get_blocklist_last_modified_time(): @@ -68,25 +69,11 @@ def _upload_mlbf_to_remote_settings(*, force_base=False): # This timestamp represents the last time the MLBF was generated and uploaded. # It could have been a base filter or a stash. last_generation_time = get_last_generation_time() - # This timestamp represents the point in time when - # the base filter was generated and uploaded. 
- base_generation_time = get_base_generation_time() - mlbf = MLBF.generate_from_db(generation_time) - - base_filter = ( - MLBF.load_from_storage(base_generation_time) - if base_generation_time is not None - else None - ) - previous_filter = ( - # Only load previoous filter if there is a timestamp to use - # and that timestamp is not the same as the base_filter MLBF.load_from_storage(last_generation_time) if last_generation_time is not None - and (base_filter is None or base_filter.created_at != last_generation_time) - else base_filter + else None ) changes_count = mlbf.blocks_changed_since_previous( @@ -95,15 +82,40 @@ def _upload_mlbf_to_remote_settings(*, force_base=False): statsd.incr( 'blocklist.cron.upload_mlbf_to_remote_settings.blocked_changed', changes_count ) - need_update = ( - force_base - or base_filter is None - or ( - previous_filter is not None - and previous_filter.created_at < get_blocklist_last_modified_time() + + base_filters_to_update: List[BlockType] = [] + + # Determine which base filters need to be re uploaded + for block_type in BlockType: + if force_base: + base_filters_to_update.append(block_type) + continue + + base_generation_time = get_base_generation_time(block_type) + base_filter = ( + MLBF.load_from_storage(base_generation_time) + if base_generation_time is not None + else None ) + + if ( + base_filter is None + or mlbf.blocks_changed_since_previous(block_type, base_filter) + > BASE_REPLACE_THRESHOLD + ): + base_filters_to_update.append(block_type) + + previous_filter_is_stale = ( + previous_filter is not None + and previous_filter.created_at < get_blocklist_last_modified_time() + ) + + need_update = ( + len(base_filters_to_update) > 0 or changes_count > 0 + or previous_filter_is_stale ) + if not need_update: log.info('No new/modified/deleted Blocks in database; skipping MLBF generation') return @@ -112,29 +124,34 @@ def _upload_mlbf_to_remote_settings(*, force_base=False): 
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_count', len(mlbf.data.blocked_items), ) + statsd.incr( + 'blocklist.cron.upload_mlbf_to_remote_settings.soft_blocked_count', + len(mlbf.data.soft_blocked_items), + ) statsd.incr( 'blocklist.cron.upload_mlbf_to_remote_settings.not_blocked_count', len(mlbf.data.not_blocked_items), ) - make_base_filter = ( - force_base - or base_filter is None - or previous_filter is None - or mlbf.blocks_changed_since_previous(BlockType.BLOCKED, base_filter) - > BASE_REPLACE_THRESHOLD + update_stash = ( + len(base_filters_to_update) < len(BlockType.choices) + and changes_count > 0 ) - if make_base_filter: - mlbf.generate_and_write_filter() - else: - mlbf.generate_and_write_stash(previous_filter) + if update_stash: + mlbf.generate_and_write_stash(previous_filter) - upload_filter.delay(generation_time, is_base=make_base_filter) - if base_filter: - cleanup_old_files.delay(base_filter_id=base_filter.created_at) + for block_type in base_filters_to_update: + mlbf.generate_and_write_filter(block_type) + + upload_filter.delay( + generation_time, + filters_to_update=[key.name for key in base_filters_to_update], + update_stash=update_stash, + ) + cleanup_old_files.delay(generation_time) def process_blocklistsubmissions(): qs = BlocklistSubmission.objects.filter( diff --git a/src/olympia/blocklist/management/commands/export_blocklist.py b/src/olympia/blocklist/management/commands/export_blocklist.py index 1846ac92246f..2382d1b4b044 100644 --- a/src/olympia/blocklist/management/commands/export_blocklist.py +++ b/src/olympia/blocklist/management/commands/export_blocklist.py @@ -2,6 +2,7 @@ from django.core.management.base import BaseCommand import olympia.core.logger from olympia.blocklist.mlbf import MLBF +from olympia.blocklist.models import BlockType @@ -29,6 +30,11 @@ def add_arguments(self, parser): 'the database', default=None, ) + parser.add_argument( + '--block-type', + help='Block type to export', +
default=BlockType.BLOCKED.name, + ) def load_json(self, json_path): with open(json_path) as json_file: @@ -38,6 +44,7 @@ def load_json(self, json_path): def handle(self, *args, **options): log.debug('Exporting blocklist to file') mlbf = MLBF.generate_from_db(options.get('id')) + block_type = BlockType[options.get('block_type')] if options.get('block_guids_input'): mlbf.blocked_items = list( @@ -52,4 +59,4 @@ def handle(self, *args, **options): ) ) - mlbf.generate_and_write_filter() + mlbf.generate_and_write_filter(block_type) diff --git a/src/olympia/blocklist/mlbf.py b/src/olympia/blocklist/mlbf.py index 9e75b71ff5a3..dcb67865e4d6 100644 --- a/src/olympia/blocklist/mlbf.py +++ b/src/olympia/blocklist/mlbf.py @@ -202,25 +202,33 @@ def hash_filter_inputs(cls, input_list): for (guid, version) in input_list ] - @property - def filter_path(self): - return self.storage.path('filter') + def filter_path(self, block_type: BlockType): + return self.storage.path(f'filter-{block_type.name}') @property def stash_path(self): return self.storage.path('stash.json') - def generate_and_write_filter(self): + def generate_and_write_filter(self, block_type: BlockType): + """Generate the bloom filter for `block_type` and write it to storage.""" stats = {} + blocked = self.data[block_type] + not_blocked = [ + item + for _block_type in BlockType + if _block_type != block_type + for item in self.data[_block_type] + ] + bloomfilter = generate_mlbf( stats=stats, - blocked=self.data.blocked_items, - not_blocked=self.data.not_blocked_items, + blocked=blocked, + not_blocked=not_blocked, ) # write bloomfilter - mlbf_path = self.filter_path + mlbf_path = self.filter_path(block_type) with self.storage.open(mlbf_path, 'wb') as filter_file: log.info(f'Writing to file {mlbf_path}') bloomfilter.tofile(filter_file) diff --git a/src/olympia/blocklist/tasks.py b/src/olympia/blocklist/tasks.py index 713f3f2332d6..900694f06d24 100644 --- a/src/olympia/blocklist/tasks.py +++ b/src/olympia/blocklist/tasks.py @@ -2,6 +2,7 @@ import os import re from datetime import datetime, timedelta +from typing import List from
django.conf import settings from django.contrib.admin.models import CHANGE, LogEntry @@ -21,10 +22,10 @@ REMOTE_SETTINGS_COLLECTION_MLBF, ) from olympia.lib.remote_settings import RemoteSettings -from olympia.zadmin.models import set_config +from olympia.zadmin.models import get_config, set_config from .mlbf import MLBF -from .models import BlocklistSubmission +from .models import BlocklistSubmission, BlockType from .utils import ( datetime_to_ts, ) @@ -88,28 +89,22 @@ def monitor_remote_settings(): @task -def upload_filter(generation_time, is_base=True): +def upload_filter(generation_time, filters_to_update=None, update_stash=False): + """Publish the stash and/or base filters stored for `generation_time`. + + `filters_to_update` lists BlockType member names to publish as base + filters; `update_stash` also publishes the stash record. + """ + filter_list: List[BlockType] = [ + BlockType[name] for name in (filters_to_update or []) + ] bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET server = RemoteSettings( bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False ) mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True) - if is_base: - # clear the collection for the base - we want to be the only filter - server.delete_all_records() - statsd.incr('blocklist.tasks.upload_filter.reset_collection') - # Then the bloomfilter - data = { - 'key_format': MLBF.KEY_FORMAT, - 'generation_time': generation_time, - 'attachment_type': BLOCKLIST_RECORD_MLBF_BASE, - } - with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file: - attachment = ('filter.bin', filter_file, 'application/octet-stream') - server.publish_attachment(data, attachment) - statsd.incr('blocklist.tasks.upload_filter.upload_mlbf') - statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base') - else: + + if update_stash: with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file: stash_data = json.load(stash_file) # If we have a stash, write that @@ -121,10 +116,36 @@ def upload_filter(generation_time, is_base=True): server.publish_record(stash_upload_data) statsd.incr('blocklist.tasks.upload_filter.upload_stash') + for block_type in filter_list: + data = { + 'key_format':
MLBF.KEY_FORMAT, + 'generation_time': generation_time, + 'attachment_type': BLOCKLIST_RECORD_MLBF_BASE, + } + with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file: + attachment = ( + f'filter-{block_type.name}.bin', + filter_file, + 'application/octet-stream', + ) + server.publish_attachment(data, attachment) + statsd.incr('blocklist.tasks.upload_filter.upload_mlbf') + statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base') + + kept_base_ids = [ + get_config(MLBF_BASE_ID_CONFIG_KEY(block_type), json_value=True) + for block_type in BlockType + if block_type not in filter_list + ] + if filter_list and None not in kept_base_ids: + server.delete_records_before(min([generation_time, *kept_base_ids])) + statsd.incr('blocklist.tasks.upload_filter.reset_collection') server.complete_session() set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True) - if is_base: - set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True) + for block_type in filter_list: + set_config( + MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True + ) @task diff --git a/src/olympia/blocklist/tests/test_commands.py b/src/olympia/blocklist/tests/test_commands.py index 3df8df064b6a..81fd8422a1a2 100644 --- a/src/olympia/blocklist/tests/test_commands.py +++ b/src/olympia/blocklist/tests/test_commands.py @@ -9,6 +9,7 @@ version_factory, ) from olympia.blocklist.mlbf import MLBF +from olympia.blocklist.models import BlockType class TestExportBlocklist(TestCase): @@ -36,6 +37,10 @@ def test_command(self): updated_by=user, ) - call_command('export_blocklist', '1') + call_command('export_blocklist', '1', '--block-type', BlockType.BLOCKED.name) mlbf = MLBF.load_from_storage(1) - assert mlbf.storage.exists(mlbf.filter_path) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + call_command('export_blocklist', '1', '--block-type', BlockType.SOFT_BLOCKED.name) + mlbf = MLBF.load_from_storage(1) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED)) + diff --git a/src/olympia/blocklist/tests/test_cron.py
b/src/olympia/blocklist/tests/test_cron.py index 63e0e7caf7f2..7de3717d5420 100644 --- a/src/olympia/blocklist/tests/test_cron.py +++ b/src/olympia/blocklist/tests/test_cron.py @@ -98,9 +98,10 @@ def test_skip_update_unless_force_base(self): assert self.mocks['olympia.blocklist.cron.upload_filter.delay'].called - # Check that a filter was created on the second attempt + # Check that both filters were created on the second attempt mlbf = MLBF.load_from_storage(self.current_time) - assert mlbf.storage.exists(mlbf.filter_path) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED)) assert not mlbf.storage.exists(mlbf.stash_path) def test_skip_update_unless_no_base_mlbf(self): @@ -271,7 +272,8 @@ def test_upload_stash_unless_enough_changes(self): ].call_args_list == [ mock.call( self.current_time, - is_base=False, + filters_to_update=[], + update_stash=True, ) ] mlbf = MLBF.load_from_storage(self.current_time) @@ -288,7 +290,8 @@ def test_upload_stash_unless_enough_changes(self): assert ( mock.call( self.current_time, - is_base=True, + filters_to_update=[BlockType.BLOCKED.name], + update_stash=False, ) in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list ) @@ -356,7 +359,7 @@ def test_get_last_generation_time(self): def test_get_base_generation_time(self): - assert get_base_generation_time() is None + assert get_base_generation_time(BlockType.BLOCKED) is None - set_config(MLBF_BASE_ID_CONFIG_KEY, 1) + set_config(MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED), 1) - assert get_base_generation_time() == 1 + assert get_base_generation_time(BlockType.BLOCKED) == 1 diff --git a/src/olympia/constants/blocklist.py b/src/olympia/constants/blocklist.py index 8cda2e5c9b11..779004bf03da 100644 --- a/src/olympia/constants/blocklist.py +++ b/src/olympia/constants/blocklist.py @@ -1,8 +1,15 @@ # How many guids should there be in the stashes before we make a new base.
+from olympia.blocklist.models import BlockType + + BASE_REPLACE_THRESHOLD = 5_000 # Config keys used to track recent mlbf ids MLBF_TIME_CONFIG_KEY = 'blocklist_mlbf_generation_time' -MLBF_BASE_ID_CONFIG_KEY = 'blocklist_mlbf_base_id' + + +def MLBF_BASE_ID_CONFIG_KEY(block_type: BlockType): + return f'blocklist_mlbf_base_id_{block_type.name.lower()}' + REMOTE_SETTINGS_COLLECTION_MLBF = 'addons-bloomfilters'