Skip to content

Commit

Permalink
Update DAO to use @gma.aspect.ingestion annotation (#371)
Browse files Browse the repository at this point in the history
* Update DAO to use @gma.aspect.ingestion annotation

* revert change

* bump version

* address comment

* revert

* removed

---------

Co-authored-by: Jesse Jia <[email protected]>
  • Loading branch information
zhixuanjia and Jesse Jia authored Mar 16, 2024
1 parent a8e0aab commit a64d0ca
Show file tree
Hide file tree
Showing 21 changed files with 309 additions and 62 deletions.
2 changes: 1 addition & 1 deletion dao-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ apply plugin: 'pegasus'
apply from: "$rootDir/gradle/java-publishing.gradle"

dependencies {
compile project(':gradle-plugins:metadata-annotations-lib')
compile project(':core-models')
compile project(':validators')
compile externalDependency.javatuples
Expand All @@ -13,7 +14,6 @@ dependencies {
dataModel project(':validators')

compileOnly externalDependency.lombok

annotationProcessor externalDependency.lombok

testCompile project(':testing:core-models-testing')
Expand Down
93 changes: 73 additions & 20 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import com.linkedin.data.schema.validation.ValidationResult;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.annotations.AspectIngestionAnnotation;
import com.linkedin.metadata.annotations.AspectIngestionAnnotationArray;
import com.linkedin.metadata.annotations.Mode;
import com.linkedin.metadata.annotations.UrnFilter;
import com.linkedin.metadata.annotations.UrnFilterArray;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
Expand All @@ -26,6 +31,7 @@
import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.tracking.TrackingUtils;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
Expand Down Expand Up @@ -167,8 +173,8 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected final BaseMetadataEventProducer _producer;
protected final BaseTrackingMetadataEventProducer _trackingProducer;
protected final LocalDAOStorageConfig _storageConfig;

protected final BaseTrackingManager _trackingManager;
protected UrnPathExtractor<URN> _urnPathExtractor;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -210,14 +216,15 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
* @param urnClass class of the URN type
*/
public BaseLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull Class<URN> urnClass) {
@Nonnull Class<URN> urnClass, @Nonnull UrnPathExtractor<URN> urnPathExtractor) {
super(aspectUnionClass);
_producer = producer;
_storageConfig = LocalDAOStorageConfig.builder().build();
_aspectUnionClass = aspectUnionClass;
_trackingManager = null;
_trackingProducer = null;
_urnClass = urnClass;
_urnPathExtractor = urnPathExtractor;
}

/**
Expand All @@ -230,14 +237,15 @@ public BaseLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Base
* @param urnClass class of the URN type
*/
public BaseLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer trackingProducer,
@Nonnull BaseTrackingManager trackingManager, @Nonnull Class<URN> urnClass) {
@Nonnull BaseTrackingManager trackingManager, @Nonnull Class<URN> urnClass, @Nonnull UrnPathExtractor<URN> urnPathExtractor) {
super(aspectUnionClass);
_producer = null;
_storageConfig = LocalDAOStorageConfig.builder().build();
_aspectUnionClass = aspectUnionClass;
_trackingManager = trackingManager;
_trackingProducer = trackingProducer;
_urnClass = urnClass;
_urnPathExtractor = urnPathExtractor;
}

/**
Expand All @@ -248,14 +256,15 @@ public BaseLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Base
* @param urnClass class of the URN type
*/
public BaseLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull LocalDAOStorageConfig storageConfig,
@Nonnull Class<URN> urnClass) {
@Nonnull Class<URN> urnClass, @Nonnull UrnPathExtractor<URN> urnPathExtractor) {
super(storageConfig.getAspectStorageConfigMap().keySet());
_producer = producer;
_storageConfig = storageConfig;
_aspectUnionClass = producer.getAspectUnionClass();
_trackingManager = null;
_trackingProducer = null;
_urnClass = urnClass;
_urnPathExtractor = urnPathExtractor;
}

/**
Expand All @@ -268,14 +277,15 @@ public BaseLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull LocalD
*
*/
public BaseLocalDAO(@Nonnull BaseTrackingMetadataEventProducer trackingProducer, @Nonnull LocalDAOStorageConfig storageConfig,
@Nonnull BaseTrackingManager trackingManager, @Nonnull Class<URN> urnClass) {
@Nonnull BaseTrackingManager trackingManager, @Nonnull Class<URN> urnClass, @Nonnull UrnPathExtractor<URN> urnPathExtractor) {
super(storageConfig.getAspectStorageConfigMap().keySet());
_producer = null;
_storageConfig = storageConfig;
_aspectUnionClass = trackingProducer.getAspectUnionClass();
_trackingManager = trackingManager;
_trackingProducer = trackingProducer;
_urnClass = urnClass;
_urnPathExtractor = urnPathExtractor;
}

/**
Expand Down Expand Up @@ -473,24 +483,11 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
}
}

final boolean oldAndNewEqual = (oldValue == null && newValue == null) || (oldValue != null && newValue != null && equalityTester.equals(
oldValue, newValue));

final IngestionMode ingestionMode = ingestionParams.getIngestionMode();

// Skip saving for the following scenarios
if (ingestionMode != IngestionMode.LIVE_OVERRIDE // ensure that the new metadata received is skippable (i.e. not marked as a forced write).
&& (oldAndNewEqual || (aspectVersionSkipWrite(newValue, oldValue)))) { // values are equal or newValue ver < oldValue ver
// Logic determines whether an update to aspect should be persisted.
if (!shouldUpdateAspect(ingestionParams.getIngestionMode(), urn, oldValue, newValue, aspectClass, auditStamp, equalityTester)) {
return new AddResult<>(oldValue, oldValue, aspectClass);
}

if (ingestionMode == IngestionMode.LIVE_OVERRIDE) {
log.info((String.format(
"Received ingestion event with LIVE_OVERRIDE write mode. urn: %s, aspectClass: %s, auditStamp: %s,"
+ "newValue == oldValue: %b. An MAE will %sbe emitted.", urn, aspectClass, auditStamp, oldAndNewEqual,
oldAndNewEqual ? "not " : "")));
}

// Save the newValue as the latest version
long largestVersion = saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted, trackingContext);

Expand Down Expand Up @@ -1520,5 +1517,61 @@ protected boolean aspectVersionSkipWrite(@Nullable RecordTemplate newValue, @Nul
return aspectVersionComparator(newValue, oldValue) == -1;
}

/**
* The logic determines if we will update the aspect.
*/
private <ASPECT extends RecordTemplate> boolean shouldUpdateAspect(IngestionMode ingestionMode, URN urn, ASPECT oldValue,
ASPECT newValue, Class<ASPECT> aspectClass, AuditStamp auditStamp, EqualityTester<ASPECT> equalityTester) {

final boolean oldAndNewEqual = (oldValue == null && newValue == null) || (oldValue != null && newValue != null && equalityTester.equals(
oldValue, newValue));

AspectIngestionAnnotationArray ingestionAnnotations = parseIngestionModeFromAnnotation(aspectClass);
AspectIngestionAnnotation annotation = findIngestionAnnotationForEntity(ingestionAnnotations, urn);
Mode mode = annotation == null || !annotation.hasMode() ? Mode.DEFAULT : annotation.getMode();

// Skip saving for the following scenarios
if (mode != Mode.FORCE_UPDATE
&& ingestionMode != IngestionMode.LIVE_OVERRIDE // ensure that the new metadata received is skippable (i.e. not marked as a forced write).
&& (oldAndNewEqual || aspectVersionSkipWrite(newValue, oldValue))) { // values are equal or newValue ver < oldValue ver
return false;
}

if (ingestionMode == IngestionMode.LIVE_OVERRIDE) {
log.info((String.format(
"Received ingestion event with LIVE_OVERRIDE write mode. urn: %s, aspectClass: %s, auditStamp: %s,"
+ "newValue == oldValue: %b. An MAE will %sbe emitted.", urn, aspectClass, auditStamp, oldAndNewEqual,
oldAndNewEqual ? "not " : "")));
return true;
}

if (mode == Mode.FORCE_UPDATE) {
// If no filters specified in the annotation, FORCE_UPDATE
if (!annotation.hasFilter() || annotation.getFilter() == null) {
log.info((String.format("@gma.aspect.ingestion is FORCE_UPDATE on aspect %s and no filters set in annotation."
+ " Force update aspect.", aspectClass.getCanonicalName())));
return true;
}

UrnFilterArray filters = annotation.getFilter();
Map<String, Object> urnPaths = _urnPathExtractor.extractPaths(urn);

// If there are filters in annotation, at least one filter conditions has to be met.
boolean atLeastOneFilterPass = false;
for (UrnFilter filter : filters) {
if (urnPaths.containsKey(filter.getPath())
&& urnPaths.get(filter.getPath()).toString().equals(filter.getValue())) {
atLeastOneFilterPass = true;
break;
}
}

if (atLeastOneFilterPass) {
return true;
}
}

return !(oldAndNewEqual || aspectVersionSkipWrite(newValue, oldValue));
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.scsi;
package com.linkedin.metadata.dao.urnpath;

import com.linkedin.common.urn.Urn;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.scsi;
package com.linkedin.metadata.dao.urnpath;

import com.linkedin.common.urn.Urn;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,26 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.annotations.AspectIngestionAnnotation;
import com.linkedin.metadata.annotations.AspectIngestionAnnotationArray;
import com.linkedin.metadata.annotations.GmaAnnotation;
import com.linkedin.metadata.annotations.GmaAnnotationParser;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class IngestionUtils {

private IngestionUtils() {
Expand Down Expand Up @@ -37,4 +51,62 @@ public static IngestionTrackingContext buildIngestionTrackingContext(@Nonnull UU
.setEmitter(emitter)
.setEmitTime(timestamp);
}

/**
* Parse the ingestion mode annotation given an aspect class.
*/
@Nonnull
public static <ASPECT extends RecordTemplate> AspectIngestionAnnotationArray parseIngestionModeFromAnnotation(
@Nonnull final Class<ASPECT> aspectClass) {

try {
final RecordDataSchema schema = (RecordDataSchema) DataTemplateUtil.getSchema(aspectClass);
final Optional<GmaAnnotation> gmaAnnotation = new GmaAnnotationParser().parse(schema);

// Return empty array if user did not specify any ingestion annotation on the aspect.
if (!gmaAnnotation.isPresent() || !gmaAnnotation.get().hasAspect() || !gmaAnnotation.get().getAspect().hasIngestion()) {
return new AspectIngestionAnnotationArray();
}

return gmaAnnotation.get().getAspect().getIngestion();
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to parse the annotations for aspect %s", aspectClass.getCanonicalName()), e);
}
}

@Nullable
public static AspectIngestionAnnotation findIngestionAnnotationForEntity(@Nonnull AspectIngestionAnnotationArray ingestionAnnotations,
Urn urn) {
List<AspectIngestionAnnotation> aspectIngestionAnnotationList = new ArrayList<>();
for (AspectIngestionAnnotation ingestionAnnotation : ingestionAnnotations) {
if (!ingestionAnnotation.hasUrn() || !ingestionAnnotation.hasMode()) {
continue;
}

final String urnFromAnnotation = getLastElementsInUrnString(ingestionAnnotation.getUrn());
final String urnFromInput = getLastElementsInUrnString(urn.getClass().getCanonicalName());

if (urnFromAnnotation.equals(urnFromInput)) {
aspectIngestionAnnotationList.add(ingestionAnnotation);
}
}

if (aspectIngestionAnnotationList.size() == 1) {
return aspectIngestionAnnotationList.get(0);
} else if (aspectIngestionAnnotationList.size() > 1) {
log.error("Invalid usage. More than one ingestion rule defined for same urn {}", urn);
return null;
} else {
return null;
}
}

/**
* Get last element from urnStr.
* for example, if urnStr is com.linkedin.common.FooUrn, then last element is FooUrn.
*/
private static String getLastElementsInUrnString(String urnStr) {
final String[] urnParts = urnStr.split("\\.");
return urnParts[urnParts.length - 1];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.internal.IngestionParams;
Expand Down Expand Up @@ -59,14 +60,14 @@ static class DummyLocalDAO<ENTITY_ASPECT_UNION extends UnionTemplate> extends Ba

public DummyLocalDAO(Class<ENTITY_ASPECT_UNION> aspectClass, BiFunction<FooUrn, Class<? extends RecordTemplate>, AspectEntry> getLatestFunction,
BaseMetadataEventProducer eventProducer, DummyTransactionRunner transactionRunner) {
super(aspectClass, eventProducer, FooUrn.class);
super(aspectClass, eventProducer, FooUrn.class, new EmptyPathExtractor<>());
_getLatestFunction = getLatestFunction;
_transactionRunner = transactionRunner;
}

public DummyLocalDAO(Class<ENTITY_ASPECT_UNION> aspectClass, BiFunction<FooUrn, Class<? extends RecordTemplate>, AspectEntry> getLatestFunction,
BaseTrackingMetadataEventProducer eventProducer, BaseTrackingManager trackingManager, DummyTransactionRunner transactionRunner) {
super(aspectClass, eventProducer, trackingManager, FooUrn.class);
super(aspectClass, eventProducer, trackingManager, FooUrn.class, new EmptyPathExtractor<>());
_getLatestFunction = getLatestFunction;
_transactionRunner = transactionRunner;
}
Expand Down
1 change: 0 additions & 1 deletion dao-impl/ebean-dao/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ configurations {
}

dependencies {
compile project(':gradle-plugins:metadata-annotations-lib')
compile project(':core-models-utils')
compile project(':dao-api')
compile externalDependency.ebean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.linkedin.metadata.aspect.AuditedAspect;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.EmptyPathExtractor;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.utils.EBeanDAOUtils;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand Down
Loading

0 comments on commit a64d0ca

Please sign in to comment.