Skip to content

Commit

Permalink
feat: invoke test mode to dual write from ingestion in DAOs. (#404)
Browse files Browse the repository at this point in the history
* invoke test mode to dual write from ingestion in DAOs.

* refactor utils and add tests
  • Loading branch information
RealChrisL authored Aug 31, 2024
1 parent fa7063d commit 3c10b71
Show file tree
Hide file tree
Showing 21 changed files with 457 additions and 236 deletions.
43 changes: 28 additions & 15 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,9 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
}

// Save the newValue as the latest version
long largestVersion = saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted, trackingContext);
long largestVersion =
saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted,
trackingContext, ingestionParams.isTestMode());

// Apply retention policy
applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion);
Expand Down Expand Up @@ -588,7 +590,7 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends Reco

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
AspectEntry<ASPECT> latest = getLatest(urn, updateTuple.getAspectClass());
AspectEntry<ASPECT> latest = getLatest(urn, updateTuple.getAspectClass(), updateTuple.getIngestionParams().isTestMode());

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -743,11 +745,16 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
checkValidAspect(updateLambda.getAspectClass());

final AddResult<ASPECT> result = runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
maxTransactionRetry);

final Class<ASPECT> aspectClass = updateLambda.getAspectClass();
checkValidAspect(aspectClass);
// dual-write to test table while test mode is enabled.
if (updateLambda.getIngestionParams().isTestMode()) {
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
maxTransactionRetry);
}
final AddResult<ASPECT> result = runInTransactionWithRetry(() -> aspectUpdateHelper(urn,
new AspectUpdateLambda<>(aspectClass, updateLambda.getUpdateLambda(),
updateLambda.getIngestionParams().setTestMode(false)), auditStamp, trackingContext), maxTransactionRetry);
return unwrapAddResult(urn, result, auditStamp, trackingContext);
}

Expand Down Expand Up @@ -779,7 +786,7 @@ public <ASPECT extends RecordTemplate> void delete(@Nonnull URN urn, @Nonnull Cl
checkValidAspect(aspectClass);

runInTransactionWithRetry(() -> {
final AspectEntry<ASPECT> latest = getLatest(urn, aspectClass);
final AspectEntry<ASPECT> latest = getLatest(urn, aspectClass, false);
final IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE);
return addCommon(urn, latest, null, aspectClass, auditStamp, new DefaultEqualityTester<>(), trackingContext, ingestionParams);
}, maxTransactionRetry);
Expand Down Expand Up @@ -821,10 +828,11 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, @Nullable IngestionParams ingestionParams) {
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
final IngestionParams nonNullIngestionParams =
ingestionParams == null ? new IngestionParams().setIngestionMode(IngestionMode.LIVE).setTestMode(false)
: ingestionParams;
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
}

Expand Down Expand Up @@ -873,12 +881,13 @@ private <ASPECT extends RecordTemplate> void applyRetention(@Nonnull URN urn, @N
* @param newEntry {@link RecordTemplate} of the new latest value of aspect
* @param newAuditStamp the audit stamp for the operation
* @param isSoftDeleted flag to indicate if the previous latest value of aspect was soft deleted
* @param isTestMode whether the test mode is enabled or not
* @return the largest version
*/
protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn,
@Nonnull Class<ASPECT> aspectClass, @Nullable ASPECT oldEntry, @Nullable AuditStamp oldAuditStamp,
@Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted,
@Nullable IngestionTrackingContext trackingContext);
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode);

/**
* Saves the new value of an aspect to entity tables. This is used when backfilling metadata from the old schema to
Expand Down Expand Up @@ -1059,11 +1068,12 @@ public ListResult<UrnAspectEntry<URN>> getAspects(@Nonnull Set<Class<? extends R
*
* @param urn {@link Urn} for the entity
* @param aspectClass the type of aspect to get
* @param isTestMode whether the test mode is enabled or not
* @return {@link AspectEntry} corresponding to the latest version of specific aspect, if it exists
*/
@Nonnull
protected abstract <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(@Nonnull URN urn,
@Nonnull Class<ASPECT> aspectClass);
@Nonnull Class<ASPECT> aspectClass, boolean isTestMode);

/**
* Gets the next version to use for an entity's specific aspect type.
Expand All @@ -1083,10 +1093,11 @@ protected abstract <ASPECT extends RecordTemplate> long getNextVersion(@Nonnull
* @param aspectClass the type of aspect to insert
* @param auditStamp the {@link AuditStamp} for the aspect
* @param version the version for the aspect
* @param isTestMode whether the test mode is enabled or not
*/
protected abstract <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullable RecordTemplate value,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version,
@Nullable IngestionTrackingContext trackingContext);
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode);

/**
* Update an aspect for an entity with specific version and {@link AuditStamp} with optimistic locking.
Expand All @@ -1097,10 +1108,12 @@ protected abstract <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn,
* @param newAuditStamp the {@link AuditStamp} for the new aspect
* @param version the version for the aspect
* @param oldTimestamp the timestamp for the old aspect
* @param isTestMode whether the test mode is enabled or not
*/
protected abstract <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonnull URN urn,
@Nullable RecordTemplate value, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp newAuditStamp,
long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext);
long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext,
boolean isTestMode);

/**
* Returns a boolean representing if an Urn has any Aspects associated with it (i.e. if it exists in the DB).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ public <URN extends Urn> void removeEntity(@Nonnull URN urn) throws Exception {
* @param relationship the relationship to be persisted
* @param <RELATIONSHIP> relationship type. Must be a type defined in com.linkedin.metadata.relationship.
*/
public <RELATIONSHIP extends RecordTemplate> void addRelationship(@Nonnull RELATIONSHIP relationship)
public <RELATIONSHIP extends RecordTemplate> void addRelationship(@Nonnull RELATIONSHIP relationship, boolean isTestMode)
throws Exception {
addRelationship(relationship, RemovalOption.REMOVE_NONE);
addRelationship(relationship, RemovalOption.REMOVE_NONE, isTestMode);
}

/**
* Adds a relationship in the graph, with removal operations before adding.
*
Expand All @@ -76,8 +75,8 @@ public <RELATIONSHIP extends RecordTemplate> void addRelationship(@Nonnull RELAT
* @param <RELATIONSHIP> relationship type. Must be a type defined in com.linkedin.metadata.relationship.
*/
public <RELATIONSHIP extends RecordTemplate> void addRelationship(@Nonnull RELATIONSHIP relationship,
@Nonnull RemovalOption removalOption) throws Exception {
addRelationships(Collections.singletonList(relationship), removalOption);
@Nonnull RemovalOption removalOption, boolean isTestMode) throws Exception {
addRelationships(Collections.singletonList(relationship), removalOption, isTestMode);
}

/**
Expand All @@ -86,9 +85,9 @@ public <RELATIONSHIP extends RecordTemplate> void addRelationship(@Nonnull RELAT
* @param relationships the list of relationships to be persisted
* @param <RELATIONSHIP> relationship type. Must be a type defined in com.linkedin.metadata.relationship.
*/
public <RELATIONSHIP extends RecordTemplate> void addRelationships(@Nonnull List<RELATIONSHIP> relationships)
throws Exception {
addRelationships(relationships, RemovalOption.REMOVE_NONE);
public <RELATIONSHIP extends RecordTemplate> void addRelationships(@Nonnull List<RELATIONSHIP> relationships,
boolean isTestMode) throws Exception {
addRelationships(relationships, RemovalOption.REMOVE_NONE, isTestMode);
}

/**
Expand All @@ -99,7 +98,7 @@ public <RELATIONSHIP extends RecordTemplate> void addRelationships(@Nonnull List
* @param <RELATIONSHIP> relationship type. Must be a type defined in com.linkedin.metadata.relationship.
*/
public abstract <RELATIONSHIP extends RecordTemplate> void addRelationships(@Nonnull List<RELATIONSHIP> relationships,
@Nonnull RemovalOption removalOption) throws Exception;
@Nonnull RemovalOption removalOption, boolean isTestMode) throws Exception;

/**
* Deletes an relationship in the graph.
Expand All @@ -120,4 +119,4 @@ public <RELATIONSHIP extends RecordTemplate> void removeRelationship(@Nonnull RE
*/
public abstract <RELATIONSHIP extends RecordTemplate> void removeRelationships(
@Nonnull List<RELATIONSHIP> relationships) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public DummyLocalDAO(Class<ENTITY_ASPECT_UNION> aspectClass, BiFunction<FooUrn,
@Override
protected <ASPECT extends RecordTemplate> long saveLatest(FooUrn urn, Class<ASPECT> aspectClass, ASPECT oldEntry,
AuditStamp oldAuditStamp, ASPECT newEntry, AuditStamp newAuditStamp, boolean isSoftDeleted,
@Nullable IngestionTrackingContext trackingContext) {
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
return 0;
}

Expand All @@ -101,7 +101,8 @@ protected <T> T runInTransactionWithRetry(Supplier<T> block, int maxTransactionR
}

@Override
protected <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(FooUrn urn, Class<ASPECT> aspectClass) {
protected <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(FooUrn urn, Class<ASPECT> aspectClass,
boolean isTestMode) {
return _getLatestFunction.apply(urn, aspectClass);
}

Expand All @@ -112,14 +113,15 @@ protected <ASPECT extends RecordTemplate> long getNextVersion(FooUrn urn, Class<

@Override
protected <ASPECT extends RecordTemplate> void insert(FooUrn urn, RecordTemplate value, Class<ASPECT> aspectClass,
AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext) {
AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {

}

@Override
protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonnull FooUrn urn,
@Nullable RecordTemplate value, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp newAuditStamp,
long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext) {
long version, @Nonnull Timestamp oldTimestamp, @Nullable IngestionTrackingContext trackingContext,
boolean isTestMode) {

}

Expand Down Expand Up @@ -356,7 +358,7 @@ public void testMAEv5WithOverride() throws URISyntaxException {
_dummyLocalDAO._transactionRunner);

// pretend there is already foo in the database
when(dummyLocalDAO.getLatest(urn, AspectFoo.class))
when(dummyLocalDAO.getLatest(urn, AspectFoo.class, false))
.thenReturn(new BaseLocalDAO.AspectEntry<>(foo, null, false));

// try to add foo again but with the OVERRIDE write mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public void ensureSchemaUpToDate() {
@Override
@Transactional
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext) {
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext);
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode) {
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext, isTestMode);
}

@Override
Expand All @@ -105,7 +105,8 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
@Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp,
@Nullable Timestamp oldTimestamp,
@Nullable IngestionTrackingContext ingestionTrackingContext) {
@Nullable IngestionTrackingContext ingestionTrackingContext,
boolean isTestMode) {

final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
Expand All @@ -115,10 +116,10 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
final SqlUpdate sqlUpdate;
if (oldTimestamp != null) {
sqlUpdate = _server.createSqlUpdate(
SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction));
SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction, isTestMode));
sqlUpdate.setParameter("oldTimestamp", oldTimestamp.toString());
} else {
sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction));
sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction, isTestMode));
}
sqlUpdate.setParameter("urn", urn.toString())
.setParameter("lastmodifiedon", new Timestamp(timestamp).toString())
Expand Down Expand Up @@ -161,18 +162,20 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
* @param keysCount number of keys to query
* @param position position of the key to start from
* @param includeSoftDeleted whether to include soft deleted aspect in the query
* @param isTestMode whether the operation is in test mode or not
*/
@Override
public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> aspectKeys, int keysCount, int position,
boolean includeSoftDeleted) {
boolean includeSoftDeleted, boolean isTestMode) {

final int end = Math.min(aspectKeys.size(), position + keysCount);
final Map<Class<ASPECT>, Set<Urn>> keysToQueryMap = new HashMap<>();
for (int index = position; index < end; index++) {
final Urn entityUrn = aspectKeys.get(index).getUrn();
final Class<ASPECT> aspectClass = (Class<ASPECT>) aspectKeys.get(index).getAspectClass();
if (checkColumnExists(getTableName(entityUrn), getAspectColumnName(aspectClass))) {
if (checkColumnExists(isTestMode ? getTestTableName(entityUrn) : getTableName(entityUrn),
getAspectColumnName(aspectClass))) {
keysToQueryMap.computeIfAbsent(aspectClass, unused -> new HashSet<>()).add(entityUrn);
}
}
Expand All @@ -181,8 +184,8 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
Map<String, Class<ASPECT>> selectStatements = keysToQueryMap.entrySet()
.stream()
.collect(Collectors.toMap(
entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted),
entry -> entry.getKey()));
entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted,
isTestMode), entry -> entry.getKey()));

// consolidate/join the results
final Map<SqlRow, Class<ASPECT>> sqlRows = new LinkedHashMap<>();
Expand Down
Loading

0 comments on commit 3c10b71

Please sign in to comment.