BUGFIX: fix race condition when flushing caches
Resolves: #4219
skurfuerst committed Apr 30, 2023
1 parent 59d2a06 commit 52be2f0
Showing 2 changed files with 82 additions and 9 deletions.
@@ -28,11 +28,73 @@
* is not needed:
*
* By calling {@see self::disabled(\Closure)} in your code, all projection updates
* will never trigger catch up hooks.
*
* NOTE: This will only work when {@see CatchUpTriggerWithSynchronousOption::synchronously()} is called,
* will never trigger catch up hooks. This will only work when
* {@see CatchUpTriggerWithSynchronousOption::synchronously()} is called,
* as otherwise this subprocess won't be called.
*
*
* The following scenario explains how to think about cache flushing.
*
* EventStore::commit block() finished
* ║ │
* ─────╬──────────────────────────!1!───────────┼─!2!────────▶
* SYNC POINT ▲│
* ╲ ╱
* ╲ ╱
* ╲ ╱
* ╲ ╱
* ─ ─ ─ ─ ─ ─ ─ ─╲─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─╱─ ─ ─ ─ ─ ─ ─ ─ ─ ─ async boundary
* ╲ SYNC POINT
* ╲ ║ ╱
* Projection::catchUp▼ │ │ │ ║╱ │
* ────────────────┼────┼────────┼─────╳───────────────┼──────▶
* │ │ │ ║ │
* update Projection │ │ ║ │
* state (old -> new) │ │ TX commit │
* │ │ (end of batch) │
* update sequence │ │
* number │ │
* │ │
* onAfterCatchUp
* onBefore (B)
* BatchCompleted
* (A)
*
* There are two natural places where the Fusion cache can be flushed:
* - A: onBeforeBatchCompleted (before ending the transaction in the projection)
* - B: onAfterCatchUp (after ending the transaction).
*
* We need to ensure that the system is eventually consistent, so the following invariants must hold:
* - After a change in the projection, the cache must be flushed some time later.
* - At a re-rendering after the cache flush, the new content must be shown.
* - When block() returns, ANY re-render (even one happening immediately) must return the new content.
* - (Eventual Consistency): Processes can be blocked arbitrarily long at any point in time.
*
* The scenarios which are NOT allowed to happen are:
* - INVARIANT_1: after a change, the old content is still visible when all processes have ended.
* - INVARIANT_2: after a change, when rendering happens directly after block(), the old content
* is shown (e.g. because the cache has not been flushed yet).
*
* CASE A (cache flushed at onBeforeBatchCompleted only):
* - Let's assume the cache is flushed really quickly,
* - and AFTER the cache is flushed but BEFORE the transaction is committed,
* - another request hits the system - marked above with !1!.
*
* THEN: the request will still load the old data, render the page based on the old data, and add
* the old data to the cache. The cache will not be flushed again because it has already been flushed.
*
* => INVARIANT_1 violated.
* => this case needs a cache flush at onAfterCatchUp to ensure the system converges.
*
* CASE B (cache flushed at onAfterCatchUp only):
* - Let's assume the blocking has finished, and the caches have not been flushed yet.
* - Then, during re-rendering, the old content is shown because the cache still holds the stale entries.
*
* => INVARIANT_2 violated.
* => this case needs a cache flush at onBeforeBatchCompleted.
*
* SUMMARY: we need to flush the cache at BOTH places.
*
* @internal
*/
class GraphProjectorCatchUpHookForCacheFlushing implements CatchUpHookInterface
@@ -71,11 +133,13 @@ public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $even
return;
}

if ($eventInstance instanceof NodeAggregateWasRemoved
if (
$eventInstance instanceof NodeAggregateWasRemoved
// NOTE: when moving a node, we need to clear the cache not just after the move was completed,
// but also on the original location. Otherwise, we have the problem that the cache is not
// cleared, leading to presumably duplicate nodes in the UI.
|| $eventInstance instanceof NodeAggregateWasMoved) {
|| $eventInstance instanceof NodeAggregateWasMoved
) {
$nodeAggregate = $this->contentRepository->getContentGraph()->findNodeAggregateById(
$eventInstance->getContentStreamId(),
$eventInstance->getNodeAggregateId()
@@ -126,15 +190,19 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event
/**
* @var array<string,array<string,mixed>>
*/
protected array $cacheFlushes = [];
private array $cacheFlushesOnBeforeBatchCompleted = [];
/**
* @var array<string,array<string,mixed>>
*/
private array $cacheFlushesOnAfterCatchUp = [];

protected function scheduleCacheFlushJobForNodeAggregate(
ContentRepository $contentRepository,
ContentStreamId $contentStreamId,
NodeAggregateId $nodeAggregateId
): void {
// we store this in an associative array to deduplicate entries.
$this->cacheFlushes[$contentStreamId->value . '__' . $nodeAggregateId->value] = [
$this->cacheFlushesOnBeforeBatchCompleted[$contentStreamId->value . '__' . $nodeAggregateId->value] = [
'cr' => $contentRepository,
'csi' => $contentStreamId,
'nai' => $nodeAggregateId
@@ -143,15 +211,20 @@ protected function scheduleCacheFlushJobForNodeAggregate(

public function onBeforeBatchCompleted(): void
{
foreach ($this->cacheFlushes as $entry) {
foreach ($this->cacheFlushesOnBeforeBatchCompleted as $k => $entry) {
$this->contentCacheFlusher->flushNodeAggregate($entry['cr'], $entry['csi'], $entry['nai']);
$this->cacheFlushesOnAfterCatchUp[$k] = $entry;
}
$this->cacheFlushes = [];
$this->cacheFlushesOnBeforeBatchCompleted = [];
}



public function onAfterCatchUp(): void
{
foreach ($this->cacheFlushesOnAfterCatchUp as $entry) {
$this->contentCacheFlusher->flushNodeAggregate($entry['cr'], $entry['csi'], $entry['nai']);
}
$this->cacheFlushesOnAfterCatchUp = [];
}
}
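
To make the dual-flush scheme from the doc block above easier to grasp in isolation, here is a minimal, self-contained sketch. It is not the Neos API: SimpleCacheFlushingHook, CacheFlusher and flush() are hypothetical names used only for illustration. The idea mirrors the commit: scheduled flushes are deduplicated per node aggregate via an associative key, executed once before the projection transaction commits (the flush CASE B shows is needed), and executed a second time after catch-up has finished (the flush CASE A shows is needed).

<?php

declare(strict_types=1);

// Hypothetical collaborator; the real commit delegates to ContentCacheFlusher::flushNodeAggregate().
interface CacheFlusher
{
    public function flush(string $contentStreamId, string $nodeAggregateId): void;
}

// Illustrative sketch of the two flush points described above - not the actual Neos hook interface.
final class SimpleCacheFlushingHook
{
    /** @var array<string, array{csi: string, nai: string}> */
    private array $pendingBeforeBatchCompleted = [];

    /** @var array<string, array{csi: string, nai: string}> */
    private array $pendingAfterCatchUp = [];

    public function __construct(private readonly CacheFlusher $cacheFlusher)
    {
    }

    public function schedule(string $contentStreamId, string $nodeAggregateId): void
    {
        // The associative key deduplicates repeated flushes for the same node aggregate within one batch.
        $this->pendingBeforeBatchCompleted[$contentStreamId . '__' . $nodeAggregateId] = [
            'csi' => $contentStreamId,
            'nai' => $nodeAggregateId,
        ];
    }

    // Flush point A: runs before the projection transaction is committed, so a render happening
    // right after block() already sees fresh content (prevents the CASE B violation).
    public function onBeforeBatchCompleted(): void
    {
        foreach ($this->pendingBeforeBatchCompleted as $key => $entry) {
            $this->cacheFlusher->flush($entry['csi'], $entry['nai']);
            // Remember the entry so it is flushed again after catch-up.
            $this->pendingAfterCatchUp[$key] = $entry;
        }
        $this->pendingBeforeBatchCompleted = [];
    }

    // Flush point B: runs after catch-up, i.e. after the transaction has been committed, and corrects
    // any cache entry a concurrent request re-filled with old data in between (prevents the CASE A violation).
    public function onAfterCatchUp(): void
    {
        foreach ($this->pendingAfterCatchUp as $entry) {
            $this->cacheFlusher->flush($entry['csi'], $entry['nai']);
        }
        $this->pendingAfterCatchUp = [];
    }
}

The double flush is a deliberate trade-off: in the worst case the same aggregate is flushed twice per batch, but the system is guaranteed to converge even if a request re-populates the cache with stale data between the first flush and the transaction commit.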
