Skip to content

Commit

Permalink
feat(businessAttribute): generate platform events on association/remo…
Browse files Browse the repository at this point in the history
…val with schemaField (#12224)
  • Loading branch information
deepgarg-visa authored Dec 27, 2024
1 parent 4e3103e commit 6b6d820
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 8 deletions.
2 changes: 1 addition & 1 deletion metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ dependencies {
testImplementation(testFixtures(project(":entity-registry")))

testAnnotationProcessor externalDependency.lombok

testImplementation project(':mock-entity-registry')
constraints {
implementation(externalDependency.log4jCore) {
because("previous versions are vulnerable to CVE-2021-45105")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.metadata.timeline.data.dataset.schema;

import com.google.common.collect.ImmutableMap;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import com.linkedin.metadata.timeline.data.SemanticChangeType;
import lombok.Builder;

public class SchemaFieldBusinessAttributeChangeEvent extends ChangeEvent {
@Builder(builderMethodName = "schemaFieldBusinessAttributeChangeEventBuilder")
public SchemaFieldBusinessAttributeChangeEvent(
String entityUrn,
ChangeCategory category,
ChangeOperation operation,
String modifier,
AuditStamp auditStamp,
SemanticChangeType semVerChange,
String description,
Urn parentUrn,
Urn businessAttributeUrn,
Urn datasetUrn) {
super(
entityUrn,
category,
operation,
modifier,
ImmutableMap.of(
"parentUrn", parentUrn.toString(),
"businessAttributeUrn", businessAttributeUrn.toString(),
"datasetUrn", datasetUrn.toString()),
auditStamp,
semVerChange,
description);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.linkedin.businessattribute.BusinessAttributeAssociation;
import com.linkedin.businessattribute.BusinessAttributes;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import com.linkedin.metadata.timeline.data.SemanticChangeType;
import com.linkedin.metadata.timeline.data.dataset.schema.SchemaFieldBusinessAttributeChangeEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BusinessAttributesChangeEventGenerator
extends EntityChangeEventGenerator<BusinessAttributes> {

private static final String BUSINESS_ATTRIBUTE_ADDED_FORMAT =
"BusinessAttribute '%s' added to entity '%s'.";
private static final String BUSINESS_ATTRIBUTE_REMOVED_FORMAT =
"BusinessAttribute '%s' removed from entity '%s'.";

@Override
public List<ChangeEvent> getChangeEvents(
@Nonnull Urn urn,
@Nonnull String entityName,
@Nonnull String aspectName,
@Nonnull Aspect<BusinessAttributes> from,
@Nonnull Aspect<BusinessAttributes> to,
@Nonnull AuditStamp auditStamp) {
log.debug(
"Calling BusinessAttributesChangeEventGenerator for entity {} and aspect {}",
entityName,
aspectName);
return computeDiff(urn, entityName, aspectName, from.getValue(), to.getValue(), auditStamp);
}

private List<ChangeEvent> computeDiff(
Urn urn,
String entityName,
String aspectName,
BusinessAttributes previousValue,
BusinessAttributes newValue,
AuditStamp auditStamp) {
List<ChangeEvent> changeEvents = new ArrayList<>();

BusinessAttributeAssociation previousAssociation =
previousValue != null ? previousValue.getBusinessAttribute() : null;
BusinessAttributeAssociation newAssociation =
newValue != null ? newValue.getBusinessAttribute() : null;

if (Objects.nonNull(previousAssociation) && Objects.isNull(newAssociation)) {
changeEvents.add(
createChangeEvent(
previousAssociation,
urn,
ChangeOperation.REMOVE,
BUSINESS_ATTRIBUTE_REMOVED_FORMAT,
auditStamp));

} else if (Objects.isNull(previousAssociation) && Objects.nonNull(newAssociation)) {
changeEvents.add(
createChangeEvent(
newAssociation,
urn,
ChangeOperation.ADD,
BUSINESS_ATTRIBUTE_ADDED_FORMAT,
auditStamp));
}
return changeEvents;
}

private ChangeEvent createChangeEvent(
BusinessAttributeAssociation businessAttributeAssociation,
Urn entityUrn,
ChangeOperation changeOperation,
String format,
AuditStamp auditStamp) {
return SchemaFieldBusinessAttributeChangeEvent.schemaFieldBusinessAttributeChangeEventBuilder()
.entityUrn(entityUrn.toString())
.category(ChangeCategory.BUSINESS_ATTRIBUTE)
.operation(changeOperation)
.modifier(businessAttributeAssociation.getBusinessAttributeUrn().toString())
.auditStamp(auditStamp)
.semVerChange(SemanticChangeType.MINOR)
.description(
String.format(
format, businessAttributeAssociation.getBusinessAttributeUrn().getId(), entityUrn))
.parentUrn(entityUrn)
.businessAttributeUrn(businessAttributeAssociation.getBusinessAttributeUrn())
.datasetUrn(entityUrn.getIdAsUrn())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.linkedin.metadata.timeline.eventgenerator;

import static org.testng.AssertJUnit.assertEquals;

import com.linkedin.businessattribute.BusinessAttributeAssociation;
import com.linkedin.businessattribute.BusinessAttributes;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.BusinessAttributeUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.ByteString;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.SystemMetadata;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import mock.MockEntitySpec;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

public class BusinessAttributesChangeEventGeneratorTest extends AbstractTestNGSpringContextTests {

private static Urn getSchemaFieldUrn() throws URISyntaxException {
return Urn.createFromString(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD),user_id)");
}

private static final String BUSINESS_ATTRIBUTE_URN =
"urn:li:businessAttribute:cypressTestAttribute";

private static AuditStamp getTestAuditStamp() throws URISyntaxException {
return new AuditStamp()
.setActor(Urn.createFromString("urn:li:corpuser:__datahub_system"))
.setTime(1683829509553L);
}

private static Aspect<BusinessAttributes> getBusinessAttributes(
BusinessAttributeAssociation association) {
return new Aspect<>(
new BusinessAttributes().setBusinessAttribute(association), new SystemMetadata());
}

private static Aspect<BusinessAttributes> getNullBusinessAttributes() {
MockEntitySpec mockEntitySpec = new MockEntitySpec("schemaField");
BusinessAttributes businessAttributes = new BusinessAttributes();
final AspectSpec aspectSpec =
mockEntitySpec.createAspectSpec(businessAttributes, Constants.BUSINESS_ATTRIBUTE_ASPECT);
final RecordTemplate nullAspect =
GenericRecordUtils.deserializeAspect(
ByteString.copyString("{}", StandardCharsets.UTF_8), "application/json", aspectSpec);
return new Aspect(nullAspect, new SystemMetadata());
}

@Test
public void testBusinessAttributeAddition() throws Exception {
BusinessAttributesChangeEventGenerator businessAttributesChangeEventGenerator =
new BusinessAttributesChangeEventGenerator();

Urn urn = getSchemaFieldUrn();
String entity = "schemaField";
String aspect = "businessAttributes";
AuditStamp auditStamp = getTestAuditStamp();

Aspect<BusinessAttributes> from = getNullBusinessAttributes();
Aspect<BusinessAttributes> to =
getBusinessAttributes(
new BusinessAttributeAssociation()
.setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));

List<ChangeEvent> actual =
businessAttributesChangeEventGenerator.getChangeEvents(
urn, entity, aspect, from, to, auditStamp);
assertEquals(1, actual.size());
assertEquals(ChangeOperation.ADD.name(), actual.get(0).getOperation().name());
assertEquals(getSchemaFieldUrn(), Urn.createFromString(actual.get(0).getEntityUrn()));
}

@Test
public void testBusinessAttributeRemoval() throws Exception {
BusinessAttributesChangeEventGenerator test = new BusinessAttributesChangeEventGenerator();

Urn urn = getSchemaFieldUrn();
String entity = "schemaField";
String aspect = "businessAttributes";
AuditStamp auditStamp = getTestAuditStamp();

Aspect<BusinessAttributes> from =
getBusinessAttributes(
new BusinessAttributeAssociation()
.setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));
Aspect<BusinessAttributes> to = getNullBusinessAttributes();

List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to, auditStamp);
assertEquals(1, actual.size());
assertEquals(ChangeOperation.REMOVE.name(), actual.get(0).getOperation().name());
assertEquals(getSchemaFieldUrn(), Urn.createFromString(actual.get(0).getEntityUrn()));
}

@Test
public void testNoChange() throws Exception {
BusinessAttributesChangeEventGenerator test = new BusinessAttributesChangeEventGenerator();

Urn urn = getSchemaFieldUrn();
String entity = "schemaField";
String aspect = "businessAttributes";
AuditStamp auditStamp = getTestAuditStamp();

Aspect<BusinessAttributes> from =
getBusinessAttributes(
new BusinessAttributeAssociation()
.setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));
Aspect<BusinessAttributes> to =
getBusinessAttributes(
new BusinessAttributeAssociation()
.setBusinessAttributeUrn(new BusinessAttributeUrn(BUSINESS_ATTRIBUTE_URN)));

List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to, auditStamp);
assertEquals(0, actual.size());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.metadata.kafka.hook.event;

import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
Expand All @@ -27,6 +25,7 @@
import com.linkedin.platform.event.v1.Parameters;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -65,6 +64,7 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
Constants.ASSERTION_RUN_EVENT_ASPECT_NAME,
Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
Constants.BUSINESS_ATTRIBUTE_INFO_ASPECT_NAME,
Constants.BUSINESS_ATTRIBUTE_ASPECT,

// Entity Lifecycle Event
Constants.DATASET_KEY_ASPECT_NAME,
Expand All @@ -83,13 +83,12 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
private static final Set<String> SUPPORTED_OPERATIONS =
ImmutableSet.of("CREATE", "UPSERT", "DELETE");

private static final Set<String> ENTITY_EXCLUSIONS = ImmutableSet.of(SCHEMA_FIELD_ENTITY_NAME);

private final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry;
private final OperationContext systemOperationContext;
private final SystemEntityClient systemEntityClient;
private final Boolean isEnabled;
@Getter private final String consumerGroupSuffix;
private final List<String> entityExclusions;

@Autowired
public EntityChangeEventGeneratorHook(
Expand All @@ -98,13 +97,16 @@ public EntityChangeEventGeneratorHook(
final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry,
@Nonnull final SystemEntityClient entityClient,
@Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled,
@Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix) {
@Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix,
@Nonnull @Value("#{'${entityChangeEvents.entityExclusions}'.split(',')}")
List<String> entityExclusions) {
this.systemOperationContext = systemOperationContext;
this.entityChangeEventGeneratorRegistry =
Objects.requireNonNull(entityChangeEventGeneratorRegistry);
this.systemEntityClient = Objects.requireNonNull(entityClient);
this.isEnabled = isEnabled;
this.consumerGroupSuffix = consumerGroupSuffix;
this.entityExclusions = entityExclusions;
}

@VisibleForTesting
Expand All @@ -113,7 +115,13 @@ public EntityChangeEventGeneratorHook(
@Nonnull final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry,
@Nonnull final SystemEntityClient entityClient,
@Nonnull Boolean isEnabled) {
this(systemOperationContext, entityChangeEventGeneratorRegistry, entityClient, isEnabled, "");
this(
systemOperationContext,
entityChangeEventGeneratorRegistry,
entityClient,
isEnabled,
"",
Collections.emptyList());
}

@Override
Expand Down Expand Up @@ -202,7 +210,7 @@ private <T extends RecordTemplate> List<ChangeEvent> generateChangeEvents(
private boolean isEligibleForProcessing(final MetadataChangeLog log) {
return SUPPORTED_OPERATIONS.contains(log.getChangeType().toString())
&& SUPPORTED_ASPECT_NAMES.contains(log.getAspectName())
&& !ENTITY_EXCLUSIONS.contains(log.getEntityType());
&& !entityExclusions.contains(log.getEntityType());
}

private void emitPlatformEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ featureFlags:
entityChangeEvents:
enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}
consumerGroupSuffix: ${ECE_CONSUMER_GROUP_SUFFIX:}
entityExclusions: ${ECE_ENTITY_EXCLUSIONS:schemaField} # provides a comma separated list of entities to exclude from the ECE hook

views:
enabled: ${VIEWS_ENABLED:true}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.timeline.eventgenerator.AssertionRunEventChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributeAssociationChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributeInfoChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributesChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DataProcessInstanceRunEventChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DatasetPropertiesChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DeprecationChangeEventGenerator;
Expand Down Expand Up @@ -59,6 +60,7 @@ protected EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry(
BUSINESS_ATTRIBUTE_INFO_ASPECT_NAME, new BusinessAttributeInfoChangeEventGenerator());
registry.register(
BUSINESS_ATTRIBUTE_ASSOCIATION, new BusinessAttributeAssociationChangeEventGenerator());
registry.register(BUSINESS_ATTRIBUTE_ASPECT, new BusinessAttributesChangeEventGenerator());

// Entity Lifecycle Differs
registry.register(DATASET_KEY_ASPECT_NAME, new EntityKeyChangeEventGenerator<>());
Expand Down

0 comments on commit 6b6d820

Please sign in to comment.