Skip to content

Commit

Permalink
include ingestion mode in non-tracking producer (#368)
Browse files Browse the repository at this point in the history
  • Loading branch information
JiaoMaWHU authored Mar 5, 2024
1 parent c08c8c0 commit ea78917
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResul
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, trackingContext,
IngestionMode.LIVE);
} else {
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp);
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, IngestionMode.LIVE);
}
}
}
Expand Down Expand Up @@ -1271,7 +1271,7 @@ private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode

_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, trackingContext, ingestionMode);
} else {
_producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
_producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, ingestionMode);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionMode;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -58,7 +59,8 @@ public abstract <ASPECT extends RecordTemplate> void produceMetadataAuditEvent(@
* @param oldValue the value prior to the update, or null if there's none.
* @param newValue the value after the update
* @param auditStamp {@link AuditStamp} containing version auditing information for the metadata change
* @param ingestionMode {@link IngestionMode} of the change
*/
public abstract <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp);
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp, @Nullable IngestionMode ingestionMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public BaseTrackingMetadataEventProducer(@Nonnull Class<SNAPSHOT> snapshotClass,
}

/**
* Same as inherited method {@link #produceAspectSpecificMetadataAuditEvent(Urn, RecordTemplate, RecordTemplate, AuditStamp)}
* Same as inherited method {@link #produceAspectSpecificMetadataAuditEvent(Urn, RecordTemplate, RecordTemplate, AuditStamp, IngestionMode)}
* but with tracking context.
* Produces an aspect specific Metadata Audit Event (MAE) after a metadata aspect is updated for an entity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dummy.DummyAspect;
import com.linkedin.metadata.dummy.DummySnapshot;
import com.linkedin.metadata.events.IngestionMode;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -33,7 +34,7 @@ public <ASPECT extends RecordTemplate> void produceMetadataAuditEvent(@Nonnull U

@Override
public <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp) {
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp, @Nullable IngestionMode ingestionMode) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.testing.AspectVersioned;
import com.linkedin.testing.EntityAspectUnionVersioned;
Expand Down Expand Up @@ -94,13 +95,13 @@ public void testMAEEmissionOnVerChange() throws URISyntaxException {
_dummyLocalDAO.add(urn, ver020201OldValue, auditStamp4);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp, IngestionMode.LIVE);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo1, ver010101);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, ver010101, auditStamp2);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, ver010101, auditStamp2, IngestionMode.LIVE);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, ver010101, ver020101);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver010101, ver020101, auditStamp3);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver010101, ver020101, auditStamp3, IngestionMode.LIVE);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, ver020101, ver020201OldValue);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver020101, ver020201OldValue, auditStamp4);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, ver020101, ver020201OldValue, auditStamp4, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockEventProducer);
}

Expand All @@ -121,7 +122,7 @@ public void testMAEEmissionVerNoChange() throws URISyntaxException {
_dummyLocalDAO.add(urn, ver020101, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, ver020101);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, ver020101, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, ver020101, _dummyAuditStamp, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockEventProducer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void testMAEEmissionAlways() throws URISyntaxException {
_dummyLocalDAO.add(urn, foo, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp, IngestionMode.LIVE);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, foo);
verifyNoMoreInteractions(_mockEventProducer);
}
Expand All @@ -271,9 +271,9 @@ public void testMAEEmissionOnValueChange() throws URISyntaxException {
_dummyLocalDAO.add(urn, foo2, auditStamp2);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp, IngestionMode.LIVE);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo1, foo2);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, foo2, auditStamp2);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, foo2, auditStamp2, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockEventProducer);
}

Expand All @@ -292,7 +292,7 @@ public void testMAEEmissionNoValueChange() throws URISyntaxException {
_dummyLocalDAO.add(urn, foo3, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockEventProducer);
}

Expand All @@ -308,7 +308,7 @@ public void testMAEWithNullValue() throws URISyntaxException {
_dummyLocalDAO.delete(urn, AspectFoo.class, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp, IngestionMode.LIVE);
// TODO: ensure MAE is produced with newValue set as null for soft deleted aspect
// verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, null);
verifyNoMoreInteractions(_mockEventProducer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public void testCreateDummyMetadataEventProducer() {
AspectFoo newValue = new AspectFoo().setValue("new");

producer.produceMetadataAuditEvent(urn, oldValue, newValue);
producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, null);
producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ public void testBackfill() {
Optional<AspectFoo> foo = dao.backfill(AspectFoo.class, urn);

assertEquals(foo.get(), expected);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, expected, expected, null);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, expected, expected, null, IngestionMode.BOOTSTRAP);
verifyNoMoreInteractions(_mockProducer);
}

Expand Down Expand Up @@ -856,7 +856,7 @@ public void testBackfillMultipleAspectsMultipleUrns() {
for (Class<? extends RecordTemplate> clazz : aspects.get(urn).keySet()) {
RecordTemplate aspect = aspects.get(urn).get(clazz);
assertEquals(backfilledAspects.get(urn).get(clazz).get(), aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, IngestionMode.BOOTSTRAP);
}
}
verifyNoMoreInteractions(_mockProducer);
Expand Down Expand Up @@ -892,7 +892,7 @@ public void testBackfillMAEOnlyPresentInDBSuccess() {
for (Class<? extends RecordTemplate> clazz : aspects.get(urn).keySet()) {
assertTrue(backfilledAspects.get(urn.toString()).contains(getAspectName(clazz)));
RecordTemplate metadata = aspects.get(urn).get(clazz);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null, IngestionMode.BOOTSTRAP);
}
assertFalse(backfilledAspects.get(urn.toString()).contains(getAspectName(AspectFooBar.class)));
}
Expand Down Expand Up @@ -929,7 +929,7 @@ public void testBackfillMAEOnlySelectedAspectsSuccess() {
for (Class<? extends RecordTemplate> clazz : aspects.get(urn).keySet()) {
assertTrue(backfilledAspects.get(urn.toString()).contains(getAspectName(clazz)));
RecordTemplate metadata = aspects.get(urn).get(clazz);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null, IngestionMode.BOOTSTRAP);
}
assertFalse(backfilledAspects.get(urn.toString()).contains(getAspectName(AspectBar.class)));
}
Expand Down Expand Up @@ -963,7 +963,7 @@ public void testBackfillMAENullAspectsSuccess() {
for (Class<? extends RecordTemplate> clazz : aspects.get(urn).keySet()) {
assertTrue(backfilledAspects.get(urn.toString()).contains(getAspectName(clazz)));
RecordTemplate metadata = aspects.get(urn).get(clazz);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, metadata, metadata, null, IngestionMode.BOOTSTRAP);
}
}
verifyNoMoreInteractions(_mockProducer);
Expand Down

0 comments on commit ea78917

Please sign in to comment.