Skip to content

Commit

Permalink
fix(entity-service): prevent muatition of systemMetdata on prev
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Dec 10, 2024
1 parent 61fffb2 commit bb14a04
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2536,16 +2536,12 @@ private UpdateAspectResult ingestAspectToLocalDB(
Optional<SystemMetadata> latestSystemMetadataDiff =
systemMetadataDiff(
txContext,
writeItem.getUrn(),
previousBatchAspect.getSystemMetadata(),
writeItem.getSystemMetadata(),
databaseAspect == null ? null : databaseAspect.getSystemMetadata());

if (latestSystemMetadataDiff.isPresent()) {
// Update previous version since that is what is re-written
previousBatchAspect
.getEntityAspect()
.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadataDiff.get()));

// Inserts & update order is not guaranteed, flush the insert for potential updates within
// same tx
if (databaseAspect == null && txContext != null) {
Expand All @@ -2560,13 +2556,25 @@ private UpdateAspectResult ingestAspectToLocalDB(
conditionalLogLevel(
txContext,
String.format(
"Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newAspect: %s",
"Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newMetadata: %s newSystemMetadata: %s",
previousBatchAspect.getAspectName(),
previousBatchAspect.getUrn(),
txContext != null,
databaseAspect == null ? null : databaseAspect.getEntityAspect(),
previousBatchAspect.getEntityAspect()));
aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
previousBatchAspect.getEntityAspect().getMetadata(),
latestSystemMetadataDiff.get()));

aspectDao.saveAspect(
txContext,
previousBatchAspect.getUrnRaw(),
previousBatchAspect.getAspectName(),
previousBatchAspect.getMetadataRaw(),
previousBatchAspect.getCreatedBy(),
null,
previousBatchAspect.getCreatedOn(),
RecordUtils.toJsonString(latestSystemMetadataDiff.get()),
previousBatchAspect.getVersion(),
false);

// metrics
aspectDao.incrementWriteMetrics(
Expand Down Expand Up @@ -2661,13 +2669,14 @@ private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspec

private static Optional<SystemMetadata> systemMetadataDiff(
@Nullable TransactionContext txContext,
@Nonnull Urn urn,
@Nullable SystemMetadata previous,
@Nonnull SystemMetadata current,
@Nullable SystemMetadata database) {

SystemMetadata latestSystemMetadata = GenericRecordUtils.copy(previous, SystemMetadata.class);

latestSystemMetadata.setLastRunId(previous.getRunId(), SetMode.REMOVE_IF_NULL);
latestSystemMetadata.setLastRunId(latestSystemMetadata.getRunId(), SetMode.REMOVE_IF_NULL);
latestSystemMetadata.setLastObserved(current.getLastObserved(), SetMode.IGNORE_NULL);
latestSystemMetadata.setRunId(current.getRunId(), SetMode.REMOVE_IF_NULL);

Expand All @@ -2677,7 +2686,8 @@ private static Optional<SystemMetadata> systemMetadataDiff(
conditionalLogLevel(
txContext,
String.format(
"systemMetdataDiff: %s != %s AND %s",
"systemMetdataDiff urn %s, %s != %s AND %s",
urn,
RecordUtils.toJsonString(latestSystemMetadata),
previous == null ? null : RecordUtils.toJsonString(previous),
database == null ? null : RecordUtils.toJsonString(database)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ public void testReingestLineageAspect() throws Exception {
restateChangeLog.setSystemMetadata(futureSystemMetadata);
restateChangeLog.setPreviousAspectValue(aspect);
restateChangeLog.setPreviousSystemMetadata(
simulatePullFromDB(futureSystemMetadata, SystemMetadata.class));
simulatePullFromDB(initialSystemMetadata, SystemMetadata.class));
restateChangeLog.setEntityKeyAspect(
GenericRecordUtils.serializeAspect(
EntityKeyUtils.convertUrnToEntityKey(
Expand Down Expand Up @@ -705,8 +705,7 @@ public void testReingestLineageProposal() throws Exception {
restateChangeLog.setAspect(genericAspect);
restateChangeLog.setSystemMetadata(futureSystemMetadata);
restateChangeLog.setPreviousAspectValue(genericAspect);
restateChangeLog.setPreviousSystemMetadata(
simulatePullFromDB(futureSystemMetadata, SystemMetadata.class));
restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class));

Map<String, RecordTemplate> latestAspects =
_entityServiceImpl.getLatestAspectsForUrn(
Expand Down

0 comments on commit bb14a04

Please sign in to comment.