Skip to content

Commit

Permalink
feat: disable test mode to dual write in dao. (#435)
Browse files Browse the repository at this point in the history
* feat: disable test mode to dual write in dao.

* add tests for mode changes.

* skip post update hook in test mode.
  • Loading branch information
RealChrisL authored Sep 24, 2024
1 parent bf2aa2e commit 12491aa
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
23 changes: 12 additions & 11 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResul
if (_emitAspectSpecificAuditEvent) {
if (_alwaysEmitAspectSpecificAuditEvent || !oldAndNewEqual) {
if (_trackingProducer != null) {
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, trackingContext,
IngestionMode.LIVE);
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp,
trackingContext, IngestionMode.LIVE);
} else {
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue, auditStamp, IngestionMode.LIVE);
}
Expand Down Expand Up @@ -767,15 +767,16 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
final Class<ASPECT> aspectClass = updateLambda.getAspectClass();
checkValidAspect(aspectClass);
// dual-write to test table while test mode is enabled.
if (updateLambda.getIngestionParams().isTestMode()) {
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
maxTransactionRetry);
}
final AddResult<ASPECT> result = runInTransactionWithRetry(() -> aspectUpdateHelper(urn,
new AspectUpdateLambda<>(aspectClass, updateLambda.getUpdateLambda(),
updateLambda.getIngestionParams().setTestMode(false)), auditStamp, trackingContext), maxTransactionRetry);
return unwrapAddResult(urn, result, auditStamp, trackingContext);

// default test mode is false being set in
// {@link #rawAdd(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)}}
final AddResult<ASPECT> result =
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
maxTransactionRetry);

// skip MAE producing and post update hook in test mode
return updateLambda.getIngestionParams().isTestMode() ? result.newValue
: unwrapAddResult(urn, result, auditStamp, trackingContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,43 +339,20 @@ public void testAddOne() {
@Test
public void testAddOneInTestMode() {
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY && !_enableChangeLog) {
Clock mockClock = mock(Clock.class);
when(mockClock.millis()).thenReturn(_now);
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
dao.setClock(mockClock);
FooUrn urn = makeFooUrn(1);
String aspectName = ModelUtils.getAspectName(AspectFoo.class);
AspectFoo expected = new AspectFoo().setValue("foo");
Urn actor = Urns.createFromTypeSpecificString("test", "actor");
Urn impersonator = Urns.createFromTypeSpecificString("test", "impersonator");

dao.add(urn, expected, makeAuditStamp(actor, impersonator, _now), null, new IngestionParams().setTestMode(true));

EbeanMetadataAspect aspectTest = getTestMetadata(urn, aspectName, 0);

assertNotNull(aspectTest);
assertEquals(aspectTest.getKey().getUrn(), urn.toString());
assertEquals(aspectTest.getKey().getAspect(), aspectName);
assertEquals(aspectTest.getKey().getVersion(), 0);
assertEquals(aspectTest.getCreatedOn(), new Timestamp(_now));
assertEquals(aspectTest.getCreatedBy(), "urn:li:test:actor");

AspectFoo actualTest = RecordUtils.toRecordTemplate(AspectFoo.class, aspectTest.getMetadata());
assertEquals(actualTest, expected);
AspectFoo foo = new AspectFoo().setValue("foo");
IngestionParams ingestionParams = new IngestionParams().setTestMode(true);
dao.setAlwaysEmitAuditEvent(false);
dao.setAlwaysEmitAspectSpecificAuditEvent(false);

EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0);

assertNotNull(aspect);
assertEquals(aspect.getKey().getUrn(), urn.toString());
assertEquals(aspect.getKey().getAspect(), aspectName);
assertEquals(aspect.getKey().getVersion(), 0);
assertEquals(aspect.getCreatedOn(), new Timestamp(_now));
assertEquals(aspect.getCreatedBy(), "urn:li:test:actor");
dao.add(urn, foo, _dummyAuditStamp, null, ingestionParams);

AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata());
assertEquals(actual, expected);
verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, expected);
// no MAE should be emitted in test mode
verifyNoMoreInteractions(_mockProducer);

BaseLocalDAO.AspectEntry<AspectFoo> aspectEntry = dao.getLatest(urn, AspectFoo.class, true);
assertEquals(aspectEntry.getAspect().getValue(), "foo");
}
}

Expand Down Expand Up @@ -406,6 +383,30 @@ public void testAddTwo() {
verifyNoMoreInteractions(_mockProducer);
}

@Test
public void testAddTwoInTestMode() throws URISyntaxException {
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY && !_enableChangeLog) {
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectBar bar = new AspectBar().setValue("bar");
IngestionParams ingestionParams = new IngestionParams().setTestMode(true);
dao.setAlwaysEmitAuditEvent(false);
dao.setAlwaysEmitAspectSpecificAuditEvent(false);

dao.add(urn, foo, _dummyAuditStamp, null, ingestionParams);
dao.add(urn, bar, _dummyAuditStamp, null, ingestionParams);

// no MAE should be emitted in test mode
verifyNoMoreInteractions(_mockProducer);

BaseLocalDAO.AspectEntry<AspectFoo> aspectFooEntry = dao.getLatest(urn, AspectFoo.class, true);
assertEquals(aspectFooEntry.getAspect().getValue(), "foo");
BaseLocalDAO.AspectEntry<AspectBar> aspectBarEntry = dao.getLatest(urn, AspectBar.class, true);
assertEquals(aspectBarEntry.getAspect().getValue(), "bar");
}
}

@Test
public void testAddWithIngestionAnnotation() throws URISyntaxException {
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ ALTER TABLE metadata_entity_foo_test ADD a_aspectfoo JSON;
-- add bar aspect to foo entity
ALTER TABLE metadata_entity_foo ADD a_aspectbar JSON;

-- add bar aspect to foo test entity
ALTER TABLE metadata_entity_foo_test ADD a_aspectbar JSON;

-- add foobar aspect to foo entity
ALTER TABLE metadata_entity_foo ADD a_aspectfoobar JSON;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ ALTER TABLE metadata_entity_foo_test ADD a_aspectfoo JSON;
-- add bar aspect to foo entity
ALTER TABLE metadata_entity_foo ADD a_aspectbar JSON;

-- add bar aspect to foo test entity
ALTER TABLE metadata_entity_foo_test ADD a_aspectbar JSON;

-- add foobar aspect to foo entity
ALTER TABLE metadata_entity_foo ADD a_aspectfoobar JSON;

Expand Down

0 comments on commit 12491aa

Please sign in to comment.