Skip to content

Commit

Permalink
Upload multiple filters
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinMind committed Nov 7, 2024
1 parent f4b5386 commit aeb1b36
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 73 deletions.
89 changes: 53 additions & 36 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import List

import waffle
from django_statsd.clients import statsd
Expand Down Expand Up @@ -28,8 +29,8 @@ def get_last_generation_time():
return get_config(MLBF_TIME_CONFIG_KEY, None, json_value=True)


def get_base_generation_time():
return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True)
def get_base_generation_time(block_type: BlockType):
return get_config(MLBF_BASE_ID_CONFIG_KEY(block_type), None, json_value=True)


def get_blocklist_last_modified_time():
Expand Down Expand Up @@ -68,25 +69,11 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
# This timestamp represents the last time the MLBF was generated and uploaded.
# It could have been a base filter or a stash.
last_generation_time = get_last_generation_time()
# This timestamp represents the point in time when
# the base filter was generated and uploaded.
base_generation_time = get_base_generation_time()

mlbf = MLBF.generate_from_db(generation_time)

base_filter = (
MLBF.load_from_storage(base_generation_time)
if base_generation_time is not None
else None
)

previous_filter = (
# Only load previoous filter if there is a timestamp to use
# and that timestamp is not the same as the base_filter
MLBF.load_from_storage(last_generation_time)
if last_generation_time is not None
and (base_filter is None or base_filter.created_at != last_generation_time)
else base_filter
else None
)

changes_count = mlbf.blocks_changed_since_previous(
Expand All @@ -95,15 +82,40 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_changed', changes_count
)
need_update = (
force_base
or base_filter is None
or (
previous_filter is not None
and previous_filter.created_at < get_blocklist_last_modified_time()

base_filters_to_update: List[BlockType] = []

# Determine which base filters need to be re uploaded
for block_type in BlockType:
if force_base:
base_filters_to_update.append(block_type)
continue

base_generation_time = get_base_generation_time(block_type)
base_filter = (
MLBF.load_from_storage(base_generation_time)
if base_generation_time is not None
else None
)

if (
base_filter is None
or mlbf.blocks_changed_since_previous(block_type, base_filter)
> BASE_REPLACE_THRESHOLD
):
base_filters_to_update.append(block_type)

previous_filter_is_stale = (
previous_filter is not None
and previous_filter.created_at < get_blocklist_last_modified_time()
)

need_update = (
len(base_filters_to_update) > 0
or changes_count > 0
or previous_filter_is_stale
)

if not need_update:
log.info('No new/modified/deleted Blocks in database; skipping MLBF generation')
return
Expand All @@ -112,29 +124,34 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_count',
len(mlbf.data.blocked_items),
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.soft_blocked_count',
len(mlbf.data.soft_blocked_items),
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.not_blocked_count',
len(mlbf.data.not_blocked_items),
)

make_base_filter = (
force_base
or base_filter is None
or previous_filter is None
or mlbf.blocks_changed_since_previous(BlockType.BLOCKED, base_filter)
> BASE_REPLACE_THRESHOLD
update_stash = (
len(base_filters_to_update) < len(BlockType.choices)
and changes_count > 0
)

if make_base_filter:
mlbf.generate_and_write_filter()
else:
mlbf.generate_and_write_stash(previous_filter)
breakpoint()

upload_filter.delay(generation_time, is_base=make_base_filter)
mlbf.generate_and_write_stash(previous_filter) if update_stash else None

if base_filter:
cleanup_old_files.delay(base_filter_id=base_filter.created_at)
for block_type in base_filters_to_update:
mlbf.generate_and_write_filter(block_type)

upload_filter.delay(
generation_time,
filters_to_update=[key.name for key in base_filters_to_update],
update_stash=update_stash,
)

cleanup_old_files.delay(generation_time)

def process_blocklistsubmissions():
qs = BlocklistSubmission.objects.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.core.management.base import BaseCommand

from olympia.blocklist.models import BlockType
import olympia.core.logger
from olympia.blocklist.mlbf import MLBF

Expand Down Expand Up @@ -29,6 +30,11 @@ def add_arguments(self, parser):
'the database',
default=None,
)
parser.add_argument(
'--block-type',
help='Block type to export',
default=None,
)

def load_json(self, json_path):
with open(json_path) as json_file:
Expand All @@ -38,6 +44,7 @@ def load_json(self, json_path):
def handle(self, *args, **options):
log.debug('Exporting blocklist to file')
mlbf = MLBF.generate_from_db(options.get('id'))
block_type = BlockType[options.get('block_type')]

if options.get('block_guids_input'):
mlbf.blocked_items = list(
Expand All @@ -52,4 +59,4 @@ def handle(self, *args, **options):
)
)

mlbf.generate_and_write_filter()
mlbf.generate_and_write_filter(block_type)
21 changes: 14 additions & 7 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,32 @@ def hash_filter_inputs(cls, input_list):
for (guid, version) in input_list
]

@property
def filter_path(self):
return self.storage.path('filter')
def filter_path(self, block_type: BlockType):
return self.storage.path(f'filter-{block_type.name}')

@property
def stash_path(self):
return self.storage.path('stash.json')

def generate_and_write_filter(self):
def generate_and_write_filter(self, block_type: BlockType):
stats = {}

blocked = self.data[block_type]

not_blocked = [
self.data[_block_type]
for _block_type in BlockType
if _block_type != block_type
]

bloomfilter = generate_mlbf(
stats=stats,
blocked=self.data.blocked_items,
not_blocked=self.data.not_blocked_items,
blocked=blocked,
not_blocked=not_blocked,
)

# write bloomfilter
mlbf_path = self.filter_path
mlbf_path = self.filter_path(block_type)
with self.storage.open(mlbf_path, 'wb') as filter_file:
log.info(f'Writing to file {mlbf_path}')
bloomfilter.tofile(filter_file)
Expand Down
58 changes: 37 additions & 21 deletions src/olympia/blocklist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import re
from datetime import datetime, timedelta
from typing import List

from django.conf import settings
from django.contrib.admin.models import CHANGE, LogEntry
Expand All @@ -21,10 +22,10 @@
REMOTE_SETTINGS_COLLECTION_MLBF,
)
from olympia.lib.remote_settings import RemoteSettings
from olympia.zadmin.models import set_config
from olympia.zadmin.models import get_config, set_config

from .mlbf import MLBF
from .models import BlocklistSubmission
from .models import BlocklistSubmission, BlockType
from .utils import (
datetime_to_ts,
)
Expand Down Expand Up @@ -88,28 +89,17 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, is_base=True):
def upload_filter(generation_time, filter_list=None, upload_stash=False):
filter_list: List[BlockType] = (
[] if filter_list is None else [BlockType[filter] for filter in filter_list]
)
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False
)
mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True)
if is_base:
# clear the collection for the base - we want to be the only filter
server.delete_all_records()
statsd.incr('blocklist.tasks.upload_filter.reset_collection')
# Then the bloomfilter
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE,
}
with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file:
attachment = ('filter.bin', filter_file, 'application/octet-stream')
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')
else:

if upload_stash:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
Expand All @@ -121,10 +111,36 @@ def upload_filter(generation_time, is_base=True):
server.publish_record(stash_upload_data)
statsd.incr('blocklist.tasks.upload_filter.upload_stash')

for block_type in filter_list:
data = {
'key_format': MLBF.KEY_FORMAT,
'generation_time': generation_time,
'attachment_type': BLOCKLIST_RECORD_MLBF_BASE,
}
with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file:
attachment = (
f'filter-{block_type}.bin',
filter_file,
'application/octet-stream',
)
server.publish_attachment(data, attachment)
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf')
statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')

oldest_base_filter_id = min(
get_config(MLBF_BASE_ID_CONFIG_KEY(block_type), json_value=True)
for block_type in filter_list
)

server.delete_records_before(oldest_base_filter_id)
statsd.incr('blocklist.tasks.upload_filter.reset_collection')

server.complete_session()
set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True)
if is_base:
set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True)
for block_type in filter_list:
set_config(
MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True
)


@task
Expand Down
9 changes: 7 additions & 2 deletions src/olympia/blocklist/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
version_factory,
)
from olympia.blocklist.mlbf import MLBF
from olympia.blocklist.models import BlockType


class TestExportBlocklist(TestCase):
Expand Down Expand Up @@ -36,6 +37,10 @@ def test_command(self):
updated_by=user,
)

call_command('export_blocklist', '1')
call_command('export_blocklist', '1', '--block-type', BlockType.BLOCKED.name)
mlbf = MLBF.load_from_storage(1)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED))
call_command('export_blocklist', '1', '--block-type', BlockType.NOT_BLOCKED.name)
mlbf = MLBF.load_from_storage(1)
assert mlbf.storage.exists(mlbf.filter_path(BlockType.NOT_BLOCKED))

13 changes: 8 additions & 5 deletions src/olympia/blocklist/tests/test_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ def test_skip_update_unless_force_base(self):

assert self.mocks['olympia.blocklist.cron.upload_filter.delay'].called

# Check that a filter was created on the second attempt
# Check that both filters were created on the second attempt
mlbf = MLBF.load_from_storage(self.current_time)
assert mlbf.storage.exists(mlbf.filter_path)
assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED))
assert mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED))
assert not mlbf.storage.exists(mlbf.stash_path)

def test_skip_update_unless_no_base_mlbf(self):
Expand Down Expand Up @@ -271,7 +272,8 @@ def test_upload_stash_unless_enough_changes(self):
].call_args_list == [
mock.call(
self.current_time,
is_base=False,
filters_to_update=[],
update_stash=True,
)
]
mlbf = MLBF.load_from_storage(self.current_time)
Expand All @@ -288,7 +290,8 @@ def test_upload_stash_unless_enough_changes(self):
assert (
mock.call(
self.current_time,
is_base=True,
filters_to_update=[BlockType.BLOCKED],
update_stash=False,
)
in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list
)
Expand Down Expand Up @@ -356,7 +359,7 @@ def test_get_last_generation_time(self):

def test_get_base_generation_time(self):
assert get_base_generation_time() is None
set_config(MLBF_BASE_ID_CONFIG_KEY, 1)
set_config(MLBF_BASE_ID_CONFIG_KEY(BlockType.BLOCKED), 1)
assert get_base_generation_time() == 1


Expand Down
9 changes: 8 additions & 1 deletion src/olympia/constants/blocklist.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# How many guids should there be in the stashes before we make a new base.
from olympia.blocklist.models import BlockType


BASE_REPLACE_THRESHOLD = 5_000

# Config keys used to track recent mlbf ids
MLBF_TIME_CONFIG_KEY = 'blocklist_mlbf_generation_time'
MLBF_BASE_ID_CONFIG_KEY = 'blocklist_mlbf_base_id'


def MLBF_BASE_ID_CONFIG_KEY(block_type: BlockType):
return f'blocklist_mlbf_base_id_{block_type.name.lower()}'


REMOTE_SETTINGS_COLLECTION_MLBF = 'addons-bloomfilters'

0 comments on commit aeb1b36

Please sign in to comment.