From 6b6d820eea3e7c1297381b2b9ad9b37e22cd9c5d Mon Sep 17 00:00:00 2001 From: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Date: Sat, 28 Dec 2024 01:49:15 +0530 Subject: [PATCH] feat(businessAttribute): generate platform events on association/removal with schemaField (#12224) --- metadata-io/build.gradle | 2 +- ...hemaFieldBusinessAttributeChangeEvent.java | 38 ++++++ ...usinessAttributesChangeEventGenerator.java | 98 ++++++++++++++ ...essAttributesChangeEventGeneratorTest.java | 124 ++++++++++++++++++ .../event/EntityChangeEventGeneratorHook.java | 22 +++- .../src/main/resources/application.yaml | 1 + ...tyChangeEventGeneratorRegistryFactory.java | 2 + 7 files changed, 279 insertions(+), 8 deletions(-) create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGeneratorTest.java diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 516a77d59d50bd..88bbfa2e10c4c1 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -102,7 +102,7 @@ dependencies { testImplementation(testFixtures(project(":entity-registry"))) testAnnotationProcessor externalDependency.lombok - + testImplementation project(':mock-entity-registry') constraints { implementation(externalDependency.log4jCore) { because("previous versions are vulnerable to CVE-2021-45105") diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java b/metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java new file mode 100644 index 00000000000000..1f1252e2085452 --- /dev/null +++ 
b/metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java
@@ -0,0 +1,38 @@
+package com.linkedin.metadata.timeline.data.dataset.schema;
+
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.common.AuditStamp;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.metadata.timeline.data.ChangeCategory;
+import com.linkedin.metadata.timeline.data.ChangeEvent;
+import com.linkedin.metadata.timeline.data.ChangeOperation;
+import com.linkedin.metadata.timeline.data.SemanticChangeType;
+import java.util.Objects;
+import lombok.Builder;
+
+/**
+ * A {@link ChangeEvent} describing the association of a business attribute with a schema field,
+ * or the removal of that association.
+ *
+ * <p>The three URN arguments are not kept as fields: they are stringified and handed to the
+ * {@link ChangeEvent} constructor as a map keyed by {@code parentUrn}, {@code
+ * businessAttributeUrn} and {@code datasetUrn}.
+ */
+public class SchemaFieldBusinessAttributeChangeEvent extends ChangeEvent {
+  /**
+   * Builds the event. Lombok exposes this constructor through the static {@code
+   * schemaFieldBusinessAttributeChangeEventBuilder()} method.
+   *
+   * <p>The first seven arguments are forwarded to the superclass constructor unchanged.
+   *
+   * @param parentUrn URN stored under the {@code parentUrn} key; must not be null
+   * @param businessAttributeUrn URN stored under the {@code businessAttributeUrn} key; must not be
+   *     null
+   * @param datasetUrn URN stored under the {@code datasetUrn} key; must not be null
+   * @throws NullPointerException if any of the three URNs is null; the message names the missing
+   *     argument
+   */
+  @Builder(builderMethodName = "schemaFieldBusinessAttributeChangeEventBuilder")
+  public SchemaFieldBusinessAttributeChangeEvent(
+      String entityUrn,
+      ChangeCategory category,
+      ChangeOperation operation,
+      String modifier,
+      AuditStamp auditStamp,
+      SemanticChangeType semVerChange,
+      String description,
+      Urn parentUrn,
+      Urn businessAttributeUrn,
+      Urn datasetUrn) {
+    super(
+        entityUrn,
+        category,
+        operation,
+        modifier,
+        // Explicit checks: a builder call that forgets one URN should say which one is missing.
+        ImmutableMap.of(
+            "parentUrn",
+            Objects.requireNonNull(parentUrn, "parentUrn must not be null").toString(),
+            "businessAttributeUrn",
+            Objects.requireNonNull(businessAttributeUrn, "businessAttributeUrn must not be null")
+                .toString(),
+            "datasetUrn",
+            Objects.requireNonNull(datasetUrn, "datasetUrn must not be null").toString()),
+        auditStamp,
+        semVerChange,
+        description);
+  }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java
new file mode 100644
index 00000000000000..69d20f2f41bd56
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java
@@ -0,0 +1,98 @@
+package com.linkedin.metadata.timeline.eventgenerator;
+
+import com.linkedin.businessattribute.BusinessAttributeAssociation;
+import com.linkedin.businessattribute.BusinessAttributes;
+import com.linkedin.common.AuditStamp;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.metadata.timeline.data.ChangeCategory;
+import com.linkedin.metadata.timeline.data.ChangeEvent;
+import com.linkedin.metadata.timeline.data.ChangeOperation;
+import com.linkedin.metadata.timeline.data.SemanticChangeType;
+import com.linkedin.metadata.timeline.data.dataset.schema.SchemaFieldBusinessAttributeChangeEvent;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Generates change events for {@link BusinessAttributes} aspects by comparing the business
+ * attribute association before and after a change.
+ *
+ * <p>An ADD event is produced when an association appears, a REMOVE event when it disappears, and
+ * a REMOVE followed by an ADD when it is replaced by an association pointing at a different
+ * business attribute URN. No event is produced when the URN is unchanged.
+ */
+@Slf4j
+public class BusinessAttributesChangeEventGenerator
+    extends EntityChangeEventGenerator<BusinessAttributes> {
+
+  private static final String BUSINESS_ATTRIBUTE_ADDED_FORMAT =
+      "BusinessAttribute '%s' added to entity '%s'.";
+  private static final String BUSINESS_ATTRIBUTE_REMOVED_FORMAT =
+      "BusinessAttribute '%s' removed from entity '%s'.";
+
+  /**
+   * Computes the change events between two versions of the aspect.
+   *
+   * @param urn URN of the entity that carries the aspect
+   * @param entityName entity type name, used for logging only
+   * @param aspectName aspect name, used for logging only
+   * @param from previous aspect wrapper; its value may be null
+   * @param to new aspect wrapper; its value may be null
+   * @param auditStamp audit stamp attached to every generated event
+   * @return the generated events, possibly empty, never null
+   */
+  @Override
+  public List<ChangeEvent> getChangeEvents(
+      @Nonnull Urn urn,
+      @Nonnull String entityName,
+      @Nonnull String aspectName,
+      @Nonnull Aspect<BusinessAttributes> from,
+      @Nonnull Aspect<BusinessAttributes> to,
+      @Nonnull AuditStamp auditStamp) {
+    log.debug(
+        "Calling BusinessAttributesChangeEventGenerator for entity {} and aspect {}",
+        entityName,
+        aspectName);
+    return computeDiff(urn, entityName, aspectName, from.getValue(), to.getValue(), auditStamp);
+  }
+
+  /**
+   * Diffs the previous and new aspect values. A null aspect value is treated the same as an aspect
+   * without an association.
+   */
+  private List<ChangeEvent> computeDiff(
+      Urn urn,
+      String entityName,
+      String aspectName,
+      BusinessAttributes previousValue,
+      BusinessAttributes newValue,
+      AuditStamp auditStamp) {
+    final List<ChangeEvent> changeEvents = new ArrayList<>();
+
+    final BusinessAttributeAssociation previousAssociation =
+        previousValue != null ? previousValue.getBusinessAttribute() : null;
+    final BusinessAttributeAssociation newAssociation =
+        newValue != null ? newValue.getBusinessAttribute() : null;
+
+    // Both sides set but pointing at different attributes: report it as a removal of the old
+    // attribute followed by an addition of the new one, instead of silently emitting nothing.
+    final boolean replaced =
+        previousAssociation != null
+            && newAssociation != null
+            && !Objects.equals(
+                previousAssociation.getBusinessAttributeUrn(),
+                newAssociation.getBusinessAttributeUrn());
+
+    if (previousAssociation != null && (newAssociation == null || replaced)) {
+      changeEvents.add(
+          createChangeEvent(
+              previousAssociation,
+              urn,
+              ChangeOperation.REMOVE,
+              BUSINESS_ATTRIBUTE_REMOVED_FORMAT,
+              auditStamp));
+    }
+    if (newAssociation != null && (previousAssociation == null || replaced)) {
+      changeEvents.add(
+          createChangeEvent(
+              newAssociation,
+              urn,
+              ChangeOperation.ADD,
+              BUSINESS_ATTRIBUTE_ADDED_FORMAT,
+              auditStamp));
+    }
+    return changeEvents;
+  }
+
+  /**
+   * Builds a single event for the given association.
+   *
+   * @param businessAttributeAssociation association whose business attribute URN is reported
+   * @param entityUrn URN of the entity the association belongs to; also used as the parent URN
+   * @param changeOperation operation recorded on the event
+   * @param format description template taking the business attribute id and the entity URN
+   * @param auditStamp audit stamp recorded on the event
+   */
+  private ChangeEvent createChangeEvent(
+      BusinessAttributeAssociation businessAttributeAssociation,
+      Urn entityUrn,
+      ChangeOperation changeOperation,
+      String format,
+      AuditStamp auditStamp) {
+    final Urn businessAttributeUrn = businessAttributeAssociation.getBusinessAttributeUrn();
+    return SchemaFieldBusinessAttributeChangeEvent.schemaFieldBusinessAttributeChangeEventBuilder()
+        .entityUrn(entityUrn.toString())
+        .category(ChangeCategory.BUSINESS_ATTRIBUTE)
+        .operation(changeOperation)
+        .modifier(businessAttributeUrn.toString())
+        .auditStamp(auditStamp)
+        .semVerChange(SemanticChangeType.MINOR)
+        .description(String.format(format, businessAttributeUrn.getId(), entityUrn))
+        .parentUrn(entityUrn)
+        .businessAttributeUrn(businessAttributeUrn)
+        // NOTE(review): assumes the entity URN's id is itself a URN (presumably the parent
+        // dataset of a schemaField) - confirm for other entity types.
+        .datasetUrn(entityUrn.getIdAsUrn())
+        .build();
+  }
+}
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGeneratorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGeneratorTest.java
new file mode 100644
index 00000000000000..fb4c5ca3f96881
--- /dev/null
+++ b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGeneratorTest.java
@@ -0,0 +1,124 @@
+package com.linkedin.metadata.timeline.eventgenerator;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+import
com.linkedin.businessattribute.BusinessAttributeAssociation;
+import com.linkedin.businessattribute.BusinessAttributes;
+import com.linkedin.common.AuditStamp;
+import com.linkedin.common.urn.BusinessAttributeUrn;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.data.ByteString;
+import com.linkedin.data.template.RecordTemplate;
+import com.linkedin.metadata.Constants;
+import com.linkedin.metadata.models.AspectSpec;
+import com.linkedin.metadata.timeline.data.ChangeEvent;
+import com.linkedin.metadata.timeline.data.ChangeOperation;
+import com.linkedin.metadata.utils.GenericRecordUtils;
+import com.linkedin.mxe.SystemMetadata;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import mock.MockEntitySpec;
+import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link BusinessAttributesChangeEventGenerator}: addition, removal and the unchanged
+ * case of a business attribute association on a schema field.
+ */
+public class BusinessAttributesChangeEventGeneratorTest extends AbstractTestNGSpringContextTests {
+
+  /** Returns the schema field URN every test uses as the changed entity. */
+  private static Urn getSchemaFieldUrn() throws URISyntaxException {
+    return Urn.createFromString(
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD),user_id)");
+  }
+
+  private static final String BUSINESS_ATTRIBUTE_URN =
+      "urn:li:businessAttribute:cypressTestAttribute";
+
+  /** Returns a fixed audit stamp so the tests are deterministic. */
+  private static AuditStamp getTestAuditStamp() throws URISyntaxException {
+    return new AuditStamp()
+        .setActor(Urn.createFromString("urn:li:corpuser:__datahub_system"))
+        .setTime(1683829509553L);
+  }
+
+  /** Wraps the given association in a {@link BusinessAttributes} aspect. */
+  private static Aspect<BusinessAttributes> getBusinessAttributes(
+      BusinessAttributeAssociation association) {
+    return new Aspect<>(
+        new BusinessAttributes().setBusinessAttribute(association), new SystemMetadata());
+  }
+
+  /**
+   * Builds an aspect by deserializing an empty JSON object, so no association is expected to be
+   * set on it.
+   */
+  private static Aspect<BusinessAttributes> getNullBusinessAttributes() {
+    MockEntitySpec mockEntitySpec = new MockEntitySpec("schemaField");
+    BusinessAttributes businessAttributes = new BusinessAttributes();
+    final AspectSpec aspectSpec =
+        mockEntitySpec.createAspectSpec(businessAttributes, Constants.BUSINESS_ATTRIBUTE_ASPECT);
+    final RecordTemplate nullAspect =
+        GenericRecordUtils.deserializeAspect(
+            ByteString.copyString("{}", StandardCharsets.UTF_8), "application/json", aspectSpec);
+    // Explicit cast instead of a raw Aspect: a wrong template type fails here rather than later
+    // inside the generator.
+    return new Aspect<>((BusinessAttributes) nullAspect, new SystemMetadata());
+  }
+
+  @Test
+  public void testBusinessAttributeAddition() throws Exception {
+    BusinessAttributesChangeEventGenerator businessAttributesChangeEventGenerator =
+        new BusinessAttributesChangeEventGenerator();
+
+    Urn urn = getSchemaFieldUrn();
+    String entity = "schemaField";
+    String aspect = "businessAttributes";
+    AuditStamp auditStamp = getTestAuditStamp();
+
+    Aspect<BusinessAttributes> from = getNullBusinessAttributes();
+    Aspect<BusinessAttributes> to =
+        getBusinessAttributes(
+            new BusinessAttributeAssociation()
+                .setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));
+
+    List<ChangeEvent> actual =
+        businessAttributesChangeEventGenerator.getChangeEvents(
+            urn, entity, aspect, from, to, auditStamp);
+    assertEquals(1, actual.size());
+    assertEquals(ChangeOperation.ADD.name(), actual.get(0).getOperation().name());
+    assertEquals(getSchemaFieldUrn(), Urn.createFromString(actual.get(0).getEntityUrn()));
+  }
+
+  @Test
+  public void testBusinessAttributeRemoval() throws Exception {
+    BusinessAttributesChangeEventGenerator test = new BusinessAttributesChangeEventGenerator();
+
+    Urn urn = getSchemaFieldUrn();
+    String entity = "schemaField";
+    String aspect = "businessAttributes";
+    AuditStamp auditStamp = getTestAuditStamp();
+
+    Aspect<BusinessAttributes> from =
+        getBusinessAttributes(
+            new BusinessAttributeAssociation()
+                .setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));
+    Aspect<BusinessAttributes> to = getNullBusinessAttributes();
+
+    List<ChangeEvent> actual =
+        test.getChangeEvents(urn, entity, aspect, from, to, auditStamp);
+    assertEquals(1, actual.size());
+    assertEquals(ChangeOperation.REMOVE.name(), actual.get(0).getOperation().name());
+    assertEquals(getSchemaFieldUrn(), Urn.createFromString(actual.get(0).getEntityUrn()));
+  }
+
+  @Test
+  public void testNoChange() throws Exception {
+    BusinessAttributesChangeEventGenerator test = new BusinessAttributesChangeEventGenerator();
+
+    Urn urn = getSchemaFieldUrn();
+    String entity = "schemaField";
+    String aspect = "businessAttributes";
+    AuditStamp auditStamp = getTestAuditStamp();
+
+    // Same business attribute URN on both sides: no event is expected.
+    Aspect<BusinessAttributes> from =
+        getBusinessAttributes(
+            new BusinessAttributeAssociation()
+                .setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));
+    Aspect<BusinessAttributes> to =
+        getBusinessAttributes(
+            new BusinessAttributeAssociation()
+                .setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));
+
+    List<ChangeEvent> actual =
+        test.getChangeEvents(urn, entity, aspect, from, to, auditStamp);
+    assertEquals(0, actual.size());
+  }
+}
diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java
index de570cc91b2fe7..17e34f151ae018 100644
--- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java
+++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/event/EntityChangeEventGeneratorHook.java
@@ -1,7 +1,5 @@
 package com.linkedin.metadata.kafka.hook.event;
 
-import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.linkedin.common.AuditStamp;
@@ -27,6 +25,7 @@ import com.linkedin.platform.event.v1.Parameters;
 import io.datahubproject.metadata.context.OperationContext;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -65,6 +64,7 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
Constants.ASSERTION_RUN_EVENT_ASPECT_NAME, Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME, Constants.BUSINESS_ATTRIBUTE_INFO_ASPECT_NAME, + Constants.BUSINESS_ATTRIBUTE_ASPECT, // Entity Lifecycle Event Constants.DATASET_KEY_ASPECT_NAME, @@ -83,13 +83,12 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook { private static final Set SUPPORTED_OPERATIONS = ImmutableSet.of("CREATE", "UPSERT", "DELETE"); - private static final Set ENTITY_EXCLUSIONS = ImmutableSet.of(SCHEMA_FIELD_ENTITY_NAME); - private final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry; private final OperationContext systemOperationContext; private final SystemEntityClient systemEntityClient; private final Boolean isEnabled; @Getter private final String consumerGroupSuffix; + private final List entityExclusions; @Autowired public EntityChangeEventGeneratorHook( @@ -98,13 +97,16 @@ public EntityChangeEventGeneratorHook( final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry, @Nonnull final SystemEntityClient entityClient, @Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled, - @Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix) { + @Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix, + @Nonnull @Value("#{'${entityChangeEvents.entityExclusions}'.split(',')}") + List entityExclusions) { this.systemOperationContext = systemOperationContext; this.entityChangeEventGeneratorRegistry = Objects.requireNonNull(entityChangeEventGeneratorRegistry); this.systemEntityClient = Objects.requireNonNull(entityClient); this.isEnabled = isEnabled; this.consumerGroupSuffix = consumerGroupSuffix; + this.entityExclusions = entityExclusions; } @VisibleForTesting @@ -113,7 +115,13 @@ public EntityChangeEventGeneratorHook( @Nonnull final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry, @Nonnull final SystemEntityClient entityClient, 
@Nonnull Boolean isEnabled) { - this(systemOperationContext, entityChangeEventGeneratorRegistry, entityClient, isEnabled, ""); + this( + systemOperationContext, + entityChangeEventGeneratorRegistry, + entityClient, + isEnabled, + "", + Collections.emptyList()); } @Override @@ -202,7 +210,7 @@ private List generateChangeEvents( private boolean isEligibleForProcessing(final MetadataChangeLog log) { return SUPPORTED_OPERATIONS.contains(log.getChangeType().toString()) && SUPPORTED_ASPECT_NAMES.contains(log.getAspectName()) - && !ENTITY_EXCLUSIONS.contains(log.getEntityType()); + && !entityExclusions.contains(log.getEntityType()); } private void emitPlatformEvent( diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index b997bc108e4ba1..f6fa4a37fdadbc 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -467,6 +467,7 @@ featureFlags: entityChangeEvents: enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true} consumerGroupSuffix: ${ECE_CONSUMER_GROUP_SUFFIX:} + entityExclusions: ${ECE_ENTITY_EXCLUSIONS:schemaField} # provides a comma separated list of entities to exclude from the ECE hook views: enabled: ${VIEWS_ENABLED:true} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeline/eventgenerator/EntityChangeEventGeneratorRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeline/eventgenerator/EntityChangeEventGeneratorRegistryFactory.java index cd8eb4f1218db4..10770b83ad8811 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeline/eventgenerator/EntityChangeEventGeneratorRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeline/eventgenerator/EntityChangeEventGeneratorRegistryFactory.java @@ -6,6 +6,7 @@ import 
com.linkedin.metadata.timeline.eventgenerator.AssertionRunEventChangeEventGenerator; import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributeAssociationChangeEventGenerator; import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributeInfoChangeEventGenerator; +import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributesChangeEventGenerator; import com.linkedin.metadata.timeline.eventgenerator.DataProcessInstanceRunEventChangeEventGenerator; import com.linkedin.metadata.timeline.eventgenerator.DatasetPropertiesChangeEventGenerator; import com.linkedin.metadata.timeline.eventgenerator.DeprecationChangeEventGenerator; @@ -59,6 +60,7 @@ protected EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry( BUSINESS_ATTRIBUTE_INFO_ASPECT_NAME, new BusinessAttributeInfoChangeEventGenerator()); registry.register( BUSINESS_ATTRIBUTE_ASSOCIATION, new BusinessAttributeAssociationChangeEventGenerator()); + registry.register(BUSINESS_ATTRIBUTE_ASPECT, new BusinessAttributesChangeEventGenerator()); // Entity Lifecycle Differs registry.register(DATASET_KEY_ASPECT_NAME, new EntityKeyChangeEventGenerator<>());