Skip to content

Commit

Permalink
feat(auditing): add AuditStamp to MAE schema (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdonn authored Aug 25, 2023
1 parent b4696db commit 5d73123
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ record AuditStamp {

/**
* When did the resource/association/sub-resource move into the specific lifecycle stage represented by this AuditEvent.
* i.e. createdon column of metadata_aspect
*/
time: Time

/**
* The entity (e.g. a member URN) which will be credited for moving the resource/association/sub-resource into the specific lifecycle stage. It is also the one used to authorize the change.
* i.e. createdby column of metadata_aspect
*/
actor: Urn

/**
* The entity (e.g. a service URN) which performs the change on behalf of the Actor and must be authorized to act as the Actor.
* i.e. createdfor column of metadata_aspect
*/
impersonator: optional Urn
}
26 changes: 13 additions & 13 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,14 +500,14 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn,
}

// send the audit events etc
return results.stream().map(x -> unwrapAddResultToUnion(urn, x, trackingContext)).collect(Collectors.toList());
return results.stream().map(x -> unwrapAddResultToUnion(urn, x, auditStamp, trackingContext)).collect(Collectors.toList());
}

public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends RecordTemplate> aspectValues, AuditStamp auditStamp) {
public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp auditStamp) {
return addMany(urn, aspectValues, auditStamp, null);
}

public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends RecordTemplate> aspectValues, AuditStamp auditStamp,
public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext) {
List<AspectUpdateLambda<? extends RecordTemplate>> aspectUpdateLambdas = aspectValues.stream()
.map(AspectUpdateLambda::new)
Expand All @@ -516,8 +516,8 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends Reco
return addMany(urn, aspectUpdateLambdas, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext);
}

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple, AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext) {
private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
AspectEntry<ASPECT> latest = getLatest(urn, updateTuple.getAspectClass());

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
Expand Down Expand Up @@ -548,12 +548,12 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN
}

private <ASPECT extends RecordTemplate> ASPECT_UNION unwrapAddResultToUnion(URN urn, AddResult<ASPECT> result,
@Nullable IngestionTrackingContext trackingContext) {
ASPECT rawResult = unwrapAddResult(urn, result, trackingContext);
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
ASPECT rawResult = unwrapAddResult(urn, result, auditStamp, trackingContext);
return ModelUtils.newEntityUnion(_aspectUnionClass, rawResult);
}

private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResult<ASPECT> result,
private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResult<ASPECT> result, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext) {
Class<ASPECT> aspectClass = result.getKlass();
final ASPECT oldValue = result.getOldValue();
Expand All @@ -576,9 +576,9 @@ private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResul
if (_emitAspectSpecificAuditEvent) {
if (_alwaysEmitAspectSpecificAuditEvent || oldValue != newValue) {
if (_trackingProducer != null) {
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, trackingContext);
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, trackingContext);
} else {
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue);
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp);
}
}
}
Expand Down Expand Up @@ -652,7 +652,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
final AddResult<ASPECT> result = runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
maxTransactionRetry);

return unwrapAddResult(urn, result, trackingContext);
return unwrapAddResult(urn, result, auditStamp, trackingContext);
}

/**
Expand Down Expand Up @@ -1161,10 +1161,10 @@ private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode
trackingContext.setTrackingId(TrackingUtils.getRandomUUID());
trackingContext.setEmitter("dao_backfill_endpoint");
trackingContext.setEmitTime(System.currentTimeMillis());
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, trackingContext);
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, null, trackingContext);
} else {
_producer.produceMetadataAuditEvent(urn, oldValue, aspect);
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect);
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao.producer;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
Expand Down Expand Up @@ -56,7 +57,8 @@ public abstract <ASPECT extends RecordTemplate> void produceMetadataAuditEvent(@
* @param urn {@link Urn} of the entity
* @param oldValue the value prior to the update, or null if there's none.
* @param newValue the value after the update
* @param auditStamp {@link AuditStamp} containing version auditing information for the metadata change
*/
public abstract <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue);
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao.producer;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
Expand All @@ -16,15 +17,16 @@ public BaseTrackingMetadataEventProducer(@Nonnull Class<SNAPSHOT> snapshotClass,
}

/**
* Same as inherited method {@link #produceAspectSpecificMetadataAuditEvent(Urn, RecordTemplate, RecordTemplate)}
* Same as inherited method {@link #produceAspectSpecificMetadataAuditEvent(Urn, RecordTemplate, RecordTemplate, AuditStamp)}
* but with tracking context.
* Produces an aspect specific Metadata Audit Event (MAE) after a metadata aspect is updated for an entity.
*
* @param urn {@link Urn} of the entity
* @param oldValue the value prior to the update, or null if there's none.
* @param newValue the value after the update
* @param trackingContext nullable tracking context passed in to be appended to produced MAEv5s
* @param auditStamp {@link AuditStamp} containing version auditing information for the metadata change
*/
public abstract <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable IngestionTrackingContext trackingContext);
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao.producer;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dummy.DummyAspect;
Expand Down Expand Up @@ -32,7 +33,7 @@ public <ASPECT extends RecordTemplate> void produceMetadataAuditEvent(@Nonnull U

@Override
public <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue) {
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue, @Nullable AuditStamp auditStamp) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public void setup() {
_mockTransactionRunner = spy(DummyTransactionRunner.class);
_dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockEventProducer, _mockTransactionRunner);
_dummyLocalDAO.setEmitAuditEvent(true);
_dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
_dummyAuditStamp = makeAuditStamp("foo", 1234);
}

Expand Down Expand Up @@ -249,6 +250,7 @@ public void testMAEEmissionAlways() throws URISyntaxException {
_dummyLocalDAO.add(urn, foo, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, foo);
verifyNoMoreInteractions(_mockEventProducer);
}
Expand All @@ -263,10 +265,13 @@ public void testMAEEmissionOnValueChange() throws URISyntaxException {
Arrays.asList(makeAspectEntry(null, null), makeAspectEntry(foo1, _dummyAuditStamp)));

_dummyLocalDAO.add(urn, foo1, _dummyAuditStamp);
_dummyLocalDAO.add(urn, foo2, _dummyAuditStamp);
AuditStamp auditStamp2 = makeAuditStamp("tester", 5678L);
_dummyLocalDAO.add(urn, foo2, auditStamp2);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp);
verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo1, foo2);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo1, foo2, auditStamp2);
verifyNoMoreInteractions(_mockEventProducer);
}

Expand All @@ -285,6 +290,7 @@ public void testMAEEmissionNoValueChange() throws URISyntaxException {
_dummyLocalDAO.add(urn, foo3, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo1, _dummyAuditStamp);
verifyNoMoreInteractions(_mockEventProducer);
}

Expand All @@ -300,6 +306,7 @@ public void testMAEWithNullValue() throws URISyntaxException {
_dummyLocalDAO.delete(urn, AspectFoo.class, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp);
// TODO: ensure MAE is produced with newValue set as null for soft deleted aspect
// verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, null);
verifyNoMoreInteractions(_mockEventProducer);
Expand All @@ -324,8 +331,8 @@ public void testMAEv5WithTracking() throws URISyntaxException {

verify(_mockTrackingEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockTrackingEventProducer, times(1)).produceMetadataAuditEvent(urn, foo, foo);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, mockTrackingContext);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo, foo, mockTrackingContext);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, foo, _dummyAuditStamp, mockTrackingContext);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, foo, foo, _dummyAuditStamp, mockTrackingContext);
verifyNoMoreInteractions(_mockTrackingEventProducer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public void testCreateDummyMetadataEventProducer() {
AspectFoo newValue = new AspectFoo().setValue("new");

producer.produceMetadataAuditEvent(urn, oldValue, newValue);
producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue);
producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ public void ensureSchemaUpToDate() {
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp) {

final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
final String impersonator = auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null;
final boolean urnExtraction = _urnPathExtractor != null && !(_urnPathExtractor instanceof EmptyPathExtractor);

final SqlUpdate sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction))
.setParameter("urn", urn.toString())
.setParameter("lastmodifiedon", new Timestamp(System.currentTimeMillis()).toString())
.setParameter("lastmodifiedon", new Timestamp(timestamp).toString())
.setParameter("lastmodifiedby", actor);

// If a non-default UrnPathExtractor is provided, the user MUST specify in their schema generation scripts
Expand Down Expand Up @@ -122,8 +123,6 @@ public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPEC
// Add local relationships if builder is provided.
addRelationships(urn, newValue, aspectClass);

final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();

AuditedAspect auditedAspect = new AuditedAspect()
.setAspect(RecordUtils.toJsonString(newValue))
.setCanonicalName(aspectClass.getCanonicalName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public void testBackfill() {
assertEquals(foo.get(), expected);

verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, expected, expected);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, expected, expected);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, expected, expected, null);
verifyNoMoreInteractions(_mockProducer);
}

Expand Down Expand Up @@ -829,7 +829,7 @@ public void testBackfillMultipleAspectsMultipleUrns() {
RecordTemplate aspect = aspects.get(urn).get(clazz);
assertEquals(backfilledAspects.get(urn).get(clazz).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
}
}
verifyNoMoreInteractions(_mockProducer);
Expand Down Expand Up @@ -881,7 +881,7 @@ public void testBackfillUsingSCSI() {
RecordTemplate aspect = aspects.get(urn).get(AspectBar.class);
assertEquals(backfilledAspects.get(urn).get(AspectBar.class).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
}
clearInvocations(_mockProducer);

Expand All @@ -897,7 +897,7 @@ public void testBackfillUsingSCSI() {
RecordTemplate aspect = aspects.get(urn).get(AspectBar.class);
assertEquals(backfilledAspects.get(urn).get(AspectBar.class).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null);
}
verifyNoMoreInteractions(_mockProducer);
assertEquals(dao.listUrns(indexFilter, null, 3).size(), 3);
Expand All @@ -911,7 +911,7 @@ public void testBackfillUsingSCSI() {
RecordTemplate aspect = aspects.get(urn).get(AspectBar.class);
assertEquals(backfilledAspects.get(urn).get(AspectBar.class).get(), aspect);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, aspect);
verify(_mockProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, aspect, null);
}
verifyNoMoreInteractions(_mockProducer);
assertEquals(dao.listUrns(indexFilter, null, 3).size(), 3);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace com.linkedin.testing.mxe.bar.annotatedAspectBar

import com.linkedin.avro2pegasus.events.KafkaAuditHeader
import com.linkedin.common.AuditStamp
import com.linkedin.metadata.events.ChangeType
import com.linkedin.metadata.events.IngestionTrackingContext
import com.linkedin.testing.BarUrn
Expand Down Expand Up @@ -41,4 +42,9 @@ record MetadataAuditEvent {
* Tracking context to identify the lifecycle of the trackable ingestion item.
*/
ingestionTrackingContext: optional union[null, IngestionTrackingContext] = null

/**
* Audit info (i.e. createdon, createdby, createdfor) to track the version history of metadata changes.
*/
auditStamp: union[null, AuditStamp]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace @(eventSpec.getNamespace())

import com.linkedin.avro2pegasus.events.KafkaAuditHeader
import com.linkedin.common.AuditStamp
import com.linkedin.metadata.events.ChangeType
import com.linkedin.metadata.events.IngestionTrackingContext
import @eventSpec.getUrnType()
Expand Down Expand Up @@ -44,4 +45,9 @@ record MetadataAuditEvent {
* Tracking context to identify the lifecycle of the trackable ingestion item.
*/
ingestionTrackingContext: optional union[null, IngestionTrackingContext] = null

/**
* Audit info (i.e. createdon, createdby, createdfor) to track the version history of metadata changes.
*/
auditStamp: union[null, AuditStamp]
}
Loading

0 comments on commit 5d73123

Please sign in to comment.