From e121bb0411d89b749efb963ad9aa27697ca6abc7 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Tue, 10 Dec 2024 09:06:25 -0600 Subject: [PATCH] fix(entity-service): prevent muatition of systemMetdata on prev --- .../metadata/entity/EntityServiceImpl.java | 30 ++++++++++++------- .../metadata/entity/EntityServiceTest.java | 5 ++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 059a6b7ed0aea3..d14990f93d22d9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -2536,16 +2536,12 @@ private UpdateAspectResult ingestAspectToLocalDB( Optional latestSystemMetadataDiff = systemMetadataDiff( txContext, + writeItem.getUrn(), previousBatchAspect.getSystemMetadata(), writeItem.getSystemMetadata(), databaseAspect == null ? null : databaseAspect.getSystemMetadata()); if (latestSystemMetadataDiff.isPresent()) { - // Update previous version since that is what is re-written - previousBatchAspect - .getEntityAspect() - .setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadataDiff.get())); - // Inserts & update order is not guaranteed, flush the insert for potential updates within // same tx if (databaseAspect == null && txContext != null) { @@ -2560,13 +2556,25 @@ private UpdateAspectResult ingestAspectToLocalDB( conditionalLogLevel( txContext, String.format( - "Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newAspect: %s", + "Update aspect with name %s, urn %s, txContext: %s, databaseAspect: %s, newMetadata: %s newSystemMetadata: %s", previousBatchAspect.getAspectName(), previousBatchAspect.getUrn(), txContext != null, databaseAspect == null ? null : databaseAspect.getEntityAspect(), - previousBatchAspect.getEntityAspect())); - aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false); + previousBatchAspect.getEntityAspect().getMetadata(), + latestSystemMetadataDiff.get())); + + aspectDao.saveAspect( + txContext, + previousBatchAspect.getUrnRaw(), + previousBatchAspect.getAspectName(), + previousBatchAspect.getMetadataRaw(), + previousBatchAspect.getCreatedBy(), + null, + previousBatchAspect.getCreatedOn(), + RecordUtils.toJsonString(latestSystemMetadataDiff.get()), + previousBatchAspect.getVersion(), + false); // metrics aspectDao.incrementWriteMetrics( @@ -2661,13 +2669,14 @@ private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspec private static Optional systemMetadataDiff( @Nullable TransactionContext txContext, + @Nonnull Urn urn, @Nullable SystemMetadata previous, @Nonnull SystemMetadata current, @Nullable SystemMetadata database) { SystemMetadata latestSystemMetadata = GenericRecordUtils.copy(previous, SystemMetadata.class); - latestSystemMetadata.setLastRunId(previous.getRunId(), SetMode.REMOVE_IF_NULL); + latestSystemMetadata.setLastRunId(latestSystemMetadata.getRunId(), SetMode.REMOVE_IF_NULL); latestSystemMetadata.setLastObserved(current.getLastObserved(), SetMode.IGNORE_NULL); latestSystemMetadata.setRunId(current.getRunId(), SetMode.REMOVE_IF_NULL); @@ -2677,7 +2686,8 @@ private static Optional systemMetadataDiff( conditionalLogLevel( txContext, String.format( - "systemMetdataDiff: %s != %s AND %s", + "systemMetdataDiff urn %s, %s != %s AND %s", + urn, RecordUtils.toJsonString(latestSystemMetadata), previous == null ? null : RecordUtils.toJsonString(previous), database == null ? null : RecordUtils.toJsonString(database))); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index 18d277cacbbe26..4c42815a80f3f1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -627,7 +627,7 @@ public void testReingestLineageAspect() throws Exception { restateChangeLog.setSystemMetadata(futureSystemMetadata); restateChangeLog.setPreviousAspectValue(aspect); restateChangeLog.setPreviousSystemMetadata( - simulatePullFromDB(futureSystemMetadata, SystemMetadata.class)); + simulatePullFromDB(initialSystemMetadata, SystemMetadata.class)); restateChangeLog.setEntityKeyAspect( GenericRecordUtils.serializeAspect( EntityKeyUtils.convertUrnToEntityKey( @@ -705,8 +705,7 @@ public void testReingestLineageProposal() throws Exception { restateChangeLog.setAspect(genericAspect); restateChangeLog.setSystemMetadata(futureSystemMetadata); restateChangeLog.setPreviousAspectValue(genericAspect); - restateChangeLog.setPreviousSystemMetadata( - simulatePullFromDB(futureSystemMetadata, SystemMetadata.class)); + restateChangeLog.setPreviousSystemMetadata(simulatePullFromDB(metadata1, SystemMetadata.class)); Map latestAspects = _entityServiceImpl.getLatestAspectsForUrn(