diff --git a/dao-api/build.gradle b/dao-api/build.gradle index 43b8ad9d0..ad0080211 100644 --- a/dao-api/build.gradle +++ b/dao-api/build.gradle @@ -3,6 +3,7 @@ apply plugin: 'pegasus' apply from: "$rootDir/gradle/java-publishing.gradle" dependencies { + compile project(':gradle-plugins:metadata-annotations-lib') compile project(':core-models') compile project(':validators') compile externalDependency.javatuples @@ -13,7 +14,6 @@ dependencies { dataModel project(':validators') compileOnly externalDependency.lombok - annotationProcessor externalDependency.lombok testCompile project(':testing:core-models-testing') diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 8ac841c5a..25865da62 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -12,6 +12,11 @@ import com.linkedin.data.schema.validation.ValidationResult; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.UnionTemplate; +import com.linkedin.metadata.annotations.AspectIngestionAnnotation; +import com.linkedin.metadata.annotations.AspectIngestionAnnotationArray; +import com.linkedin.metadata.annotations.Mode; +import com.linkedin.metadata.annotations.UrnFilter; +import com.linkedin.metadata.annotations.UrnFilterArray; import com.linkedin.metadata.backfill.BackfillMode; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.equality.DefaultEqualityTester; @@ -26,6 +31,7 @@ import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig; import com.linkedin.metadata.dao.tracking.BaseTrackingManager; import com.linkedin.metadata.dao.tracking.TrackingUtils; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.metadata.dao.utils.ModelUtils; import 
com.linkedin.metadata.events.IngestionMode; import com.linkedin.metadata.events.IngestionTrackingContext; @@ -167,8 +173,8 @@ public static class AspectUpdateLambda { protected final BaseMetadataEventProducer _producer; protected final BaseTrackingMetadataEventProducer _trackingProducer; protected final LocalDAOStorageConfig _storageConfig; - protected final BaseTrackingManager _trackingManager; + protected UrnPathExtractor _urnPathExtractor; // Maps an aspect class to the corresponding retention policy private final Map, Retention> _aspectRetentionMap = new HashMap<>(); @@ -210,7 +216,7 @@ public static class AspectUpdateLambda { * @param urnClass class of the URN type */ public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseMetadataEventProducer producer, - @Nonnull Class urnClass) { + @Nonnull Class urnClass, @Nonnull UrnPathExtractor urnPathExtractor) { super(aspectUnionClass); _producer = producer; _storageConfig = LocalDAOStorageConfig.builder().build(); @@ -218,6 +224,7 @@ public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Base _trackingManager = null; _trackingProducer = null; _urnClass = urnClass; + _urnPathExtractor = urnPathExtractor; } /** @@ -230,7 +237,7 @@ public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Base * @param urnClass class of the URN type */ public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer trackingProducer, - @Nonnull BaseTrackingManager trackingManager, @Nonnull Class urnClass) { + @Nonnull BaseTrackingManager trackingManager, @Nonnull Class urnClass, @Nonnull UrnPathExtractor urnPathExtractor) { super(aspectUnionClass); _producer = null; _storageConfig = LocalDAOStorageConfig.builder().build(); @@ -238,6 +245,7 @@ public BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Base _trackingManager = trackingManager; _trackingProducer = trackingProducer; _urnClass = urnClass; + _urnPathExtractor = urnPathExtractor; } /** @@ -248,7 +256,7 @@ public 
BaseLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Base * @param urnClass class of the URN type */ public BaseLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull LocalDAOStorageConfig storageConfig, - @Nonnull Class urnClass) { + @Nonnull Class urnClass, @Nonnull UrnPathExtractor urnPathExtractor) { super(storageConfig.getAspectStorageConfigMap().keySet()); _producer = producer; _storageConfig = storageConfig; @@ -256,6 +264,7 @@ public BaseLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull LocalD _trackingManager = null; _trackingProducer = null; _urnClass = urnClass; + _urnPathExtractor = urnPathExtractor; } /** @@ -268,7 +277,7 @@ public BaseLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull LocalD * */ public BaseLocalDAO(@Nonnull BaseTrackingMetadataEventProducer trackingProducer, @Nonnull LocalDAOStorageConfig storageConfig, - @Nonnull BaseTrackingManager trackingManager, @Nonnull Class urnClass) { + @Nonnull BaseTrackingManager trackingManager, @Nonnull Class urnClass, @Nonnull UrnPathExtractor urnPathExtractor) { super(storageConfig.getAspectStorageConfigMap().keySet()); _producer = null; _storageConfig = storageConfig; @@ -276,6 +285,7 @@ public BaseLocalDAO(@Nonnull BaseTrackingMetadataEventProducer trackingProducer, _trackingManager = trackingManager; _trackingProducer = trackingProducer; _urnClass = urnClass; + _urnPathExtractor = urnPathExtractor; } /** @@ -473,24 +483,11 @@ private AddResult addCommon(@Nonnull URN } } - final boolean oldAndNewEqual = (oldValue == null && newValue == null) || (oldValue != null && newValue != null && equalityTester.equals( - oldValue, newValue)); - - final IngestionMode ingestionMode = ingestionParams.getIngestionMode(); - - // Skip saving for the following scenarios - if (ingestionMode != IngestionMode.LIVE_OVERRIDE // ensure that the new metadata received is skippable (i.e. not marked as a forced write). 
- && (oldAndNewEqual || (aspectVersionSkipWrite(newValue, oldValue)))) { // values are equal or newValue ver < oldValue ver + // Logic determines whether an update to aspect should be persisted. + if (!shouldUpdateAspect(ingestionParams.getIngestionMode(), urn, oldValue, newValue, aspectClass, auditStamp, equalityTester)) { return new AddResult<>(oldValue, oldValue, aspectClass); } - if (ingestionMode == IngestionMode.LIVE_OVERRIDE) { - log.info((String.format( - "Received ingestion event with LIVE_OVERRIDE write mode. urn: %s, aspectClass: %s, auditStamp: %s," - + "newValue == oldValue: %b. An MAE will %sbe emitted.", urn, aspectClass, auditStamp, oldAndNewEqual, - oldAndNewEqual ? "not " : ""))); - } - // Save the newValue as the latest version long largestVersion = saveLatest(urn, aspectClass, oldValue, oldAuditStamp, newValue, auditStamp, latest.isSoftDeleted, trackingContext); @@ -1520,5 +1517,61 @@ protected boolean aspectVersionSkipWrite(@Nullable RecordTemplate newValue, @Nul return aspectVersionComparator(newValue, oldValue) == -1; } + /** + * The logic determines if we will update the aspect. + */ + private boolean shouldUpdateAspect(IngestionMode ingestionMode, URN urn, ASPECT oldValue, + ASPECT newValue, Class aspectClass, AuditStamp auditStamp, EqualityTester equalityTester) { + + final boolean oldAndNewEqual = (oldValue == null && newValue == null) || (oldValue != null && newValue != null && equalityTester.equals( + oldValue, newValue)); + + AspectIngestionAnnotationArray ingestionAnnotations = parseIngestionModeFromAnnotation(aspectClass); + AspectIngestionAnnotation annotation = findIngestionAnnotationForEntity(ingestionAnnotations, urn); + Mode mode = annotation == null || !annotation.hasMode() ? Mode.DEFAULT : annotation.getMode(); + + // Skip saving for the following scenarios + if (mode != Mode.FORCE_UPDATE + && ingestionMode != IngestionMode.LIVE_OVERRIDE // ensure that the new metadata received is skippable (i.e. not marked as a forced write). 
+ && (oldAndNewEqual || aspectVersionSkipWrite(newValue, oldValue))) { // values are equal or newValue ver < oldValue ver + return false; + } + + if (ingestionMode == IngestionMode.LIVE_OVERRIDE) { + log.info((String.format( + "Received ingestion event with LIVE_OVERRIDE write mode. urn: %s, aspectClass: %s, auditStamp: %s," + + "newValue == oldValue: %b. An MAE will %sbe emitted.", urn, aspectClass, auditStamp, oldAndNewEqual, + oldAndNewEqual ? "not " : ""))); + return true; + } + + if (mode == Mode.FORCE_UPDATE) { + // If no filters are specified in the annotation, force the update + if (!annotation.hasFilter() || annotation.getFilter() == null) { + log.info((String.format("@gma.aspect.ingestion is FORCE_UPDATE on aspect %s and no filters set in annotation." + + " Force update aspect.", aspectClass.getCanonicalName()))); + return true; + } + + UrnFilterArray filters = annotation.getFilter(); + Map urnPaths = _urnPathExtractor.extractPaths(urn); + + // If there are filters in the annotation, at least one filter condition has to be met. 
+ boolean atLeastOneFilterPass = false; + for (UrnFilter filter : filters) { + if (urnPaths.containsKey(filter.getPath()) + && urnPaths.get(filter.getPath()).toString().equals(filter.getValue())) { + atLeastOneFilterPass = true; + break; + } + } + + if (atLeastOneFilterPass) { + return true; + } + } + + return !(oldAndNewEqual || aspectVersionSkipWrite(newValue, oldValue)); + } } diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/scsi/EmptyPathExtractor.java b/dao-api/src/main/java/com/linkedin/metadata/dao/urnpath/EmptyPathExtractor.java similarity index 90% rename from dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/scsi/EmptyPathExtractor.java rename to dao-api/src/main/java/com/linkedin/metadata/dao/urnpath/EmptyPathExtractor.java index 8e114b539..e1d465c7a 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/scsi/EmptyPathExtractor.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/urnpath/EmptyPathExtractor.java @@ -1,4 +1,4 @@ -package com.linkedin.metadata.dao.scsi; +package com.linkedin.metadata.dao.urnpath; import com.linkedin.common.urn.Urn; import java.util.Collections; diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/scsi/UrnPathExtractor.java b/dao-api/src/main/java/com/linkedin/metadata/dao/urnpath/UrnPathExtractor.java similarity index 95% rename from dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/scsi/UrnPathExtractor.java rename to dao-api/src/main/java/com/linkedin/metadata/dao/urnpath/UrnPathExtractor.java index 2d5d54dfa..dcc075d7b 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/scsi/UrnPathExtractor.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/urnpath/UrnPathExtractor.java @@ -1,4 +1,4 @@ -package com.linkedin.metadata.dao.scsi; +package com.linkedin.metadata.dao.urnpath; import com.linkedin.common.urn.Urn; import java.util.Map; diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/utils/IngestionUtils.java 
b/dao-api/src/main/java/com/linkedin/metadata/dao/utils/IngestionUtils.java index adbfa4693..2fdcc129d 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/utils/IngestionUtils.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/utils/IngestionUtils.java @@ -3,12 +3,26 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.linkedin.avro2pegasus.events.UUID; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.schema.RecordDataSchema; +import com.linkedin.data.template.DataTemplateUtil; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.annotations.AspectIngestionAnnotation; +import com.linkedin.metadata.annotations.AspectIngestionAnnotationArray; +import com.linkedin.metadata.annotations.GmaAnnotation; +import com.linkedin.metadata.annotations.GmaAnnotationParser; import com.linkedin.metadata.backfill.BackfillMode; import com.linkedin.metadata.events.IngestionMode; import com.linkedin.metadata.events.IngestionTrackingContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class IngestionUtils { private IngestionUtils() { @@ -37,4 +51,62 @@ public static IngestionTrackingContext buildIngestionTrackingContext(@Nonnull UU .setEmitter(emitter) .setEmitTime(timestamp); } + + /** + * Parse the ingestion mode annotation given an aspect class. + */ + @Nonnull + public static AspectIngestionAnnotationArray parseIngestionModeFromAnnotation( + @Nonnull final Class aspectClass) { + + try { + final RecordDataSchema schema = (RecordDataSchema) DataTemplateUtil.getSchema(aspectClass); + final Optional gmaAnnotation = new GmaAnnotationParser().parse(schema); + + // Return empty array if user did not specify any ingestion annotation on the aspect. 
+ if (!gmaAnnotation.isPresent() || !gmaAnnotation.get().hasAspect() || !gmaAnnotation.get().getAspect().hasIngestion()) { + return new AspectIngestionAnnotationArray(); + } + + return gmaAnnotation.get().getAspect().getIngestion(); + } catch (Exception e) { + throw new RuntimeException(String.format("Failed to parse the annotations for aspect %s", aspectClass.getCanonicalName()), e); + } + } + + @Nullable + public static AspectIngestionAnnotation findIngestionAnnotationForEntity(@Nonnull AspectIngestionAnnotationArray ingestionAnnotations, + Urn urn) { + List aspectIngestionAnnotationList = new ArrayList<>(); + for (AspectIngestionAnnotation ingestionAnnotation : ingestionAnnotations) { + if (!ingestionAnnotation.hasUrn() || !ingestionAnnotation.hasMode()) { + continue; + } + + final String urnFromAnnotation = getLastElementsInUrnString(ingestionAnnotation.getUrn()); + final String urnFromInput = getLastElementsInUrnString(urn.getClass().getCanonicalName()); + + if (urnFromAnnotation.equals(urnFromInput)) { + aspectIngestionAnnotationList.add(ingestionAnnotation); + } + } + + if (aspectIngestionAnnotationList.size() == 1) { + return aspectIngestionAnnotationList.get(0); + } else if (aspectIngestionAnnotationList.size() > 1) { + log.error("Invalid usage. More than one ingestion rule defined for same urn {}", urn); + return null; + } else { + return null; + } + } + + /** + * Get last element from urnStr. + * for example, if urnStr is com.linkedin.common.FooUrn, then last element is FooUrn. 
+ */ + private static String getLastElementsInUrnString(String urnStr) { + final String[] urnParts = urnStr.split("\\."); + return urnParts[urnParts.length - 1]; + } } diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index 4a840441f..3118d49b7 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -10,6 +10,7 @@ import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; import com.linkedin.metadata.dao.tracking.BaseTrackingManager; +import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor; import com.linkedin.metadata.events.IngestionMode; import com.linkedin.metadata.events.IngestionTrackingContext; import com.linkedin.metadata.internal.IngestionParams; @@ -59,14 +60,14 @@ static class DummyLocalDAO extends Ba public DummyLocalDAO(Class aspectClass, BiFunction, AspectEntry> getLatestFunction, BaseMetadataEventProducer eventProducer, DummyTransactionRunner transactionRunner) { - super(aspectClass, eventProducer, FooUrn.class); + super(aspectClass, eventProducer, FooUrn.class, new EmptyPathExtractor<>()); _getLatestFunction = getLatestFunction; _transactionRunner = transactionRunner; } public DummyLocalDAO(Class aspectClass, BiFunction, AspectEntry> getLatestFunction, BaseTrackingMetadataEventProducer eventProducer, BaseTrackingManager trackingManager, DummyTransactionRunner transactionRunner) { - super(aspectClass, eventProducer, trackingManager, FooUrn.class); + super(aspectClass, eventProducer, trackingManager, FooUrn.class, new EmptyPathExtractor<>()); _getLatestFunction = getLatestFunction; _transactionRunner = transactionRunner; } diff --git a/dao-impl/ebean-dao/build.gradle b/dao-impl/ebean-dao/build.gradle index 36f4d4113..8ddc38eb0 100644 --- a/dao-impl/ebean-dao/build.gradle 
+++ b/dao-impl/ebean-dao/build.gradle @@ -7,7 +7,6 @@ configurations { } dependencies { - compile project(':gradle-plugins:metadata-annotations-lib') compile project(':core-models-utils') compile project(':dao-api') compile externalDependency.ebean diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 4f8cd83c8..4a9413a48 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -7,8 +7,8 @@ import com.linkedin.metadata.aspect.AuditedAspect; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry; -import com.linkedin.metadata.dao.scsi.EmptyPathExtractor; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.metadata.dao.utils.EBeanDAOUtils; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index ba9a99f9f..c7084cbbc 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -16,8 +16,8 @@ import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; -import com.linkedin.metadata.dao.scsi.EmptyPathExtractor; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import 
com.linkedin.metadata.dao.urnpath.EmptyPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig; import com.linkedin.metadata.dao.tracking.BaseTrackingManager; import com.linkedin.metadata.dao.utils.EBeanDAOUtils; @@ -83,7 +83,6 @@ public class EbeanLocalDAO private final static int DEFAULT_BATCH_SIZE = 50; private int _queryKeysCount = DEFAULT_BATCH_SIZE; private IEbeanLocalAccess _localAccess; - private UrnPathExtractor _urnPathExtractor; private SchemaConfig _schemaConfig = SchemaConfig.OLD_SCHEMA_ONLY; private final EBeanDAOConfig _eBeanDAOConfig = new EBeanDAOConfig(); @@ -380,18 +379,16 @@ public EbeanLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull Bas private EbeanLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull Class urnClass) { - super(aspectUnionClass, producer, urnClass); + super(aspectUnionClass, producer, urnClass, new EmptyPathExtractor<>()); _server = server; _urnClass = urnClass; - _urnPathExtractor = new EmptyPathExtractor<>(); } private EbeanLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull Class urnClass, @Nonnull BaseTrackingManager trackingManager) { - super(aspectUnionClass, producer, trackingManager, urnClass); + super(aspectUnionClass, producer, trackingManager, urnClass, new EmptyPathExtractor<>()); _server = server; _urnClass = urnClass; - _urnPathExtractor = new EmptyPathExtractor<>(); } private EbeanLocalDAO(@Nonnull Class aspectUnionClass, @Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class urnClass, @Nonnull SchemaConfig schemaConfig) { @@ -453,19 +450,17 @@ private EbeanLocalDAO(@Nonnull Class aspectUnionClass, EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull 
LocalDAOStorageConfig storageConfig, @Nonnull Class urnClass, @Nonnull UrnPathExtractor urnPathExtractor) { - super(producer, storageConfig, urnClass); + super(producer, storageConfig, urnClass, urnPathExtractor); _server = server; _urnClass = urnClass; - _urnPathExtractor = urnPathExtractor; } private EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull EbeanServer server, @Nonnull LocalDAOStorageConfig storageConfig, @Nonnull Class urnClass, @Nonnull UrnPathExtractor urnPathExtractor, @Nonnull BaseTrackingManager trackingManager) { - super(producer, storageConfig, trackingManager, urnClass); + super(producer, storageConfig, trackingManager, urnClass, urnPathExtractor); _server = server; _urnClass = urnClass; - _urnPathExtractor = urnPathExtractor; } private EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server, @@ -507,6 +502,11 @@ public void setUrnPathExtractor(@Nonnull UrnPathExtractor urnPathExtractor) _urnPathExtractor = urnPathExtractor; } + @Nonnull + public UrnPathExtractor getUrnPathExtractor() { + return _urnPathExtractor; + } + /** * Return the {@link EbeanServer} server instance used for customized queries. 
*/ diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 25647b3ff..549995f32 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -5,7 +5,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.metadata.events.IngestionTrackingContext; import com.linkedin.metadata.query.IndexFilter; import com.linkedin.metadata.query.IndexGroupByCriterion; diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 96a4d66ab..818ee592c 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -3,7 +3,7 @@ import com.google.common.io.Resources; import com.linkedin.common.AuditStamp; import com.linkedin.metadata.dao.localrelationship.SampleLocalRelationshipRegistryImpl; -import com.linkedin.metadata.dao.scsi.EmptyPathExtractor; +import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor; import com.linkedin.metadata.dao.utils.BarUrnPathExtractor; import com.linkedin.metadata.dao.utils.EmbeddedMariaInstance; import com.linkedin.metadata.dao.utils.FooUrnPathExtractor; diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index 
96a6b65fa..b5cad223e 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -25,7 +25,7 @@ import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.retention.TimeBasedRetention; import com.linkedin.metadata.dao.retention.VersionBasedRetention; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig; import com.linkedin.metadata.dao.tracking.BaseTrackingManager; import com.linkedin.metadata.dao.utils.BarUrnPathExtractor; @@ -357,6 +357,115 @@ public void testAddTwo() { verifyNoMoreInteractions(_mockProducer); } + @Test + public void testAddWithIngestionAnnotation() throws URISyntaxException { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(1); + AspectFoo foo = new AspectFoo().setValue("foo"); + + IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE); + long t1 = 1704067200000L; // 2024-01-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t1).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + long t2 = 1706745600000L; // 2024-02-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t2).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + // make sure that the update still went through by checking the aspect's lastmodifiedon + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + AspectKey aspectKey = new AspectKey<>(AspectFoo.class, urn, 0L); + long aspectFooLastModifiedOn = dao.getWithExtraInfo(aspectKey).get().getExtraInfo().getAudit().getTime(); + assertEquals(aspectFooLastModifiedOn, t2); + } else { + String aspectName = ModelUtils.getAspectName(AspectFoo.class); + EbeanMetadataAspect aspect = 
getMetadata(urn, aspectName, 0); + long time = aspect.getCreatedOn().getTime(); + assertEquals(time, t2); + } + } + + @Test + public void testAddWithIngestionAnnotationWithOneFilter() throws URISyntaxException { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(2); + AspectFoo foo = new AspectFoo().setValue("foo"); + + IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE); + long t1 = 1704067200000L; // 2024-01-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t1).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + long t2 = 1706745600000L; // 2024-02-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t2).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + // Even though the aspect is annotated with FORCE_UPDATE annotation, the filter does not match so the update is not persisted. + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + AspectKey aspectKey = new AspectKey<>(AspectFoo.class, urn, 0L); + long aspectFooLastModifiedOn = dao.getWithExtraInfo(aspectKey).get().getExtraInfo().getAudit().getTime(); + assertEquals(aspectFooLastModifiedOn, t1); + } else { + String aspectName = ModelUtils.getAspectName(AspectFoo.class); + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); + long time = aspect.getCreatedOn().getTime(); + // update not persisted, timestamp should still be t1. 
+ assertEquals(time, t1); + } + } + + @Test + public void testAddWithIngestionAnnotationWithMultipleFilters() throws URISyntaxException { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn urn = makeFooUrn(2); // This will not match the filter {"path": "/fooId", "value": "1"} + AspectBar foo = new AspectBar().setValue("bar"); + + IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE); + long t1 = 1704067200000L; // 2024-01-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t1).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + long t2 = 1706745600000L; // 2024-02-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t2).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + // One filter (two filters in total) matched, we should persist into db. + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + AspectKey aspectKey = new AspectKey<>(AspectBar.class, urn, 0L); + long aspectFooLastModifiedOn = dao.getWithExtraInfo(aspectKey).get().getExtraInfo().getAudit().getTime(); + assertEquals(aspectFooLastModifiedOn, t2); + } else { + String aspectName = ModelUtils.getAspectName(AspectBar.class); + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); + long time = aspect.getCreatedOn().getTime(); + // update persisted, timestamp should now be t2. + assertEquals(time, t2); + } + } + + @Test + public void testAddWithIngestionAnnotationWithMultipleFiltersButNoMatch() throws URISyntaxException { + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrnPathExtractor urnPathExtractor = (FooUrnPathExtractor) dao.getUrnPathExtractor(); + urnPathExtractor.updateDummyEntry(1); + FooUrn urn = makeFooUrn(2); // This will not match any filter. 
+ AspectBar foo = new AspectBar().setValue("bar"); + + IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE); + long t1 = 1704067200000L; // 2024-01-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t1).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + long t2 = 1706745600000L; // 2024-02-01 00:00:00.0 GMT + dao.add(urn, foo, new AuditStamp().setTime(t2).setActor(Urn.createFromString("urn:li:corpuser:tester")), null, ingestionParams); + + // No filter matched, so the update should not be persisted into db. + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + AspectKey aspectKey = new AspectKey<>(AspectBar.class, urn, 0L); + long aspectFooLastModifiedOn = dao.getWithExtraInfo(aspectKey).get().getExtraInfo().getAudit().getTime(); + assertEquals(aspectFooLastModifiedOn, t1); + } else { + String aspectName = ModelUtils.getAspectName(AspectBar.class); + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); + long time = aspect.getCreatedOn().getTime(); + // update not persisted, timestamp should still be t1. 
+ assertEquals(time, t1); + } + } + @Test public void testAddWithOverrideIngestionMode() throws URISyntaxException { // this test is used to check that new metadata ingestion with the OVERRIDE write mode is still updated in @@ -380,11 +489,9 @@ public void testAddWithOverrideIngestionMode() throws URISyntaxException { // however, make sure that the update still went through by checking the aspect's lastmodifiedon if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { - String aspectFooLastModifiedOnStr = dao.getServer() - .createSqlQuery( - "select json_extract(a_aspectfoo, '$.lastmodifiedon') as lastmodifiedon from metadata_entity_foo") - .findOne().getString("lastmodifiedon"); - assertEquals(Timestamp.valueOf(aspectFooLastModifiedOnStr.replace("\"", "")).getTime(), t2); + AspectKey aspectKey = new AspectKey<>(AspectFoo.class, urn, 0L); + long aspectFooLastModifiedOn = dao.getWithExtraInfo(aspectKey).get().getExtraInfo().getAudit().getTime(); + assertEquals(aspectFooLastModifiedOn, t2); } else { String aspectName = ModelUtils.getAspectName(AspectFoo.class); EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 0); @@ -397,7 +504,7 @@ public void testAddWithOverrideIngestionMode() throws URISyntaxException { public void testDefaultEqualityTester() { EbeanLocalDAO dao = createDao(FooUrn.class); dao.setEqualityTester(AspectFoo.class, DefaultEqualityTester.newInstance()); - FooUrn urn = makeFooUrn(1); + FooUrn urn = makeFooUrn(2); String aspectName = ModelUtils.getAspectName(AspectFoo.class); AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); @@ -2187,7 +2294,6 @@ public void testGetSoftDeletedAspect() { assertEquals(fooOptional.get(), v0); } - InOrder inOrder = inOrder(_mockProducer); inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, v1); inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, v1, v0); @@ -2198,7 +2304,7 @@ public void testGetSoftDeletedAspect() { @Test public 
void testSoftDeletedAspectWithNoExistingMetadata() { EbeanLocalDAO dao = createDao(FooUrn.class); - FooUrn urn = makeFooUrn(1); + FooUrn urn = makeFooUrn(2); String aspectName = ModelUtils.getAspectName(AspectFoo.class); // no metadata already exists diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java index fa90144e5..eefd38a72 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java @@ -11,7 +11,7 @@ import com.linkedin.metadata.dao.EbeanLocalRelationshipWriterDAO; import com.linkedin.metadata.dao.IEbeanLocalAccess; import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO; -import com.linkedin.metadata.dao.scsi.EmptyPathExtractor; +import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor; import com.linkedin.metadata.dao.utils.EBeanDAOUtils; import com.linkedin.metadata.dao.utils.EmbeddedMariaInstance; import com.linkedin.metadata.dao.utils.SQLSchemaUtils; diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java index 8b2fa438a..491fd7099 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BarUrnPathExtractor.java @@ -1,6 +1,6 @@ package com.linkedin.metadata.dao.utils; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.testing.urn.BarUrn; import java.util.Collections; import java.util.HashMap; diff --git 
a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java index ceb5fbf11..19762e570 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/BazUrnPathExtractor.java @@ -1,6 +1,6 @@ package com.linkedin.metadata.dao.utils; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.testing.urn.BazUrn; import java.util.Collections; import java.util.HashMap; diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java index 407b3509f..95f947f72 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/FooUrnPathExtractor.java @@ -1,6 +1,6 @@ package com.linkedin.metadata.dao.utils; -import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.dao.urnpath.UrnPathExtractor; import com.linkedin.testing.urn.FooUrn; import java.util.Collections; import java.util.HashMap; @@ -9,12 +9,19 @@ public class FooUrnPathExtractor implements UrnPathExtractor { + private Map urnPaths = new HashMap() { + { + put("/dummyId", 10); // Hard-code the value to test ingestion multiple filters with @gma.aspect.ingestion + } + }; + @Override public Map extractPaths(@Nonnull FooUrn urn) { - return Collections.unmodifiableMap(new HashMap() { - { - put("/fooId", urn.getFooIdEntity()); - } - }); + urnPaths.put("/fooId", urn.getFooIdEntity()); + return Collections.unmodifiableMap(urnPaths); + } + + public void updateDummyEntry(Integer value) { + urnPaths.put("/dummyId", value); } } diff --git 
a/gradle-plugins/metadata-annotations-schema/build.gradle b/gradle-plugins/metadata-annotations-schema/build.gradle index 32d97382c..28f127c4c 100644 --- a/gradle-plugins/metadata-annotations-schema/build.gradle +++ b/gradle-plugins/metadata-annotations-schema/build.gradle @@ -8,5 +8,4 @@ apply from: "$rootDir/gradle/java-publishing.gradle" dependencies { dataModel project(':core-models') - dataModel project(':dao-api') } \ No newline at end of file diff --git a/gradle-plugins/metadata-annotations-schema/src/main/pegasus/com/linkedin/metadata/annotations/AspectIngestionAnnotation.pdl b/gradle-plugins/metadata-annotations-schema/src/main/pegasus/com/linkedin/metadata/annotations/AspectIngestionAnnotation.pdl index 3fe85809c..2fe729053 100644 --- a/gradle-plugins/metadata-annotations-schema/src/main/pegasus/com/linkedin/metadata/annotations/AspectIngestionAnnotation.pdl +++ b/gradle-plugins/metadata-annotations-schema/src/main/pegasus/com/linkedin/metadata/annotations/AspectIngestionAnnotation.pdl @@ -23,7 +23,7 @@ record AspectIngestionAnnotation { urn: optional string /** - * Filter on the URN so that this ingestion mode is only applicable to a subset of entities. + * Filter on the URN so that this ingestion mode is only applicable to a subset of entities. Filters are chained by OR relation. 
*/ filter: optional array[record UrnFilter { diff --git a/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectBar.pdl b/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectBar.pdl index 8f20f2e71..aa3713b43 100644 --- a/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectBar.pdl +++ b/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectBar.pdl @@ -3,6 +3,11 @@ namespace com.linkedin.testing /** * For unit tests */ +@gma.aspect.ingestion = [ + {"mode": "FORCE_UPDATE", "urn": "com.linkedin.testing.FooUrn", "filter": [ + {"path": "/fooId", "value": "1"}, {"path": "/dummyId", "value": "10"} + ]} +] @gma.aspect.column.name = "aspectbar" record AspectBar { diff --git a/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectFoo.pdl b/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectFoo.pdl index 4f133441f..0f8c6a241 100644 --- a/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectFoo.pdl +++ b/testing/test-models/src/main/pegasus/com/linkedin/testing/AspectFoo.pdl @@ -3,6 +3,11 @@ namespace com.linkedin.testing /** * For unit tests */ +@gma.aspect.ingestion = [ + {"mode": "FORCE_UPDATE", "urn": "com.linkedin.testing.FooUrn", "filter": [ + {"path": "/fooId", "value": "1"} + ]} +] @gma.aspect.column.name = "aspectfoo" record AspectFoo { diff --git a/version.properties b/version.properties index 63dd9f4eb..6f785b137 100644 --- a/version.properties +++ b/version.properties @@ -1 +1 @@ -version=0.4.* +version=0.5.*