Commit

Use internal data object for MLBF to accommodate soft/hard block versions
KevinMind committed Oct 17, 2024
1 parent ca227c9 commit be0a284
Showing 6 changed files with 459 additions and 591 deletions.
29 changes: 16 additions & 13 deletions src/olympia/blocklist/cron.py
@@ -2,6 +2,7 @@

import waffle
from django_statsd.clients import statsd
+from filtercascade import InvalidErrorRateException

import olympia.core.logger
from olympia.constants.blocklist import (
@@ -11,7 +12,7 @@
)
from olympia.zadmin.models import get_config

-from .mlbf import MLBF
+from .mlbf import MLBF, MLBFDataType
from .models import Block, BlocklistSubmission
from .tasks import cleanup_old_files, process_blocklistsubmission, upload_filter
from .utils import datetime_to_ts
@@ -108,33 +109,35 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):

    statsd.incr(
        'blocklist.cron.upload_mlbf_to_remote_settings.blocked_count',
-        len(mlbf.blocked_items),
+        len(mlbf.data.blocked_items),
    )
    statsd.incr(
        'blocklist.cron.upload_mlbf_to_remote_settings.not_blocked_count',
-        len(mlbf.not_blocked_items),
+        len(mlbf.data.not_blocked_items),
    )

    make_base_filter = (
        force_base
        or not base_generation_time
        or base_filter is None
        or mlbf.blocks_changed_since_previous(base_filter) > BASE_REPLACE_THRESHOLD
    )

    if previous_filter and not make_base_filter:
-        mlbf.generate_and_write_stash(previous_filter)
+        try:
+            mlbf.generate_and_write_stash(previous_filter)
+        except FileNotFoundError:
+            log.info("No previous blocked.json so we can't create a stash.")
+            # fallback to creating a new base if stash fails
+            make_base_filter = True
    if make_base_filter:
-        mlbf.generate_and_write_filter()
+        try:
+            mlbf.generate_and_write_filter()
+        except InvalidErrorRateException:
+            log.warning(
+                'Invalid error rates, because '
+                'all versions are either blocked or not blocked'
+            )
+            return

    upload_filter.delay(generation_time, is_base=make_base_filter)

-    if base_generation_time:
-        cleanup_old_files.delay(base_filter_id=base_generation_time)
+    if base_filter:
+        cleanup_old_files.delay(base_filter_id=base_filter.created_at)


def process_blocklistsubmissions():
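The net effect of the cron changes: stash generation now degrades gracefully into a full base-filter rebuild, and a rebuild aborts cleanly when filtercascade cannot compute error rates (which happens when every version falls on one side of the block list). A rough sketch of the resulting control flow, with the MLBF object stubbed out; the helper name below is invented, everything else mirrors the diff:

# Illustrative only; `mlbf` and `previous_filter` stand in for the real
# objects used in _upload_mlbf_to_remote_settings.
from filtercascade import InvalidErrorRateException

def write_filter_or_stash(mlbf, previous_filter, make_base_filter):
    if previous_filter and not make_base_filter:
        try:
            mlbf.generate_and_write_stash(previous_filter)
        except FileNotFoundError:
            # No cached data for the previous filter: fall back to a base.
            make_base_filter = True
    if make_base_filter:
        try:
            mlbf.generate_and_write_filter()
        except InvalidErrorRateException:
            # Raised when the blocked or not_blocked side is empty.
            return None  # skip the upload entirely
    return make_base_filter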
256 changes: 127 additions & 129 deletions src/olympia/blocklist/mlbf.py
@@ -1,6 +1,8 @@
import json
import os
import secrets
+from enum import Enum
+from typing import List, Optional, Set, Tuple

from django.utils.functional import cached_property

@@ -9,7 +11,8 @@

import olympia.core.logger
from olympia.amo.utils import SafeStorage
-from olympia.constants.blocklist import BASE_REPLACE_THRESHOLD
+from olympia.blocklist.models import BlockVersion
+from olympia.blocklist.utils import datetime_to_ts


log = olympia.core.logger.getLogger('z.amo.blocklist')
@@ -49,21 +52,16 @@ def generate_mlbf(stats, blocked, not_blocked):
    cascade.verify(include=blocked, exclude=not_blocked)
    return cascade


+class MLBFDataType(Enum):
+    BLOCKED = 'blocked'
+    # SOFT_BLOCKED = 'soft_blocked'
+    NOT_BLOCKED = 'not_blocked'


def fetch_blocked_from_db():
-    from olympia.blocklist.models import BlockVersion
-
-    qs = BlockVersion.objects.filter(version__file__is_signed=True).values_list(
+    qs = BlockVersion.objects.filter(version__file__is_signed=True, soft=False).values_list(
        'block__guid', 'version__version', 'version_id', named=True
    )
-    all_versions = {
-        block_version.version_id: (
-            block_version.block__guid,
-            block_version.version__version,
-        )
-        for block_version in qs
-    }
-    return all_versions
+    return list(qs)


def fetch_all_versions_from_db(excluding_version_ids=None):
@@ -72,42 +70,117 @@
    qs = Version.unfiltered.exclude(id__in=excluding_version_ids or ()).values_list(
        'addon__addonguid__guid', 'version'
    )
-    return list(qs)
+    return set(qs)
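With these changes, fetch_blocked_from_db returns the named rows themselves and fetch_all_versions_from_db returns a set of (guid, version) tuples; MLBF.hash_filter_inputs (defined further down) turns those pairs into the 'guid:version' cascade keys. A small illustration with made-up values:

# Made-up pairs; MLBF.KEY_FORMAT is '{guid}:{version}' (see the class below).
pairs = [('@my-addon', '1.0'), ('@my-addon', '1.1')]
assert MLBF.hash_filter_inputs(pairs) == ['@my-addon:1.0', '@my-addon:1.1']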


+class BaseMLBFLoader:
+    def __init__(self, storage: SafeStorage):
+        self.storage = storage
+
+    @cached_property
+    def _raw(self):
+        """
+        raw serializable data for the given MLBFLoader.
+        """
+        return {key.value: self[key] for key in MLBFDataType}
+
+    def __getitem__(self, key: MLBFDataType) -> List[str]:
+        return getattr(self, f'{key.value}_items')
+
+    @cached_property
+    def _cache_path(self):
+        return self.storage.path('cache.json')
+
+    @cached_property
+    def blocked_items(self) -> List[str]:
+        raise NotImplementedError
+
+    @cached_property
+    def not_blocked_items(self) -> List[str]:
+        raise NotImplementedError
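BaseMLBFLoader gives both loaders a single serialization surface: _raw walks MLBFDataType, and __getitem__ maps each enum member to the matching '<value>_items' property, so a future SOFT_BLOCKED member only needs one new property. A toy subclass with invented values to show the dispatch:

# For an instance `loader` of this toy class,
# loader._raw == {'blocked': ['a:1'], 'not_blocked': ['b:1']}.
class StaticLoader(BaseMLBFLoader):
    @cached_property
    def blocked_items(self):
        return ['a:1']

    @cached_property
    def not_blocked_items(self):
        return ['b:1']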


+class MLBFStorageLoader(BaseMLBFLoader):
+    def __init__(self, storage: SafeStorage):
+        super().__init__(storage)
+        with self.storage.open(self._cache_path, 'r') as f:
+            self._data = json.load(f)
+
+    @cached_property
+    def blocked_items(self) -> List[str]:
+        return self._data.get(MLBFDataType.BLOCKED.value)
+
+    @cached_property
+    def not_blocked_items(self) -> List[str]:
+        return self._data.get(MLBFDataType.NOT_BLOCKED.value)


+class MLBFDataBaseLoader(BaseMLBFLoader):
+    def __init__(self, storage: SafeStorage):
+        super().__init__(storage)
+        self._version_excludes = []
+
+        # TODO: there is an edge case where you create a new filter from
+        # a previously used timestamp. This could lead to invalid files.
+        # A filter using the DB should either clear the storage files or
+        # raise to disallow reusing the same timestamp. It is also debatable
+        # whether created_at should be a caller-supplied argument at all.
+
+        # Save the raw data to storage to be used by later instances
+        # of this filter.
+        with self.storage.open(self._cache_path, 'w') as f:
+            json.dump(self._raw, f)
+
+    @cached_property
+    def blocked_items(self) -> List[str]:
+        blocked = []
+
+        for blocked_version in fetch_blocked_from_db():
+            blocked.append(
+                (blocked_version.block__guid, blocked_version.version__version)
+            )
+            self._version_excludes.append(blocked_version.version_id)
+
+        return MLBF.hash_filter_inputs(blocked)
+
+    @cached_property
+    def not_blocked_items(self) -> List[str]:
+        # see blocked_items - we need self._version_excludes populated
+        blocked_set = set(self.blocked_items)
+        # even though we exclude all the version ids in the query there's an
+        # edge case where the version string occurs twice for an addon, so we
+        # ensure not_blocked_items doesn't contain any blocked_items; hash
+        # before subtracting so hashed keys are compared with hashed keys.
+        not_blocked = MLBF.hash_filter_inputs(
+            fetch_all_versions_from_db(self._version_excludes)
+        )
+        return [item for item in not_blocked if item not in blocked_set]


class MLBF:
    FILTER_FILE = 'filter'
    STASH_FILE = 'stash'
    KEY_FORMAT = '{guid}:{version}'

-    def __init__(self, created_at):
+    def __init__(
+        self,
+        created_at: str = datetime_to_ts(),
+        data_class: 'BaseMLBFLoader' = BaseMLBFLoader,
+    ):
        self.created_at = created_at
        self.storage = SafeStorage(
            root_setting='MLBF_STORAGE_PATH',
            rel_location=str(self.created_at),
        )
+        self.data: BaseMLBFLoader = data_class(storage=self.storage)

    @classmethod
    def hash_filter_inputs(cls, input_list):
-        """Returns a set"""
-        return {
+        """Returns a list"""
+        return [
            cls.KEY_FORMAT.format(guid=guid, version=version)
            for (guid, version) in input_list
-        }
-
-    @property
-    def _blocked_path(self):
-        return self.storage.path('blocked.json')
-
-    @cached_property
-    def blocked_items(self):
-        raise NotImplementedError
-
-    @property
-    def _not_blocked_path(self):
-        return self.storage.path('notblocked.json')
-
-    @cached_property
-    def not_blocked_items(self):
-        raise NotImplementedError
+        ]

    @property
    def filter_path(self):
@@ -121,7 +194,8 @@ def generate_and_write_filter(self):
        stats = {}

        bloomfilter = generate_mlbf(
-            stats=stats, blocked=self.blocked_items, not_blocked=self.not_blocked_items
+            stats=stats, blocked=self.data.blocked_items,
+            not_blocked=self.data.not_blocked_items
        )

        # write bloomfilter

@@ -133,19 +207,19 @@

        log.info(json.dumps(stats))

-    @classmethod
-    def generate_diffs(cls, previous, current):
-        previous = set(previous)
-        current = set(current)
+    def generate_diffs(self, previous_mlbf: 'MLBF' = None) -> Tuple[Set[str], Set[str], int]:
+        previous = set([] if previous_mlbf is None else previous_mlbf.data.blocked_items)
+        current = set(self.data.blocked_items)
        extras = current - previous
        deletes = previous - current
-        return extras, deletes
+        changed_count = (
+            len(extras) + len(deletes) if len(previous) > 0 else len(current)
+        )
+        return extras, deletes, changed_count
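The integer in the returned tuple is what cron compares against BASE_REPLACE_THRESHOLD. A worked example with invented keys:

# Suppose previous.data.blocked_items == ['a:1', 'b:1']
# and current.data.blocked_items == ['a:1', 'c:1']. Then:
#   extras        == {'c:1'}   newly blocked
#   deletes       == {'b:1'}   newly unblocked
#   changed_count == 2         len(extras) + len(deletes)
# With previous_mlbf=None, previous is empty and changed_count falls back
# to len(current): a brand-new filter counts every block as a change.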

-    def generate_and_write_stash(self, previous_mlbf):
+    def generate_and_write_stash(self, previous_mlbf: 'MLBF' = None):
        # compare previous with current blocks
-        extras, deletes = self.generate_diffs(
-            previous_mlbf.blocked_items, self.blocked_items
-        )
+        extras, deletes, _ = self.generate_diffs(previous_mlbf)
        stash_json = {
            'blocked': list(extras),
            'unblocked': list(deletes),
@@ -156,94 +230,18 @@ def generate_and_write_stash(self, previous_mlbf):
            log.info(f'Writing to file {stash_path}')
            json.dump(stash_json, json_file)

-    def should_reset_base_filter(self, previous_bloom_filter):
-        try:
-            # compare base with current blocks
-            extras, deletes = self.generate_diffs(
-                previous_bloom_filter.blocked_items, self.blocked_items
-            )
-            return (len(extras) + len(deletes)) > BASE_REPLACE_THRESHOLD
-        except FileNotFoundError:
-            # when previous_base_mlfb._blocked_path doesn't exist
-            return True
+    def blocks_changed_since_previous(self, previous_mlbf: 'MLBF' = None):
+        return self.generate_diffs(previous_mlbf)[2]

-    def blocks_changed_since_previous(self, previous_bloom_filter):
-        try:
-            # compare base with current blocks
-            extras, deletes = self.generate_diffs(
-                previous_bloom_filter.blocked_items, self.blocked_items
-            )
-            return len(extras) + len(deletes)
-        except FileNotFoundError:
-            # when previous_bloom_filter._blocked_path doesn't exist
-            return len(self.blocked_items)
+    @classmethod
+    def load_from_storage(cls, created_at: str = datetime_to_ts(), error_on_missing: bool = False) -> Optional['MLBF']:
+        try:
+            return cls(created_at, data_class=MLBFStorageLoader)
+        except FileNotFoundError:
+            if error_on_missing:
+                raise
+            return None

    @classmethod
-    def load_from_storage(cls, *args, **kwargs):
-        return StoredMLBF(*args, **kwargs)
-
-    @classmethod
-    def generate_from_db(cls, *args, **kwargs):
-        return DatabaseMLBF(*args, **kwargs)


-class StoredMLBF(MLBF):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        # Raise exception if either cache files are missing
-        # This should fail fast and prevent continuing with invalid data
-        _ = self.blocked_items
-        _ = self.not_blocked_items
-
-    @cached_property
-    def blocked_items(self):
-        with self.storage.open(self._blocked_path, 'r') as json_file:
-            return json.load(json_file)
-
-    @cached_property
-    def not_blocked_items(self):
-        with self.storage.open(self._not_blocked_path, 'r') as json_file:
-            return json.load(json_file)
-
-
-class DatabaseMLBF(MLBF):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        # Raise exception if either cache files are missing
-        # This should fail fast and prevent continuing with invalid data
-        _ = self.blocked_items
-        _ = self.not_blocked_items
-
-    @cached_property
-    def blocked_items(self):
-        blocked_ids_to_versions = fetch_blocked_from_db()
-        blocked = blocked_ids_to_versions.values()
-        # cache version ids so query in not_blocked_items is efficient
-        self._version_excludes = blocked_ids_to_versions.keys()
-        blocked_items = list(self.hash_filter_inputs(blocked))
-
-        with self.storage.open(self._blocked_path, 'w') as json_file:
-            log.info(f'Writing to file {self._blocked_path}')
-            json.dump(blocked_items, json_file)
-
-        return blocked_items
-
-    @cached_property
-    def not_blocked_items(self):
-        # see blocked_items - we need self._version_excludes populated
-        blocked_items = self.blocked_items
-        # even though we exclude all the version ids in the query there's an
-        # edge case where the version string occurs twice for an addon so we
-        # ensure not_blocked_items doesn't contain any blocked_items.
-        not_blocked_items = list(
-            self.hash_filter_inputs(fetch_all_versions_from_db(self._version_excludes))
-            - set(blocked_items)
-        )
-
-        with self.storage.open(self._not_blocked_path, 'w') as json_file:
-            log.info(f'Writing to file {self._not_blocked_path}')
-            json.dump(not_blocked_items, json_file)
-
-        return not_blocked_items
+    def generate_from_db(cls, created_at: str = datetime_to_ts()):
+        return cls(created_at, data_class=MLBFDataBaseLoader)
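Taken together, MLBF is now a thin facade over the two loaders; callers pick a constructor rather than a subclass. A sketch of the intended call sites, with invented timestamps; note the threshold comparison happens in cron.py with BASE_REPLACE_THRESHOLD from olympia.constants.blocklist, not in this module:

# Sketch only; mirrors how cron.py above is expected to use the API.
mlbf = MLBF.generate_from_db(datetime_to_ts())  # reads the DB, writes cache.json
base = MLBF.load_from_storage(1729158000000)    # returns None if no cache exists
if base and mlbf.blocks_changed_since_previous(base) <= BASE_REPLACE_THRESHOLD:
    mlbf.generate_and_write_stash(base)         # small incremental stash
else:
    mlbf.generate_and_write_filter()            # full base filter rebuild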
