Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUGFIX: Harden events exporter #5374

Merged
merged 3 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,9 @@ public function iExpectTheFollowingJsonL(PyStringNode $string): void
$eventsWithoutRandomIds = [];

foreach ($exportedEvents as $exportedEvent) {
// we have to remove the event id and initiatingTimestamp to make the events diff able
// we have to remove the event ids to make the events diff-able
$eventsWithoutRandomIds[] = $exportedEvent
->withIdentifier('random-event-uuid')
->processMetadata(function (array $metadata) {
$metadata[InitiatingEventMetadata::INITIATING_TIMESTAMP] = 'random-time';
return $metadata;
});
->withIdentifier('random-event-uuid');
}

Assert::assertSame($string->getRaw(), ExportedEvents::fromIterable($eventsWithoutRandomIds)->toJsonl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Feature: As a user of the CR I want to export the event stream using the EventExportProcessor

Background:
Given the current date and time is "2023-03-16T12:00:00+01:00"
Given using the following content dimensions:
| Identifier | Values | Generalizations |
| language | de, gsw, fr | gsw->de |
Expand All @@ -20,14 +21,12 @@ Feature: As a user of the CR I want to export the event stream using the EventEx
| Key | Value |
| nodeAggregateId | "lady-eleonode-rootford" |
| nodeTypeName | "Neos.ContentRepository:Root" |
And the event NodeAggregateWithNodeWasCreated was published with payload:
And the command CreateNodeAggregateWithNode is executed with payload:
| Key | Value |
| workspaceName | "live" |
| contentStreamId | "cs-identifier" |
| nodeAggregateId | "nody-mc-nodeface" |
| nodeTypeName | "Neos.ContentRepository.Testing:Document" |
| originDimensionSpacePoint | {"language":"de"} |
| coveredDimensionSpacePoints | [{"language":"de"},{"language":"gsw"},{"language":"fr"}] |
| parentNodeAggregateId | "lady-eleonode-rootford" |
| nodeName | "child-document" |
| nodeAggregateClassification | "regular" |
Expand All @@ -37,7 +36,7 @@ Feature: As a user of the CR I want to export the event stream using the EventEx
When the events are exported
Then I expect the following jsonl:
"""
{"identifier":"random-event-uuid","type":"RootNodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"lady-eleonode-rootford","nodeTypeName":"Neos.ContentRepository:Root","coveredDimensionSpacePoints":[{"language":"de"},{"language":"gsw"},{"language":"fr"}],"nodeAggregateClassification":"root"},"metadata":{"commandClass":"Neos\\ContentRepository\\Core\\Feature\\RootNodeCreation\\Command\\CreateRootNodeAggregateWithNode","commandPayload":{"workspaceName":"live","nodeAggregateId":"lady-eleonode-rootford","nodeTypeName":"Neos.ContentRepository:Root","tetheredDescendantNodeAggregateIds":[]},"initiatingUserId":"system","initiatingTimestamp":"random-time"}}
{"identifier":"random-event-uuid","type":"NodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"nody-mc-nodeface","nodeTypeName":"Neos.ContentRepository.Testing:Document","originDimensionSpacePoint":{"language":"de"},"succeedingSiblingsForCoverage":[{"dimensionSpacePoint":{"language":"de"},"nodeAggregateId":null},{"dimensionSpacePoint":{"language":"gsw"},"nodeAggregateId":null},{"dimensionSpacePoint":{"language":"fr"},"nodeAggregateId":null}],"parentNodeAggregateId":"lady-eleonode-rootford","nodeName":"child-document","initialPropertyValues":[],"nodeAggregateClassification":"regular","nodeReferences":[]},"metadata":{"initiatingTimestamp":"random-time"}}
{"identifier":"random-event-uuid","type":"RootNodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"lady-eleonode-rootford","nodeTypeName":"Neos.ContentRepository:Root","coveredDimensionSpacePoints":[{"language":"de"},{"language":"gsw"},{"language":"fr"}],"nodeAggregateClassification":"root"},"metadata":{"initiatingUserId":"system","initiatingTimestamp":"2023-03-16T12:00:00+01:00"}}
{"identifier":"random-event-uuid","type":"NodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"nody-mc-nodeface","nodeTypeName":"Neos.ContentRepository.Testing:Document","originDimensionSpacePoint":{"language":"de"},"succeedingSiblingsForCoverage":[{"dimensionSpacePoint":{"language":"de"},"nodeAggregateId":null},{"dimensionSpacePoint":{"language":"gsw"},"nodeAggregateId":null}],"parentNodeAggregateId":"lady-eleonode-rootford","nodeName":"child-document","initialPropertyValues":[],"nodeAggregateClassification":"regular","nodeReferences":[]},"metadata":{"initiatingUserId":"system","initiatingTimestamp":"2023-03-16T12:00:00+01:00"}}

"""
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ public static function fromRawEvent(Event $event): self
// unset content stream id as this is overwritten during import
unset($payload['contentStreamId'], $payload['workspaceName']);

$metaData = $event->metadata?->value ?? [];
unset($metaData['commandClass'], $metaData['commandPayload']);

return new self(
$event->id->value,
$event->type->value,
$payload,
$event->metadata?->value ?? [],
$metaData,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DISCUSSION: WHY do we export the metadata at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
$this->targetWorkspaceName,
$this->keepEventIds,
$serviceFactoryDependencies->eventStore,
$serviceFactoryDependencies->eventNormalizer,
$serviceFactoryDependencies->contentRepository,
$serviceFactoryDependencies->eventNormalizer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

namespace Neos\ContentRepository\Export\Processors;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Feature\ContentStreamEventStreamName;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated;
use Neos\ContentRepository\Core\Feature\WorkspaceEventStreamName;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Export\Event\ValueObject\ExportedEvent;
use Neos\ContentRepository\Export\ProcessingContext;
Expand All @@ -20,8 +21,11 @@
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Event\EventTypes;
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStream\EventStreamFilter;
use Neos\EventStore\Model\EventStream\ExpectedVersion;
use Neos\Flow\Utility\Algorithms;

Expand All @@ -34,8 +38,7 @@ public function __construct(
private WorkspaceName $targetWorkspaceName,
private bool $keepEventIds,
private EventStoreInterface $eventStore,
private EventNormalizer $eventNormalizer,
private ContentRepository $contentRepository,
private EventNormalizer $eventNormalizer
) {
}

Expand All @@ -48,15 +51,27 @@ public function run(ProcessingContext $context): void
/** @var array<string, string> $eventIdMap */
$eventIdMap = [];

$workspace = $this->contentRepository->findWorkspaceByName($this->targetWorkspaceName);
if ($workspace === null) {
throw new \InvalidArgumentException("Workspace {$this->targetWorkspaceName} does not exist", 1729530978);
$rootWorkspaceContentStreamId = null;
foreach ($this->eventStore->load(
WorkspaceEventStreamName::fromWorkspaceName($this->targetWorkspaceName)->getEventStreamName(),
EventStreamFilter::create(EventTypes::create(EventType::fromString('RootWorkspaceWasCreated')))
) as $eventEnvelope) {
$rootWorkspaceWasCreatedEvent = $this->eventNormalizer->denormalize($eventEnvelope->event);
if (!$rootWorkspaceWasCreatedEvent instanceof RootWorkspaceWasCreated) {
throw new \RuntimeException(sprintf('Expected event of type %s got %s', RootWorkspaceWasCreated::class, $rootWorkspaceWasCreatedEvent::class), 1732109840);
}
$rootWorkspaceContentStreamId = $rootWorkspaceWasCreatedEvent->newContentStreamId;
break;
}

if ($rootWorkspaceContentStreamId === null) {
throw new \InvalidArgumentException(sprintf('Workspace "%s" does not exist or is not a root workspace', $this->targetWorkspaceName), 1729530978);
}

while (($line = fgets($eventFileResource)) !== false) {
$event =
ExportedEvent::fromJson(trim($line))
->processPayload(fn (array $payload) => [...$payload, 'contentStreamId' => $workspace->currentContentStreamId->value, 'workspaceName' => $this->targetWorkspaceName->value]);
->processPayload(fn (array $payload) => [...$payload, 'contentStreamId' => $rootWorkspaceContentStreamId->value, 'workspaceName' => $this->targetWorkspaceName->value]);
if (!$this->keepEventIds) {
try {
$newEventId = Algorithms::generateUUID();
Expand Down Expand Up @@ -96,11 +111,11 @@ public function run(ProcessingContext $context): void
$domainEvents[] = $this->eventNormalizer->normalize($domainEvent);
}

$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)->getEventStreamName();
$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($rootWorkspaceContentStreamId)->getEventStreamName();
try {
$this->eventStore->commit($contentStreamStreamName, Events::fromArray($domainEvents), ExpectedVersion::fromVersion(Version::first()));
} catch (ConcurrencyException $e) {
throw new \RuntimeException(sprintf('Failed to publish %d events because the event stream "%s" for workspace "%s". Please prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $workspace->workspaceName->value), 1729506818, $e);
throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e);
}
}
}
Loading