Skip to content

Commit

Permalink
[Generic DAO][Part 2] Implement backfill method (#385)
Browse files Browse the repository at this point in the history
* [Generic DAO][Part 2] Implement backfill method

* fix

* fix

* support backfill event

* address comment

* fix

* fix test

---------

Co-authored-by: Jesse Jia <[email protected]>
  • Loading branch information
zhixuanjia and Jesse Jia authored Jun 12, 2024
1 parent 6f0bf31 commit 793bd8c
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
}

/**
* Emits backfill MAE for an aspect of an entity and/or backfills SCSI depending on the backfill mode.
* Emits backfill MAE for an aspect of an entity depending on the backfill mode.
*
* @param mode backfill mode
* @param aspect aspect to backfill
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.ExtraInfo;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Value;


Expand All @@ -25,8 +32,11 @@ class MetadataWithExtraInfo {
* @param aspectClass The aspect class for the metadata.
* @param metadata The metadata serialized as JSON string.
* @param auditStamp audit stamp containing information on who and when the metadata is saved.
* @param trackingContext Nullable tracking context containing information passed from metadata events.
* @param ingestionMode Different options for ingestion.
*/
void save(@Nonnull Urn urn, @Nonnull Class aspectClass, @Nonnull String metadata, @Nonnull AuditStamp auditStamp);
void save(@Nonnull Urn urn, @Nonnull Class aspectClass, @Nonnull String metadata, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nullable IngestionMode ingestionMode);

/**
* Query the latest metadata from database.
Expand All @@ -35,4 +45,13 @@ class MetadataWithExtraInfo {
* @return The metadata with extra info regarding auditing.
*/
Optional<MetadataWithExtraInfo> queryLatest(@Nonnull Urn urn, @Nonnull Class aspectClass);

/**
* Backfill secondary storages by triggering MAEs.
*
* @param mode The backfill mode.
* @param urnToAspect For each urn, the set of aspect classes to be backfilled.
* @return For each urn, a map from aspect class to the (optional) aspect value that was backfilled.
*/
Map<Urn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfill(@Nonnull BackfillMode mode,
@Nonnull Map<Urn, Set<Class<? extends RecordTemplate>>> urnToAspect);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.metadata.dao.producer;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
* Generic metadata producer without type-bound: entities are identified by a plain {@link Urn} and aspect values
* are passed as plain {@link RecordTemplate}s.
*/
public interface GenericMetadataProducer {

/**
* Produces an aspect specific Metadata Audit Event (MAE) after a metadata aspect is updated for an entity.
*
* @param urn {@link Urn} of the entity
* @param oldValue The value prior to the update, or null if there's none.
* @param newValue The value after the update
* @param auditStamp Audit stamp containing version auditing information for the metadata change, or null if none
*                   is available.
* @param trackingContext Nullable tracking context passed in to be appended to produced MAEv5s
* @param ingestionMode Nullable ingestion mode; selects among the different options for ingestion.
*/
void produceAspectSpecificMetadataAuditEvent(@Nonnull Urn urn, @Nullable RecordTemplate oldValue, @Nonnull RecordTemplate newValue,
@Nullable AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, @Nullable IngestionMode ingestionMode);
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.dao.equality.GenericEqualityTester;
import com.linkedin.metadata.dao.exception.RetryLimitReached;
import com.linkedin.metadata.dao.tracking.TrackingUtils;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.dao.producer.GenericMetadataProducer;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.ExtraInfo;
import io.ebean.DuplicateKeyException;
import io.ebean.EbeanServer;
import io.ebean.SqlUpdate;
import io.ebean.Transaction;
import io.ebean.config.ServerConfig;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +38,7 @@
import static com.linkedin.metadata.dao.EbeanMetadataAspect.*;
import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*;
import static com.linkedin.metadata.dao.utils.EbeanServerUtils.*;
import static com.linkedin.metadata.dao.utils.IngestionUtils.*;
import static com.linkedin.metadata.dao.utils.RecordUtils.toRecordTemplate;
import static com.linkedin.metadata.dao.utils.SQLStatementUtils.*;

Expand All @@ -44,10 +52,15 @@ public class EbeanGenericLocalDAO implements GenericLocalDAO {

private final EbeanServer _server;

private final GenericMetadataProducer _producer;

private Map<Class, GenericEqualityTester> _equalityTesters = new HashMap<>();

public EbeanGenericLocalDAO(@Nonnull ServerConfig serverConfig) {
private static final String BACKFILL_EMITTER = "dao_backfill_endpoint";

/**
 * Creates a DAO backed by an Ebean server built from the given configuration.
 *
 * @param serverConfig configuration handed to {@code createServer} to build the Ebean server
 * @param producer producer used to emit aspect specific MAEs
 */
public EbeanGenericLocalDAO(@Nonnull ServerConfig serverConfig, @Nonnull GenericMetadataProducer producer) {
  _producer = producer;
  _server = createServer(serverConfig);
}

/**
Expand All @@ -69,20 +82,34 @@ public void setEqualityTesters(Map<Class, GenericEqualityTester> equalityTesters
* @param aspectClass The aspect class for the metadata.
* @param metadata The metadata serialized as JSON string.
* @param auditStamp audit stamp containing information on who and when the metadata is saved.
* @param trackingContext Nullable tracking context containing information passed from metadata events.
* @param ingestionMode Different options for ingestion.
*/
public void save(@Nonnull Urn urn, @Nonnull Class aspectClass, @Nonnull String metadata, @Nonnull AuditStamp auditStamp) {
public void save(@Nonnull Urn urn, @Nonnull Class aspectClass, @Nonnull String metadata, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nullable IngestionMode ingestionMode) {
runInTransactionWithRetry(() -> {
final Optional<GenericLocalDAO.MetadataWithExtraInfo> latest = queryLatest(urn, aspectClass);
RecordTemplate newValue = toRecordTemplate(aspectClass, metadata);

if (!latest.isPresent()) {
saveLatest(urn, aspectClass, newValue, null, auditStamp, null);
_producer.produceAspectSpecificMetadataAuditEvent(urn, null, newValue, auditStamp, trackingContext, ingestionMode);
} else {
RecordTemplate currentValue = toRecordTemplate(aspectClass, latest.get().getAspect());
final AuditStamp oldAuditStamp = latest.get().getExtraInfo() == null ? null : latest.get().getExtraInfo().getAudit();

// Check ingestion tracking context to determine if the metadata is being backfilled.
final boolean isBackfillEvent = trackingContext != null && trackingContext.hasBackfill() && trackingContext.isBackfill();

// Skip update if metadata is sent by an "expired" backfill event so that we don't overwrite the latest metadata.
if (isBackfillEvent && isExpiredBackfill(trackingContext, oldAuditStamp)) {
return null;
}

// Skip update if current value and new value are equal.
if (!areEqual(currentValue, newValue, _equalityTesters.get(aspectClass))) {
saveLatest(urn, aspectClass, newValue, currentValue, auditStamp, latest.get().getExtraInfo().getAudit());
_producer.produceAspectSpecificMetadataAuditEvent(urn, currentValue, newValue, auditStamp, trackingContext, ingestionMode);
}
}
return null;
Expand All @@ -109,6 +136,62 @@ public Optional<GenericLocalDAO.MetadataWithExtraInfo> queryLatest(@Nonnull Urn
return Optional.of(new GenericLocalDAO.MetadataWithExtraInfo(metadata.getMetadata(), extraInfo));
}

/**
 * Triggers MAE events to backfill secondary stores.
 *
 * <p>The latest stored value of every requested aspect is read first; events are emitted only after all
 * lookups have completed. Aspects without a stored value are left out of the result and emit nothing.
 *
 * @param mode backfill mode, passed through to the per-aspect backfill
 * @param aspectClasses for each urn, the aspect classes to backfill
 * @return for each requested urn, the aspects found in storage keyed by aspect class
 */
@Nonnull
public Map<Urn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfill(
    @Nonnull BackfillMode mode, @Nonnull Map<Urn, Set<Class<? extends RecordTemplate>>> aspectClasses) {

  if (aspectClasses.isEmpty()) {
    return Collections.emptyMap();
  }

  final Map<Urn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfilled =
      new HashMap<>();

  // Pass 1: load the latest value of every requested aspect.
  for (Map.Entry<Urn, Set<Class<? extends RecordTemplate>>> requested : aspectClasses.entrySet()) {
    final Urn urn = requested.getKey();
    final Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> found = new HashMap<>();

    for (Class<? extends RecordTemplate> aspectClass : requested.getValue()) {
      final Optional<GenericLocalDAO.MetadataWithExtraInfo> latest = queryLatest(urn, aspectClass);
      if (latest.isPresent()) {
        found.put(aspectClass, Optional.of(RecordUtils.toRecordTemplate(aspectClass, latest.get().getAspect())));
      }
    }

    // A urn is always recorded, even when none of its aspects were found.
    backfilled.put(urn, found);
  }

  // Pass 2: emit one backfill event per loaded aspect. Every stored Optional is non-empty by construction.
  for (Map.Entry<Urn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> perUrn
      : backfilled.entrySet()) {
    for (Optional<? extends RecordTemplate> value : perUrn.getValue().values()) {
      backfill(mode, value.get(), perUrn.getKey());
    }
  }

  return backfilled;
}

/**
 * Emits a backfill MAE for a single aspect of an entity when the backfill mode calls for it.
 *
 * <p>An event is produced only for {@code MAE_ONLY}, {@code BACKFILL_ALL} and
 * {@code BACKFILL_INCLUDING_LIVE_INDEX}; for any other mode this method does nothing.
 *
 * @param mode backfill mode
 * @param aspect aspect to backfill
 * @param urn {@link Urn} for the entity
 */
private void backfill(@Nonnull BackfillMode mode, @Nonnull RecordTemplate aspect, @Nonnull Urn urn) {
  final boolean emitsEvent = mode == BackfillMode.MAE_ONLY
      || mode == BackfillMode.BACKFILL_ALL
      || mode == BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX;
  if (!emitsEvent) {
    return;
  }

  // Translate the backfill mode into the ingestion mode it is paired with in the bimap.
  final IngestionMode ingestionMode = ALLOWED_INGESTION_BACKFILL_BIMAP.inverse().get(mode);

  // Each event carries a fresh tracking context: random UUID, the backfill emitter name, current time.
  final IngestionTrackingContext trackingContext = buildIngestionTrackingContext(
      TrackingUtils.getRandomUUID(), BACKFILL_EMITTER, System.currentTimeMillis());

  // The same value is sent as both old and new value, and no audit stamp is attached.
  _producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, null, trackingContext, ingestionMode);
}

/**
* Save metadata into database.
*/
Expand Down Expand Up @@ -206,15 +289,6 @@ private SqlUpdate assembleSchemaSqlUpdate(@Nonnull EbeanMetadataAspect aspect, @
return update;
}

// TODO: This validation is still weak. It can only make sure urn is in "urn:li:entity:foo" format.
/**
 * Checks that the given string can be parsed as a {@link Urn}.
 *
 * @param urn the urn string to validate
 * @throws IllegalArgumentException if the string cannot be parsed; the underlying
 *     {@link URISyntaxException} is attached as the cause
 */
private void validateUrn(String urn) {
  try {
    Urn.createFromCharSequence(urn);
  } catch (URISyntaxException e) {
    // Message kept unchanged for callers that match on it; the cause is chained so the parse failure is not lost.
    throw new IllegalArgumentException("Invalid Urn format", e);
  }
}

private boolean areEqual(@Nonnull RecordTemplate r1, @Nonnull RecordTemplate r2, @Nullable GenericEqualityTester equalityTester) {
if (equalityTester != null) {
return equalityTester.equals(r1, r2);
Expand All @@ -223,6 +297,14 @@ private boolean areEqual(@Nonnull RecordTemplate r1, @Nonnull RecordTemplate r2,
return DataTemplateUtil.areEqual(r1, r2);
}

/**
 * Tells whether a backfill event should be treated as expired relative to the currently stored metadata.
 *
 * <p>The event counts as fresh only when the tracking context has an emit time, the current audit stamp has a
 * time, and the emit time is strictly later than the audit time. In every other case, including a missing
 * context, stamp or timestamp, the event is reported as expired.
 *
 * @param trackingContext tracking context of the incoming event, may be null
 * @param currentAuditStamp audit stamp of the currently stored metadata, may be null
 * @return {@code true} if the backfill event is expired, {@code false} if it carries the latest metadata
 */
@VisibleForTesting
protected boolean isExpiredBackfill(@Nullable IngestionTrackingContext trackingContext, @Nullable AuditStamp currentAuditStamp) {
  // Without an emit time on the event, freshness cannot be established.
  if (trackingContext == null || !trackingContext.hasEmitTime()) {
    return true;
  }

  // Likewise when the stored metadata has no audit time to compare against.
  if (currentAuditStamp == null || !currentAuditStamp.hasTime()) {
    return true;
  }

  // If emit time > audit time the backfill event has the latest metadata and should be applied.
  return trackingContext.getEmitTime() <= currentAuditStamp.getTime();
}

@Nonnull
protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTransactionRetry) {
int retryCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
// import lombok.SneakyThrows;
// import org.json.simple.JSONObject;
// import org.json.simple.parser.JSONParser;


/**
Expand Down
Loading

0 comments on commit 793bd8c

Please sign in to comment.