Commit

Merge pull request neos#5229 from dlubitz/task/perfomance-cache-flush
TASK: Improve performance on content cache flush
skurfuerst authored Sep 18, 2024
2 parents 36842a3 + 02fb77a commit a0b6d26
Showing 2 changed files with 27 additions and 83 deletions.
Binary file not shown.
@@ -52,88 +52,43 @@
* as otherwise this subprocess won't be called.
*
*
* The following scenario explains how to think about cache flushing.
*
* [Diagram: two timelines separated by an async boundary.
* Caller timeline: EventStore::commit, request point !1!, "block() finished", request point !2!.
* Projection::catchUp timeline: update Projection state (old -> new), update sequence number,
* onBeforeBatchCompleted (A), TX commit (end of batch), onAfterCatchUp (B).
* SYNC POINTs are marked at EventStore::commit and at TX commit.]
*
* There are two natural places where the Fusion cache can be flushed:
* - A: onBeforeBatchCompleted (before ending the transaction in the projection)
* - B: onAfterCatchUp (after ending the transaction).
*
* We need to ensure that the system is eventually consistent, so the following invariants must hold:
* - After a change in the projection, the cache must eventually be flushed.
* - On a re-render after the cache flush, the new content must be shown.
* - When block() returns, ANY re-render (even one happening immediately) must return the new content.
* - (Eventual Consistency): processes can be blocked for an arbitrarily long time at any point.
*
* The scenarios which are NOT allowed to happen are:
* - INVARIANT_1: after a change, the old content is still visible when all processes have ended.
* - INVARIANT_2: after a change, when rendering happens directly after block(), the old content
* is shown (e.g. because cache is not yet flushed).
*
* CASE A (cache flushed at onBeforeBatchCompleted only):
* - Let's assume the cache is flushed really quickly.
* - and AFTER the cache is flushed but BEFORE the transaction is committed,
* - another request hits the system - marked above with !1!
*
* THEN: the request will still load the old data, render the page based on the old data, and add
* the old data to the cache. The cache will not be flushed again because it has already been flushed.
*
* => INVARIANT_1 violated.
* => this case needs a cache flush at onAfterCatchUp to ensure the system converges.
*
* CASE B (cache flushed on onAfterCatchUp only):
* - Let's assume the blocking has finished, and caches have not been flushed yet.
* - Then, during re-rendering, the old content is shown because the cache is still full
*
* => INVARIANT_2 violated.
* => this case needs a cache flush at onBeforeBatchCompleted.
*
* SUMMARY: we need to flush the cache at BOTH places.
* [Diagram: two timelines with no async boundary between them.
* Caller timeline: EventStore::commit, request point !1!, "projection update call finished", request point !2!.
* Projection::catchUp timeline: update Projection state (old -> new), update sequence number,
* TX commit (end of batch, SYNC POINT), onAfterCatchUp (=> e.g. flush Fusion cache).
* NO async boundary anymore! => we can GUARANTEE that onAfterCatchUp has run
* before control is returned to the caller.]
*
* @internal
*/
class GraphProjectorCatchUpHookForCacheFlushing implements CatchUpHookInterface
{
private static bool $enabled = true;

/**
* @var array<string,FlushNodeAggregateRequest>
*/
private array $flushNodeAggregateRequestsOnBeforeBatchCompleted = [];
/**
* @var array<string,FlushNodeAggregateRequest>
*/
private array $flushNodeAggregateRequestsOnAfterCatchUp = [];

/**
* @var array<string,FlushWorkspaceRequest>
*/
private array $flushWorkspaceRequestsOnBeforeBatchCompleted = [];

/**
* @var array<string,FlushWorkspaceRequest>
*/
@@ -259,7 +214,7 @@ private function scheduleCacheFlushJobForNodeAggregate(
NodeAggregate $nodeAggregate
): void {
// we store this in an associative array to deduplicate.
$this->flushNodeAggregateRequestsOnBeforeBatchCompleted[$workspaceName->value . '__' . $nodeAggregate->nodeAggregateId->value] = FlushNodeAggregateRequest::create(
$this->flushNodeAggregateRequestsOnAfterCatchUp[$workspaceName->value . '__' . $nodeAggregate->nodeAggregateId->value] = FlushNodeAggregateRequest::create(
$contentRepository->id,
$workspaceName,
$nodeAggregate->nodeAggregateId,
@@ -273,7 +228,7 @@ private function scheduleCacheFlushJobForWorkspaceName(
WorkspaceName $workspaceName
): void {
// we store this in an associative array to deduplicate.
$this->flushWorkspaceRequestsOnBeforeBatchCompleted[$workspaceName->value] = FlushWorkspaceRequest::create(
$this->flushWorkspaceRequestsOnAfterCatchUp[$workspaceName->value] = FlushWorkspaceRequest::create(
$contentRepository->id,
$workspaceName,
);
@@ -300,17 +255,6 @@ private function determineAncestorNodeAggregateIds(WorkspaceName $workspaceName,

public function onBeforeBatchCompleted(): void
{
foreach ($this->flushNodeAggregateRequestsOnBeforeBatchCompleted as $index => $request) {
$this->contentCacheFlusher->flushNodeAggregate($request, CacheFlushingStrategy::IMMEDIATE);
$this->flushNodeAggregateRequestsOnAfterCatchUp[$index] = $request;
}
$this->flushNodeAggregateRequestsOnBeforeBatchCompleted = [];

foreach ($this->flushWorkspaceRequestsOnBeforeBatchCompleted as $index => $request) {
$this->contentCacheFlusher->flushWorkspace($request, CacheFlushingStrategy::IMMEDIATE);
$this->flushWorkspaceRequestsOnAfterCatchUp[$index] = $request;
}
$this->flushWorkspaceRequestsOnBeforeBatchCompleted = [];
}

public function onAfterCatchUp(): void

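The effect of this change can be illustrated with a minimal, self-contained sketch. It is not the Neos implementation: `RecordingFlusher` and `SketchCacheFlushHook` are hypothetical stand-ins, and plain strings replace the real `FlushNodeAggregateRequest` objects. It only shows the two ideas visible in the diff: requests are stored under a composite key so duplicates collapse, and the flush now happens once in `onAfterCatchUp` while `onBeforeBatchCompleted` is left empty. It assumes PHP 8.0 or later.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the content cache flusher; it only records what was flushed.
final class RecordingFlusher
{
    /** @var list<string> */
    public array $flushed = [];

    public function flush(string $key): void
    {
        $this->flushed[] = $key;
    }
}

// Hypothetical, simplified hook: a single pending list, flushed once after catch-up.
final class SketchCacheFlushHook
{
    /** @var array<string,string> pending requests, keyed so that duplicates overwrite each other */
    private array $pending = [];

    public function __construct(private RecordingFlusher $flusher)
    {
    }

    public function schedule(string $workspaceName, string $nodeAggregateId): void
    {
        // Same key scheme as in the diff: workspace name and node aggregate id joined by '__'.
        $key = $workspaceName . '__' . $nodeAggregateId;
        $this->pending[$key] = $key;
    }

    public function onBeforeBatchCompleted(): void
    {
        // Intentionally empty: nothing is flushed before the transaction commits.
    }

    public function onAfterCatchUp(): void
    {
        foreach ($this->pending as $key) {
            $this->flusher->flush($key);
        }
        $this->pending = [];
    }
}

$flusher = new RecordingFlusher();
$hook = new SketchCacheFlushHook($flusher);
$hook->schedule('live', 'node-a');
$hook->schedule('live', 'node-a'); // duplicate: overwrites the same key
$hook->schedule('live', 'node-b');
$hook->onBeforeBatchCompleted();   // no flush happens here
$hook->onAfterCatchUp();           // node-a and node-b are each flushed once
```

Because the catch-up now runs synchronously, a single flush after the transaction commit is sufficient in this sketch; the earlier double flush was only needed while a caller could resume before `onAfterCatchUp` had run.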