From ea7891784a5703670dfaee84acdcb63dfa1f3c32 Mon Sep 17 00:00:00 2001 From: Jinxin Ma Date: Mon, 4 Mar 2024 16:22:31 -0800 Subject: [PATCH] include ingestion mode in non-tracking producer (#368) --- .../java/com/linkedin/metadata/dao/BaseLocalDAO.java | 4 ++-- .../dao/producer/BaseMetadataEventProducer.java | 4 +++- .../producer/BaseTrackingMetadataEventProducer.java | 2 +- .../dao/producer/DummyMetadataEventProducer.java | 3 ++- .../metadata/dao/BaseLocalDAOAspectVersionTest.java | 11 ++++++----- .../com/linkedin/metadata/dao/BaseLocalDAOTest.java | 10 +++++----- .../dao/producer/DummyMetadataEventProducerTest.java | 2 +- .../com/linkedin/metadata/dao/EbeanLocalDAOTest.java | 10 +++++----- 8 files changed, 25 insertions(+), 21 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index cb656b42b..8ac841c5a 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -654,7 +654,7 @@ private ASPECT unwrapAddResult(URN urn, AddResul _trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, trackingContext, IngestionMode.LIVE); } else { - _producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp); + _producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, IngestionMode.LIVE); } } } @@ -1271,7 +1271,7 @@ private void backfill(@Nonnull BackfillMode mode _trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, trackingContext, ingestionMode); } else { - _producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null); + _producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, ingestionMode); } } } diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java b/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java index cf2cc070f..54c08d801 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java @@ -5,6 +5,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.utils.ModelUtils; +import com.linkedin.metadata.events.IngestionMode; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -58,7 +59,8 @@ public abstract void produceMetadataAuditEvent(@ * @param oldValue the value prior to the update, or null if there's none. * @param newValue the value after the update * @param auditStamp {@link AuditStamp} containing version auditing information for the metadata change + * @param ingestionMode {@link IngestionMode} of the change */ public abstract void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn, - @Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp); + @Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp, @Nullable IngestionMode ingestionMode); } diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseTrackingMetadataEventProducer.java b/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseTrackingMetadataEventProducer.java index 0b6dbadb9..34099d469 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseTrackingMetadataEventProducer.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/producer/BaseTrackingMetadataEventProducer.java @@ -18,7 +18,7 @@ public BaseTrackingMetadataEventProducer(@Nonnull Class snapshotClass, } /** - * Same as inherited method {@link #produceAspectSpecificMetadataAuditEvent(Urn, RecordTemplate, RecordTemplate, AuditStamp)} + * Same as inherited method {@link #produceAspectSpecificMetadataAuditEvent(Urn, RecordTemplate, RecordTemplate, AuditStamp, IngestionMode)} * but with tracking context. * Produces an aspect specific Metadata Audit Event (MAE) after a metadata aspect is updated for an entity. * diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java b/dao-api/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java index c1029efab..e0c7229f7 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java @@ -5,6 +5,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.dummy.DummyAspect; import com.linkedin.metadata.dummy.DummySnapshot; +import com.linkedin.metadata.events.IngestionMode; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -33,7 +34,7 @@ public void produceMetadataAuditEvent(@Nonnull U @Override public void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn, - @Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp) { + @Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp, @Nullable IngestionMode ingestionMode) { // Do nothing } } diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOAspectVersionTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOAspectVersionTest.java index d1e1abdd7..ea66b40e8 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOAspectVersionTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOAspectVersionTest.java @@ -9,6 +9,7 @@ import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.tracking.BaseTrackingManager; import com.linkedin.metadata.dao.utils.RecordUtils; +import com.linkedin.metadata.events.IngestionMode; import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.testing.AspectVersioned; import com.linkedin.testing.EntityAspectUnionVersioned; @@ -94,13 +95,13 @@ public void testMAEEmissionOnVerChange() throws URISyntaxException { _dummyLocalDAO.add(urn, ver020201OldValue, auditStamp4); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp, IngestionMode.LIVE); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo1, ver010101); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, ver010101, auditStamp2); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, ver010101, auditStamp2, IngestionMode.LIVE); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, ver010101, ver020101); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver010101, ver020101, auditStamp3); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver010101, ver020101, auditStamp3, IngestionMode.LIVE); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, ver020101, ver020201OldValue); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver020101, ver020201OldValue, auditStamp4); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver020101, ver020201OldValue, auditStamp4, IngestionMode.LIVE); verifyNoMoreInteractions(_mockEventProducer); } @@ -121,7 +122,7 @@ public void testMAEEmissionVerNoChange() throws URISyntaxException { _dummyLocalDAO.add(urn, ver020101, _dummyAuditStamp); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, ver020101); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, ver020101, _dummyAuditStamp); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, ver020101, _dummyAuditStamp, IngestionMode.LIVE); verifyNoMoreInteractions(_mockEventProducer); } diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index 4f8fbb2cc..4a840441f 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -252,7 +252,7 @@ public void testMAEEmissionAlways() throws URISyntaxException { _dummyLocalDAO.add(urn, foo, _dummyAuditStamp); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp, IngestionMode.LIVE); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, foo); verifyNoMoreInteractions(_mockEventProducer); } @@ -271,9 +271,9 @@ public void testMAEEmissionOnValueChange() throws URISyntaxException { _dummyLocalDAO.add(urn, foo2, auditStamp2); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp, IngestionMode.LIVE); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo1, foo2); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, foo2, auditStamp2); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, foo2, auditStamp2, IngestionMode.LIVE); verifyNoMoreInteractions(_mockEventProducer); } @@ -292,7 +292,7 @@ public void testMAEEmissionNoValueChange() throws URISyntaxException { _dummyLocalDAO.add(urn, foo3, _dummyAuditStamp); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp, IngestionMode.LIVE); verifyNoMoreInteractions(_mockEventProducer); } @@ -308,7 +308,7 @@ public void testMAEWithNullValue() throws URISyntaxException { _dummyLocalDAO.delete(urn, AspectFoo.class, _dummyAuditStamp); verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo); - verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp); + verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp, IngestionMode.LIVE); // TODO: ensure MAE is produced with newValue set as null for soft deleted aspect // verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, null); verifyNoMoreInteractions(_mockEventProducer); diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java index 5e1700b64..6da549bab 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java @@ -18,6 +18,6 @@ public void testCreateDummyMetadataEventProducer() { AspectFoo newValue = new AspectFoo().setValue("new"); producer.produceMetadataAuditEvent(urn, oldValue, newValue); - producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, null); + producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, null, null); } } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index 5ba32001c..96a6b65fa 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -770,7 +770,7 @@ public void testBackfill() { Optional foo = dao.backfill(AspectFoo.class, urn); assertEquals(foo.get(), expected); - verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, expected, expected, null); + verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, expected, expected, null, IngestionMode.BOOTSTRAP); verifyNoMoreInteractions(_mockProducer); } @@ -856,7 +856,7 @@ public void testBackfillMultipleAspectsMultipleUrns() { for (Class clazz : aspects.get(urn).keySet()) { RecordTemplate aspect = aspects.get(urn).get(clazz); assertEquals(backfilledAspects.get(urn).get(clazz).get(), aspect); - verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null); + verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, IngestionMode.BOOTSTRAP); } } verifyNoMoreInteractions(_mockProducer); @@ -892,7 +892,7 @@ public void testBackfillMAEOnlyPresentInDBSuccess() { for (Class clazz : aspects.get(urn).keySet()) { assertTrue(backfilledAspects.get(urn.toString()).contains(getAspectName(clazz))); RecordTemplate metadata = aspects.get(urn).get(clazz); - verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null); + verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null, IngestionMode.BOOTSTRAP); } assertFalse(backfilledAspects.get(urn.toString()).contains(getAspectName(AspectFooBar.class))); } @@ -929,7 +929,7 @@ public void testBackfillMAEOnlySelectedAspectsSuccess() { for (Class clazz : aspects.get(urn).keySet()) { assertTrue(backfilledAspects.get(urn.toString()).contains(getAspectName(clazz))); RecordTemplate metadata = aspects.get(urn).get(clazz); - verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null); + verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null, IngestionMode.BOOTSTRAP); } assertFalse(backfilledAspects.get(urn.toString()).contains(getAspectName(AspectBar.class))); } @@ -963,7 +963,7 @@ public void testBackfillMAENullAspectsSuccess() { for (Class clazz : aspects.get(urn).keySet()) { assertTrue(backfilledAspects.get(urn.toString()).contains(getAspectName(clazz))); RecordTemplate metadata = aspects.get(urn).get(clazz); - verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null); + verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null, IngestionMode.BOOTSTRAP); } } verifyNoMoreInteractions(_mockProducer);