Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/9.0' into task/schema-postgresql
Browse files Browse the repository at this point in the history
  • Loading branch information
kitsunet committed Dec 16, 2024
2 parents 7e15553 + f0c6111 commit b1168c4
Show file tree
Hide file tree
Showing 144 changed files with 6,947 additions and 1,979 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,4 @@ public function contentStream(): string
{
return $this->tableNamePrefix . '_contentstream';
}

public function checkpoint(): string
{
return $this->tableNamePrefix . '_checkpoint';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@
use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPublished;
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceRebaseFailed;
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\NodeTags;
Expand All @@ -73,7 +71,6 @@
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
use Neos\ContentRepository\Core\SharedModel\Node\ReferenceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;

/**
Expand All @@ -91,20 +88,13 @@ final class DoctrineDbalContentGraphProjection implements ContentGraphProjection

public const RELATION_DEFAULT_OFFSET = 128;

private DbalCheckpointStorage $checkpointStorage;

public function __construct(
private readonly Connection $dbal,
private readonly ProjectionContentGraph $projectionContentGraph,
private readonly ContentGraphTableNames $tableNames,
private readonly DimensionSpacePointsRepository $dimensionSpacePointsRepository,
private readonly ContentGraphReadModelInterface $contentGraphReadModel
) {
$this->checkpointStorage = new DbalCheckpointStorage(
$this->dbal,
$this->tableNames->checkpoint(),
self::class
);
}

public function setUp(): void
Expand All @@ -118,18 +108,10 @@ public function setUp(): void
throw new \RuntimeException(sprintf('Failed to setup projection %s: %s', self::class, $e->getMessage()), 1716478255, $e);
}
}
$this->checkpointStorage->setUp();
}

public function status(): ProjectionStatus
{
$checkpointStorageStatus = $this->checkpointStorage->status();
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) {
return ProjectionStatus::error($checkpointStorageStatus->details);
}
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) {
return ProjectionStatus::setupRequired($checkpointStorageStatus->details);
}
try {
$this->dbal->connect();
} catch (\Throwable $e) {
Expand All @@ -147,59 +129,16 @@ public function status(): ProjectionStatus
return ProjectionStatus::ok();
}

public function reset(): void
public function resetState(): void
{
$this->truncateDatabaseTables();

$this->checkpointStorage->acquireLock();
$this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none());
}

public function getCheckpointStorage(): DbalCheckpointStorage
{
return $this->checkpointStorage;
}

public function getState(): ContentGraphReadModelInterface
{
return $this->contentGraphReadModel;
}

public function canHandle(EventInterface $event): bool
{
return in_array($event::class, [
ContentStreamWasClosed::class,
ContentStreamWasCreated::class,
ContentStreamWasForked::class,
ContentStreamWasRemoved::class,
ContentStreamWasReopened::class,
DimensionShineThroughWasAdded::class,
DimensionSpacePointWasMoved::class,
NodeAggregateNameWasChanged::class,
NodeAggregateTypeWasChanged::class,
NodeAggregateWasMoved::class,
NodeAggregateWasRemoved::class,
NodeAggregateWithNodeWasCreated::class,
NodeGeneralizationVariantWasCreated::class,
NodePeerVariantWasCreated::class,
NodePropertiesWereSet::class,
NodeReferencesWereSet::class,
NodeSpecializationVariantWasCreated::class,
RootNodeAggregateDimensionsWereUpdated::class,
RootNodeAggregateWithNodeWasCreated::class,
RootWorkspaceWasCreated::class,
SubtreeWasTagged::class,
SubtreeWasUntagged::class,
WorkspaceBaseWorkspaceWasChanged::class,
WorkspaceRebaseFailed::class,
WorkspaceWasCreated::class,
WorkspaceWasDiscarded::class,
WorkspaceWasPublished::class,
WorkspaceWasRebased::class,
WorkspaceWasRemoved::class,
]) || $event instanceof EmbedsContentStreamId;
}

public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
{
match ($event::class) {
Expand Down Expand Up @@ -232,7 +171,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
WorkspaceWasPublished::class => $this->whenWorkspaceWasPublished($event),
WorkspaceWasRebased::class => $this->whenWorkspaceWasRebased($event),
WorkspaceWasRemoved::class => $this->whenWorkspaceWasRemoved($event),
default => $event instanceof EmbedsContentStreamId || throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))),
default => null,
};
if (
$event instanceof EmbedsContentStreamId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\DimensionSpacePointsRepository;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\NodeFactory;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\ProjectionContentGraph;
use Neos\ContentRepository\Core\Factory\ProjectionFactoryDependencies;
use Neos\ContentRepository\Core\Factory\SubscriberFactoryDependencies;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionFactoryInterface;

/**
Expand All @@ -24,8 +24,7 @@ public function __construct(
}

public function build(
ProjectionFactoryDependencies $projectionFactoryDependencies,
array $options,
SubscriberFactoryDependencies $projectionFactoryDependencies,
): DoctrineDbalContentGraphProjection {
$tableNames = ContentGraphTableNames::create(
$projectionFactoryDependencies->contentRepositoryId
Expand All @@ -35,7 +34,7 @@ public function build(

$nodeFactory = new NodeFactory(
$projectionFactoryDependencies->contentRepositoryId,
$projectionFactoryDependencies->propertyConverter,
$projectionFactoryDependencies->getPropertyConverter(),
$dimensionSpacePointsRepository
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
namespace Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\ContentStreamForking;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeCreation;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeModification;
Expand All @@ -26,7 +25,6 @@
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeVariation;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\SubtreeTagging;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\SchemaBuilder\HypergraphSchemaBuilder;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\NodeCreation\Event\NodeAggregateWithNodeWasCreated;
Expand All @@ -41,12 +39,10 @@
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\EventStore\Model\EventEnvelope;

/**
Expand All @@ -66,7 +62,6 @@ final class HypergraphProjection implements ContentGraphProjectionInterface
use NodeTypeChange;
use NodeVariation;

private DbalCheckpointStorage $checkpointStorage;
private ProjectionHypergraph $projectionHypergraph;

public function __construct(
Expand All @@ -75,11 +70,6 @@ public function __construct(
private readonly ContentGraphReadModelInterface $contentGraphReadModel
) {
$this->projectionHypergraph = new ProjectionHypergraph($this->dbal, $this->tableNamePrefix);
$this->checkpointStorage = new DbalCheckpointStorage(
$this->dbal,
$this->tableNamePrefix . '_checkpoint',
self::class
);
}


Expand All @@ -97,18 +87,10 @@ public function setUp(): void
create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');
$this->checkpointStorage->setUp();
}

public function status(): ProjectionStatus
{
$checkpointStorageStatus = $this->checkpointStorage->status();
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) {
return ProjectionStatus::error($checkpointStorageStatus->details);
}
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) {
return ProjectionStatus::setupRequired($checkpointStorageStatus->details);
}
try {
$this->getDatabaseConnection()->connect();
} catch (\Throwable $e) {
Expand All @@ -135,12 +117,9 @@ private function determineRequiredSqlStatements(): array
return DbalSchemaDiff::determineRequiredSqlStatements($this->dbal, $schema);
}

public function reset(): void
public function resetState(): void
{
$this->truncateDatabaseTables();

$this->checkpointStorage->acquireLock();
$this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none());
}

private function truncateDatabaseTables(): void
Expand All @@ -151,39 +130,6 @@ private function truncateDatabaseTables(): void
$this->dbal->executeQuery('TRUNCATE table ' . $this->tableNamePrefix . '_restrictionhyperrelation');
}

public function canHandle(EventInterface $event): bool
{
return in_array($event::class, [
// ContentStreamForking
ContentStreamWasForked::class,
// NodeCreation
RootNodeAggregateWithNodeWasCreated::class,
NodeAggregateWithNodeWasCreated::class,
// SubtreeTagging
SubtreeWasTagged::class,
SubtreeWasUntagged::class,
// NodeModification
NodePropertiesWereSet::class,
// NodeReferencing
NodeReferencesWereSet::class,
// NodeRemoval
NodeAggregateWasRemoved::class,
// NodeRenaming
NodeAggregateNameWasChanged::class,
// NodeTypeChange
NodeAggregateTypeWasChanged::class,
// NodeVariation
NodeSpecializationVariantWasCreated::class,
NodeGeneralizationVariantWasCreated::class,
NodePeerVariantWasCreated::class,
// TODO: not yet supported:
//ContentStreamWasRemoved::class,
//DimensionSpacePointWasMoved::class,
//DimensionShineThroughWasAdded::class,
//NodeAggregateWasMoved::class,
]);
}

public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
{
match ($event::class) {
Expand All @@ -209,7 +155,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
NodeSpecializationVariantWasCreated::class => $this->whenNodeSpecializationVariantWasCreated($event),
NodeGeneralizationVariantWasCreated::class => $this->whenNodeGeneralizationVariantWasCreated($event),
NodePeerVariantWasCreated::class => $this->whenNodePeerVariantWasCreated($event),
default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))),
default => null,
};
}

Expand All @@ -228,11 +174,6 @@ public function inSimulation(\Closure $fn): mixed
}
}

public function getCheckpointStorage(): DbalCheckpointStorage
{
return $this->checkpointStorage;
}

public function getState(): ContentGraphReadModelInterface
{
return $this->contentGraphReadModel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Doctrine\DBAL\Connection;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\HypergraphProjection;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Repository\NodeFactory;
use Neos\ContentRepository\Core\Factory\ProjectionFactoryDependencies;
use Neos\ContentRepository\Core\Factory\SubscriberFactoryDependencies;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionFactoryInterface;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;

Expand All @@ -28,16 +28,15 @@ public static function graphProjectionTableNamePrefix(
}

public function build(
ProjectionFactoryDependencies $projectionFactoryDependencies,
array $options,
SubscriberFactoryDependencies $projectionFactoryDependencies,
): HypergraphProjection {
$tableNamePrefix = self::graphProjectionTableNamePrefix(
$projectionFactoryDependencies->contentRepositoryId
);

$nodeFactory = new NodeFactory(
$projectionFactoryDependencies->contentRepositoryId,
$projectionFactoryDependencies->propertyConverter
$projectionFactoryDependencies->getPropertyConverter()
);

return new HypergraphProjection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntries;
use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntryType;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\EventEnvelope;
use Neos\Flow\Annotations as Flow;

/**
* We had some race conditions in projections, where {@see \Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage} was not working properly.
* We had some race conditions in projections
* We saw some non-deterministic, random errors when running the tests - unluckily only on Linux, not on OSX:
* On OSX, forking a new subprocess in {@see SubprocessProjectionCatchUpTrigger} is *WAY* slower than in Linux;
* and thus the race conditions which appears if two projector instances of the same class run concurrently
Expand Down Expand Up @@ -73,7 +74,7 @@
*
* When {@see onBeforeEvent} is called, we know that we are inside applyEvent() in the diagram above,
* thus we know the lock *HAS* been acquired.
* When {@see onBeforeBatchCompleted}is called, we know the lock will be released directly afterwards.
* When {@see onAfterCatchUp}is called, we know the lock will be released directly afterwards.
*
* We track these timings across processes in a single Redis Stream. Because Redis is single-threaded,
* we can be sure that we observe the correct, total order of interleavings *across multiple processes*
Expand Down Expand Up @@ -107,7 +108,7 @@ final class RaceTrackerCatchUpHook implements CatchUpHookInterface
protected $configuration;
private bool $inCriticalSection = false;

public function onBeforeCatchUp(): void
public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void
{
RedisInterleavingLogger::connect($this->configuration['redis']['host'], $this->configuration['redis']['port']);
}
Expand All @@ -126,16 +127,16 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event
{
}

public function onBeforeBatchCompleted(): void
public function onAfterBatchCompleted(): void
{
}

public function onAfterCatchUp(): void
{
// we only want to track relevant lock release calls (i.e. if we were in the event processing loop before)
if ($this->inCriticalSection) {
$this->inCriticalSection = false;
RedisInterleavingLogger::trace(TraceEntryType::LockWillBeReleasedIfItWasAcquiredBefore);
}
}

public function onAfterCatchUp(): void
{
}
}
Loading

0 comments on commit b1168c4

Please sign in to comment.