Commit a37b871

[r] Fix: Optimistic lock contention on HCA replicas (#6648)

nadove-ucsc committed Nov 8, 2024
1 parent ed40511 commit a37b871
Showing 6 changed files with 94 additions and 20 deletions.
50 changes: 49 additions & 1 deletion src/azul/indexer/aggregate.py
@@ -33,6 +33,12 @@
Entities = JSONs


class TooManyEntities(Exception):

    def __init__(self, max_size: int):
        super().__init__(max_size)


class Accumulator(metaclass=ABCMeta):
"""
Accumulates multiple values into a single value, not necessarily of the same
@@ -92,7 +98,12 @@ class SetAccumulator(Accumulator):
smallest values, it returns a sorted list of the first N distinct values.
"""

-    def __init__(self, max_size=None, key=None) -> None:
+    def __init__(self,
+                 max_size=None,
+                 key=None,
+                 *,
+                 raise_on_overflow: bool = False
+                 ) -> None:
"""
:param max_size: the maximum number of elements to retain
@@ -101,15 +112,50 @@ def __init__(self, max_size=None, key=None) -> None:
be used. With that default key, if any None values were
placed in the accumulator, the first element, and only the
first element of the returned list will be None.
:param raise_on_overflow: If true, raise TooManyEntities if the size of
the accumulated set would exceed max_size.
"""
super().__init__()
self.value = set()
self.max_size = max_size
self.key = none_safe_key(none_last=True) if key is None else key
self.raise_on_overflow = raise_on_overflow

def accumulate(self, value) -> bool:
"""
:return: True, if the given value was incorporated into the set
>>> s = SetAccumulator(max_size=3)
>>> s.accumulate(1)
True
>>> s.accumulate(1)
False
>>> s.accumulate(2)
True
>>> s.accumulate([1, 2, 3])
True
>>> s.accumulate([2, 3])
False
>>> s.accumulate(4)
False
>>> s.get()
[1, 2, 3]
>>> s = SetAccumulator(max_size=3, raise_on_overflow=True)
>>> s.accumulate([1, 2, 3])
True
>>> s.accumulate(4)
Traceback (most recent call last):
...
azul.indexer.aggregate.TooManyEntities: 3
"""
if self.max_size is None or len(self.value) < self.max_size:
before = len(self.value)
@@ -126,6 +172,8 @@ def accumulate(self, value) -> bool:
                return False
            else:
                assert False
+        elif self.raise_on_overflow and value not in self.value:
+            raise TooManyEntities(self.max_size)
        else:
            return False

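The interplay between max_size and raise_on_overflow has one subtlety worth calling out: at capacity, a duplicate of an already-retained value still returns False, and only a genuinely new value raises. A minimal sketch of that behavior, assuming this commit is installed (the values are illustrative):

from azul.indexer.aggregate import (
    SetAccumulator,
    TooManyEntities,
)

acc = SetAccumulator(max_size=2, raise_on_overflow=True)
assert acc.accumulate(1)        # retained, set is now {1}
assert acc.accumulate(2)        # retained, set is now {1, 2}
assert not acc.accumulate(2)    # duplicate at capacity: no overflow
try:
    acc.accumulate(3)           # distinct value at capacity: overflow
except TooManyEntities as e:
    assert e.args == (2,)       # the exception carries max_size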
13 changes: 9 additions & 4 deletions src/azul/plugins/__init__.py
@@ -415,10 +415,15 @@ def special_fields(self) -> SpecialFields:
@abstractmethod
def hot_entity_types(self) -> AbstractSet[str]:
"""
-        The types of entities that do not explicitly track their hubs in replica
-        documents in order to avoid a large list of hub references in the
-        replica document, and to avoid contention when updating that list during
-        indexing.
+        The types of inner entities that do not explicitly track their hubs in
+        replica documents in order to avoid a large list of hub references in
+        the replica document, and to avoid contention when updating that list
+        during indexing.
+
+        There is a brittle coupling between this method and the implementation
+        of the transformers, where the replicas are emitted. This is because
+        here we use the transformed types (e.g. "donors") and the latter uses
+        the untransformed types (e.g. "donor_organism").
"""
raise NotImplementedError

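To make the coupling concrete: this method speaks in transformed, plural field names, while the transformers that emit replicas test the untransformed schema types. A hypothetical mapping (not part of the codebase) of the HCA hot entity types from this commit illustrates the correspondence:

# Hypothetical table, for illustration only; no such mapping
# exists in the codebase.
TRANSFORMED_TO_UNTRANSFORMED = {
    'projects': 'project',
    'donors': 'donor_organism',
    'analysis_protocols': 'analysis_protocol',
    'imaging_protocols': 'imaging_protocol',
    'library_preparation_protocols': 'library_preparation_protocol',
    'sequencing_protocols': 'sequencing_protocol',
}

If the two sides drift apart, replicas would keep tracking hubs for a type the plugin declares hot (or vice versa), which is why both sites carry a cross-referencing comment.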
7 changes: 2 additions & 5 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -631,10 +631,7 @@ def _transform(self,
for linked_entity in linked:
yield self._replica(
linked_entity,
-                # Datasets are linked to every file in their snapshot,
-                # making an explicit list of hub IDs for the dataset both
-                # redundant and impractically large. Therefore, we leave the
-                # hub IDs field empty for datasets and rely on the tenet
-                # that every file is an implicit hub of its parent dataset.
+                # There is a brittle coupling between here and
+                # :meth:`MetadataPlugin.hot_entity_types`.
file_hub=None if linked_entity.entity_type == 'anvil_dataset' else entity.entity_id,
)
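The rule that the deleted comment spelled out still applies: dataset replicas carry no explicit hub list because every file in a snapshot is an implicit hub of its parent dataset. A hedged sketch of the decision the inline conditional makes (the named helper is illustrative and does not exist in the codebase):

# Illustrative only: the real logic is the inline conditional above.
def file_hub_for(entity_type: str, file_entity_id: str) -> str | None:
    if entity_type == 'anvil_dataset':
        # Implicit hub: every file belongs to its parent dataset, so an
        # explicit hub list would be redundant and impractically large.
        return None
    else:
        return file_entity_id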
9 changes: 8 additions & 1 deletion src/azul/plugins/metadata/hca/__init__.py
@@ -287,7 +287,14 @@ def special_fields(self) -> SpecialFields:

@property
def hot_entity_types(self) -> AbstractSet[str]:
-        return {'projects'}
+        return {
+            'projects',
+            'donors',
+            'analysis_protocols',
+            'imaging_protocols',
+            'library_preparation_protocols',
+            'sequencing_protocols'
+        }

@property
def facets(self) -> Sequence[str]:
4 changes: 4 additions & 0 deletions src/azul/plugins/metadata/hca/indexer/aggregate.py
@@ -162,6 +162,8 @@ def _accumulator(self, field) -> Accumulator | None:
none_safe_itemgetter('value', 'unit')))
elif field == 'donor_count':
return UniqueValueCountAccumulator()
elif field == 'document_id':
return SetAccumulator(max_size=100, raise_on_overflow=True)
else:
return super()._accumulator(field)

@@ -197,6 +199,8 @@ class ProtocolAggregator(SimpleAggregator):
def _accumulator(self, field) -> Accumulator | None:
if field == 'assay_type':
return FrequencySetAccumulator(max_size=100)
elif field == 'document_id':
return SetAccumulator(max_size=100, raise_on_overflow=True)
else:
return super()._accumulator(field)

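With raise_on_overflow=True, an aggregation that encounters more than 100 distinct document_id values now fails fast instead of silently truncating the set. A sketch of that failure mode, assuming this commit (the loop stands in for the aggregator feeding values from inner entities):

from azul.indexer.aggregate import (
    SetAccumulator,
    TooManyEntities,
)

acc = SetAccumulator(max_size=100, raise_on_overflow=True)
for i in range(100):
    acc.accumulate(f'document-{i}')  # the first 100 distinct IDs fit
try:
    acc.accumulate('document-100')   # the 101st distinct ID raises
except TooManyEntities as e:
    print(e)                         # 100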
31 changes: 22 additions & 9 deletions src/azul/plugins/metadata/hca/indexer/transform.py
@@ -12,6 +12,9 @@
from enum import (
Enum,
)
from itertools import (
chain,
)
import logging
import re
from typing import (
@@ -1472,15 +1475,25 @@ def _transform(self,
file_id = file.ref.entity_id
yield self._contribution(contents, file_id)
if config.enable_replicas:
-            yield self._replica(self.api_bundle.ref, file_hub=file_id)
-            # Projects are linked to every file in their snapshot,
-            # making an explicit list of hub IDs for the project both
-            # redundant and impractically large. Therefore, we leave the
-            # hub IDs field empty for projects and rely on the tenet
-            # that every file is an implicit hub of its parent project.
-            yield self._replica(self._api_project.ref, file_hub=None)
-            for linked_entity in visitor.entities:
-                yield self._replica(linked_entity, file_hub=file_id)
+
+            def is_hot(entity_type: EntityType) -> bool:
+                # There is a brittle coupling between this function and
+                # :meth:`MetadataPlugin.hot_entity_types`.
+                if entity_type == 'donor_organism':
+                    return True
+                elif entity_type == 'project':
+                    return True
+                elif entity_type.endswith('_protocol'):
+                    return True
+                else:
+                    return False
+
+            for ref in chain(
+                [self.api_bundle.ref, self._api_project.ref],
+                visitor.entities
+            ):
+                yield self._replica(ref,
+                                    file_hub=None if is_hot(ref.entity_type) else file_id)

def matrix_stratification_values(self, file: api.File) -> JSON:
"""
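Because is_hot is local to _transform it cannot be imported, but a standalone copy (for illustration only; the EntityType alias is replaced with str to keep the sketch self-contained) shows which untransformed entity types now skip explicit hub tracking:

# Standalone copy of the is_hot closure above, for illustration only.
def is_hot(entity_type: str) -> bool:
    if entity_type == 'donor_organism':
        return True
    elif entity_type == 'project':
        return True
    elif entity_type.endswith('_protocol'):
        return True
    else:
        return False

assert is_hot('project')                        # hub-less before this commit
assert is_hot('donor_organism')                 # newly hot
assert is_hot('library_preparation_protocol')   # newly hot
assert not is_hot('cell_suspension')            # still tracks its file hub

Note that the new loop folds the previously special-cased project replica into the general case: the bundle ref is not hot and keeps its file hub, the project ref is hot and remains hub-less, and donors and protocols now join it.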
