Skip to content

Commit

Permalink
Add naive approach to backfilling (#314)
Browse files Browse the repository at this point in the history
* feat(backfill): Add naive backfill logic where we compare emitTime against lastModifiedTime

* Log urn as well as aspect class

* also log urn when event is skipped

* Better logging

---------

Co-authored-by: Derek Pham <[email protected]>
  • Loading branch information
derekpham and Derek Pham authored Nov 13, 2023
1 parent dc5d903 commit f57f34f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
22 changes: 22 additions & 0 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,28 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
final ASPECT oldValue = latest.getAspect() == null ? null : latest.getAspect();
final AuditStamp oldAuditStamp = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getAudit();

boolean isBackfillEvent = trackingContext != null
&& trackingContext.hasBackfill() && trackingContext.isBackfill();
if (isBackfillEvent) {
boolean shouldBackfill =
// the time in old audit stamp represents last modified time of the aspect
// if the record doesn't exist, it will be null, which means we should process the record as normal
oldAuditStamp != null && oldAuditStamp.hasTime()
// ingestionTrackingContext if not null should always have emitTime. If emitTime doesn't exist within
// a non-null IngestionTrackingContext, it should be investigated. We'll also skip backfilling in this case
&& trackingContext.hasEmitTime()
// we should only process this backfilling event if the emit time is greater than last modified time
&& trackingContext.getEmitTime() > oldAuditStamp.getTime();

log.info("Encounter backfill event. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. "
+ "Based on this information, shouldBackfill = {}.",
trackingContext, urn, aspectClass, oldAuditStamp, shouldBackfill);

if (!shouldBackfill) {
return new AddResult<>(oldValue, oldValue, aspectClass);
}
}

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
if ("AzkabanFlowInfo".equals(aspectClass.getSimpleName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ record IngestionTrackingContext includes BaseTrackingContext {
* The time at which the ingestion event was emitted into kafka.
*/
emitTime: optional long

/**
* Whether this event is a re-emitted event for backfilling purposes
*/
backfill: boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.annotation.Nullable;
import org.mockito.stubbing.OngoingStubbing;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static com.linkedin.common.AuditStamps.*;
Expand Down Expand Up @@ -436,4 +437,85 @@ public void testAtomicUpdateDisabledUsesMultipleTransactions() throws URISyntaxE

verify(_mockTransactionRunner, times(2)).run(any());
}

@DataProvider(name = "addBackfillForNoopCases")
public Object[][] addBackfillForNoopCases() {
AuditStamp oldAuditStamp = makeAuditStamp("susActor", 6L);

// case 1 - emitTime doesn't exist
IngestionTrackingContext contextWithNoEmitTime = new IngestionTrackingContext();
contextWithNoEmitTime.setBackfill(true);

// case 2 - emitTime < old stamp
IngestionTrackingContext contextWithSmallEmitTime = new IngestionTrackingContext();
contextWithSmallEmitTime.setBackfill(true);
contextWithSmallEmitTime.setEmitTime(5L);

return new Object[][] {
{ contextWithNoEmitTime, oldAuditStamp },
{ contextWithSmallEmitTime, oldAuditStamp }
};
}

@Test(description = "Each test case represents a scenario where a backfill event should NOT be backfilled",
dataProvider = "addBackfillForNoopCases")
public void testAddBackfillEmitTimeLargerThanOldAuditTime(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp
) throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");

ExtraInfo extraInfo = new ExtraInfo();
extraInfo.setAudit(oldAuditStamp);
when(_mockGetLatestFunction.apply(any(), eq(AspectFoo.class)))
.thenReturn(new BaseLocalDAO.AspectEntry<AspectFoo>(null, extraInfo));

DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager,
_dummyLocalDAO._transactionRunner);
dummyLocalDAO.setEmitAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAuditEvent(true);
dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true);
expectGetLatest(urn, AspectFoo.class,
Arrays.asList(makeAspectEntry(null, oldAuditStamp), makeAspectEntry(foo, _dummyAuditStamp)));

dummyLocalDAO.add(urn, foo, _dummyAuditStamp, ingestionTrackingContext);

verify(_mockTrackingEventProducer, times(1)).produceMetadataAuditEvent(urn, null, null);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null,
null, _dummyAuditStamp, ingestionTrackingContext, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockTrackingEventProducer);
}

@Test(description = "Event should be processed for backfill event")
public void testAddForBackfill() throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");

ExtraInfo extraInfo = new ExtraInfo();
AuditStamp oldAuditStamp = makeAuditStamp("nonSusActor", 5L);
extraInfo.setAudit(oldAuditStamp);
when(_mockGetLatestFunction.apply(any(), eq(AspectFoo.class)))
.thenReturn(new BaseLocalDAO.AspectEntry<AspectFoo>(null, extraInfo));

DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager,
_dummyLocalDAO._transactionRunner);
dummyLocalDAO.setEmitAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAuditEvent(true);
dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true);
expectGetLatest(urn, AspectFoo.class,
Arrays.asList(makeAspectEntry(null, oldAuditStamp), makeAspectEntry(foo, _dummyAuditStamp)));

IngestionTrackingContext ingestionTrackingContext = new IngestionTrackingContext();
ingestionTrackingContext.setBackfill(true);
ingestionTrackingContext.setEmitTime(6L);

dummyLocalDAO.add(urn, foo, _dummyAuditStamp, ingestionTrackingContext);

verify(_mockTrackingEventProducer, times(1)).produceMetadataAuditEvent(urn, null, foo);
verify(_mockTrackingEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null,
foo, _dummyAuditStamp, ingestionTrackingContext, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockTrackingEventProducer);
}
}

0 comments on commit f57f34f

Please sign in to comment.