diff --git a/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.monopic b/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.monopic index c7fc9e98bb5..cfec39ec13c 100644 Binary files a/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.monopic and b/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.monopic differ diff --git a/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php b/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php index cf465824ce4..94f66e17ff0 100644 --- a/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php +++ b/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php @@ -52,67 +52,31 @@ * as otherwise this subprocess won't be called. * * - * The following scenario explains how to think about cache flushing. - * - * EventStore::commit block() finished - * ║ │ - * ─────╬──────────────────────────!1!───────────┼─!2!────────▶ - * SYNC POINT ▲│ - * ╲ ╱ - * ╲ ╱ - * ╲ ╱ - * ╲ ╱ - * ─ ─ ─ ─ ─ ─ ─ ─╲─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─╱─ ─ ─ ─ ─ ─ ─ ─ ─ ─ async boundary - * ╲ SYNC POINT - * ╲ ║ ╱ - * Projection::catchUp▼ │ │ │ ║╱ │ - * ────────────────┼────┼────────┼─────╳───────────────┼──────▶ - * │ │ │ ║ │ - * update Projection │ │ ║ │ - * state (old -> new) │ │ TX commit │ - * │ │ (end of batch) │ - * update sequence │ │ - * number │ │ - * │ │ - * onAfterCatchUp - * onBefore (B) - * BatchCompleted - * (A) - * - * There are two natural places where the Fusion cache can be flushed: - * - A: onBeforeBatchCompleted (before ending the transaction in the projection) - * - B: onAfterCatchUp (after ending the transaction). - * - * We need to ensure that the system is eventually consistent, so the following invariants must hold: - * - After a change in the projection, some time later, the cache must have been flushed - * - at a re-rendering after the cache flush, the new content must be shown - * - when block() returns, ANY re-render (even if happening immediately) must return the new content. - * - (Eventual Consistency): Processes can be blocked arbitrarily long at any point in time indefinitely. - * - * The scenarios which are NOT allowed to happen are: - * - INVARIANT_1: after a change, the old content is still visible when all processes have ended. - * - INVARIANT_2: after a change, when rendering happens directly after block(), the old content - * is shown (e.g. because cache is not yet flushed). - * - * CASE A (cache flushed at onBeforeBatchCompleted only): - * - Let's assume the cache is flushed really quickly. - * - and AFTER the cache is flushed but BEFORE the transaction is committed, - * - another request hits the system - marked above with !1! - * - * THEN: the request will still load the old data, render the page based on the old data, and add - * the old data to the cache. The cache will not be flushed again because it has already been flushed. - * - * => INVARIANT_1 violated. - * => this case needs a cache flush at onAfterCatchUp; to ensure the system converges. - * - * CASE B (cache flushed on onAfterCatchUp only): - * - Let's assume the blocking has finished, and caches have not been flushed yet. - * - Then, during re-rendering, the old content is shown because the cache is still full - * - * => INVARIANT_2 violated. - * => this case needs a cache flush at onBeforeBatchCompleted. - * - * SUMMARY: we need to flush the cache at BOTH places. + * projection update + * call finished + * EventStore::commit + * ║ │ + * ─────╬──────────────────────────!1!───────────┼────────!2!─▶ + * ║ ▲│ + * │ │ + * │ │ NO async boundary anymore! + * │ │ => we can GUARANTEE that + * │ │ onAfterCatchUp has run + * │ │ SYNC before control is returned + * │ │ POINT to the caller. + * │ ║ │ + * Projection::catchUp │ │ ║ ││ + * ────────────────┼────┼──────────────╬───────┼──────────────▶ + * │ │ ║ │ + * update Projection │ ║ │ + * state (old -> new) │ ║ │ + * │ TX commit │ + * update sequence (end of batch)│ + * number │ + * │ + * onAfterCatchUp + * => e.g. flush + * Fusion cache * * @internal */ @@ -120,20 +84,11 @@ class GraphProjectorCatchUpHookForCacheFlushing implements CatchUpHookInterface { private static bool $enabled = true; - /** - * @var array - */ - private array $flushNodeAggregateRequestsOnBeforeBatchCompleted = []; /** * @var array */ private array $flushNodeAggregateRequestsOnAfterCatchUp = []; - /** - * @var array - */ - private array $flushWorkspaceRequestsOnBeforeBatchCompleted = []; - /** * @var array */ @@ -259,7 +214,7 @@ private function scheduleCacheFlushJobForNodeAggregate( NodeAggregate $nodeAggregate ): void { // we store this in an associative array deduplicate. - $this->flushNodeAggregateRequestsOnBeforeBatchCompleted[$workspaceName->value . '__' . $nodeAggregate->nodeAggregateId->value] = FlushNodeAggregateRequest::create( + $this->flushNodeAggregateRequestsOnAfterCatchUp[$workspaceName->value . '__' . $nodeAggregate->nodeAggregateId->value] = FlushNodeAggregateRequest::create( $contentRepository->id, $workspaceName, $nodeAggregate->nodeAggregateId, @@ -273,7 +228,7 @@ private function scheduleCacheFlushJobForWorkspaceName( WorkspaceName $workspaceName ): void { // we store this in an associative array deduplicate. - $this->flushWorkspaceRequestsOnBeforeBatchCompleted[$workspaceName->value] = FlushWorkspaceRequest::create( + $this->flushWorkspaceRequestsOnAfterCatchUp[$workspaceName->value] = FlushWorkspaceRequest::create( $contentRepository->id, $workspaceName, ); @@ -300,17 +255,6 @@ private function determineAncestorNodeAggregateIds(WorkspaceName $workspaceName, public function onBeforeBatchCompleted(): void { - foreach ($this->flushNodeAggregateRequestsOnBeforeBatchCompleted as $index => $request) { - $this->contentCacheFlusher->flushNodeAggregate($request, CacheFlushingStrategy::IMMEDIATE); - $this->flushNodeAggregateRequestsOnAfterCatchUp[$index] = $request; - } - $this->flushNodeAggregateRequestsOnBeforeBatchCompleted = []; - - foreach ($this->flushWorkspaceRequestsOnBeforeBatchCompleted as $index => $request) { - $this->contentCacheFlusher->flushWorkspace($request, CacheFlushingStrategy::IMMEDIATE); - $this->flushWorkspaceRequestsOnAfterCatchUp[$index] = $request; - } - $this->flushWorkspaceRequestsOnBeforeBatchCompleted = []; } public function onAfterCatchUp(): void