Skip to content

Commit

Permalink
fix(mysql): index gap lock deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Dec 14, 2024
1 parent bc4c7c6 commit 87b06ec
Show file tree
Hide file tree
Showing 31 changed files with 1,253 additions and 151 deletions.
3 changes: 3 additions & 0 deletions docs/advanced/mcp-mcl.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@ Another form of conditional writes which considers the existence of an aspect or

`CREATE_ENTITY` - Create the aspect if no aspects exist for the entity.

By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write operation
should be dropped without considering it an exception, then add the following header: `If-None-Match: *` to the MCP.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public static AspectValidationException forPrecondition(BatchItem item, String m
return forPrecondition(item, msg, null);
}

public static AspectValidationException forFilter(BatchItem item, String msg) {
return new AspectValidationException(
item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.FILTER);
}

public static AspectValidationException forPrecondition(BatchItem item, String msg, Exception e) {
return new AspectValidationException(
item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.PRECONDITION, e);
Expand Down Expand Up @@ -65,8 +70,12 @@ public Pair<Urn, String> getAspectGroup() {
return Pair.of(entityUrn, aspectName);
}

public static enum SubType {
public enum SubType {
// A validation exception is thrown
VALIDATION,
PRECONDITION
// A failed precondition is throw if the client constraints are not met
PRECONDITION,
// Exclude from processing further
FILTER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
@Getter
@Accessors(chain = true)
public class CreateIfNotExistsValidator extends AspectPayloadValidator {
public static final String FILTER_EXCEPTION_HEADER = "If-None-Match";
public static final String FILTER_EXCEPTION_VALUE = "*";

@Nonnull private AspectPluginConfig config;

Expand All @@ -49,22 +51,34 @@ protected Stream<AspectValidationException> validatePreCommitAspects(
.filter(item -> ChangeType.CREATE_ENTITY.equals(item.getChangeType()))
.collect(Collectors.toSet())) {
// if the key aspect is missing in the batch, the entity exists and CREATE_ENTITY should be
// denied
// denied or dropped
if (!entityKeyMap.containsKey(createEntityItem.getUrn())) {
exceptions.addException(
createEntityItem,
"Cannot perform CREATE_ENTITY if not exists since the entity key already exists.");
if (isPrecondition(createEntityItem)) {
exceptions.addException(
AspectValidationException.forFilter(
createEntityItem, "Dropping write per precondition header If-None-Match: *"));
} else {
exceptions.addException(
createEntityItem,
"Cannot perform CREATE_ENTITY if not exists since the entity key already exists.");
}
}
}

for (ChangeMCP createItem :
changeMCPs.stream()
.filter(item -> ChangeType.CREATE.equals(item.getChangeType()))
.collect(Collectors.toSet())) {
// if a CREATE item has a previous value, should be denied
// if a CREATE item has a previous value, should be denied or dropped
if (createItem.getPreviousRecordTemplate() != null) {
exceptions.addException(
createItem, "Cannot perform CREATE since the aspect already exists.");
if (isPrecondition(createItem)) {
exceptions.addException(
AspectValidationException.forFilter(
createItem, "Dropping write per precondition header If-None-Match: *"));
} else {
exceptions.addException(
createItem, "Cannot perform CREATE since the aspect already exists.");
}
}
}

Expand All @@ -77,4 +91,9 @@ protected Stream<AspectValidationException> validateProposedAspects(
@Nonnull RetrieverContext retrieverContext) {
return Stream.empty();
}

private static boolean isPrecondition(ChangeMCP item) {
return item.getHeader(FILTER_EXCEPTION_HEADER).stream()
.anyMatch(FILTER_EXCEPTION_VALUE::equals);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -140,7 +141,7 @@ public Map<String, String> getHeaders() {
mcp ->
mcp.getHeaders().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(headers);
.orElse(headers != null ? headers : Collections.emptyMap());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataplatform.DataPlatformInfo;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
Expand Down Expand Up @@ -98,7 +100,8 @@ public static List<MCPItem> getAdditionalChanges(
.filter(item -> SUPPORTED_TYPES.contains(item.getChangeType()))
.collect(Collectors.groupingBy(BatchItem::getUrn));

Set<Urn> urnsWithExistingKeyAspects = entityService.exists(opContext, itemsByUrn.keySet());
Set<Urn> urnsWithExistingKeyAspects =
entityService.exists(opContext, itemsByUrn.keySet(), true, true);

// create default aspects when key aspect is missing
return itemsByUrn.entrySet().stream()
Expand Down Expand Up @@ -126,7 +129,7 @@ public static List<MCPItem> getAdditionalChanges(
// pick the first item as a template (use entity information)
MCPItem templateItem = aspectsEntry.getValue().get(0);

// generate default aspects (including key aspect, always upserts)
// generate default aspects (including key aspect)
return defaultAspects.stream()
.map(
entry ->
Expand Down Expand Up @@ -215,7 +218,7 @@ private static List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissin
if (!fetchAspects.isEmpty()) {

Set<String> latestAspects =
entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects).keySet();
entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects, true).keySet();

return fetchAspects.stream()
.filter(aspectName -> !latestAspects.contains(aspectName))
Expand Down Expand Up @@ -347,6 +350,11 @@ public static MetadataChangeProposal getProposalFromAspectForDefault(
proposal.setAspectName(aspectName);
// already checked existence, default aspects should be changeType CREATE
proposal.setChangeType(ChangeType.CREATE);
proposal.setHeaders(
new StringMap(
Map.of(
CreateIfNotExistsValidator.FILTER_EXCEPTION_HEADER,
CreateIfNotExistsValidator.FILTER_EXCEPTION_VALUE)));

// Set fields determined from original
if (templateItem.getSystemMetadata() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ EntityAspect getAspect(

@Nonnull
Map<EntityAspectIdentifier, EntityAspect> batchGet(
@Nonnull final Set<EntityAspectIdentifier> keys);
@Nonnull final Set<EntityAspectIdentifier> keys, boolean forUpdate);

@Nonnull
List<EntityAspect> getAspectsInRange(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
import com.linkedin.metadata.config.PreProcessHooks;
Expand Down Expand Up @@ -238,7 +239,7 @@ public Map<Urn, List<RecordTemplate>> getLatestAspects(
boolean alwaysIncludeKeyAspect) {

Map<EntityAspectIdentifier, EntityAspect> batchGetResults =
getLatestAspect(opContext, urns, aspectNames);
getLatestAspect(opContext, urns, aspectNames, false);

// Fetch from db and populate urn -> aspect map.
final Map<Urn, List<RecordTemplate>> urnToAspects = new HashMap<>();
Expand Down Expand Up @@ -285,9 +286,10 @@ public Map<Urn, List<RecordTemplate>> getLatestAspects(
public Map<String, RecordTemplate> getLatestAspectsForUrn(
@Nonnull OperationContext opContext,
@Nonnull final Urn urn,
@Nonnull final Set<String> aspectNames) {
@Nonnull final Set<String> aspectNames,
boolean forUpdate) {
Map<EntityAspectIdentifier, EntityAspect> batchGetResults =
getLatestAspect(opContext, new HashSet<>(Arrays.asList(urn)), aspectNames);
getLatestAspect(opContext, new HashSet<>(Arrays.asList(urn)), aspectNames, forUpdate);

return EntityUtils.toSystemAspects(
opContext.getRetrieverContext().get(), batchGetResults.values())
Expand Down Expand Up @@ -868,7 +870,12 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

// Read before write is unfortunate, however batch it
final Map<String, Set<String>> urnAspects = batchWithDefaults.getUrnAspectsMap();

// read #1
// READ COMMITED is used in conjunction with SELECT FOR UPDATE (read lock) in order
// to ensure that the aspect's version is not modified outside the transaction.
// We rely on the retry mechanism if the row is modified and will re-read (require the
// lock)
Map<String, Map<String, EntityAspect>> databaseAspects =
aspectDao.getLatestAspects(urnAspects, true);

Expand Down Expand Up @@ -936,19 +943,32 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
// do final pre-commit checks with previous aspect value
ValidationExceptionCollection exceptions =
AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get());
if (!exceptions.isEmpty()) {
MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc();
throw new ValidationException(collectMetrics(exceptions).toString());

if (exceptions
.streamAllExceptions()
.anyMatch(e -> e.getSubType() != AspectValidationException.SubType.FILTER)) {

// IF this is a client request/API request we fail the `transaction batch`
if (opContext.getRequestContext() != null) {
MetricUtils.counter(EntityServiceImpl.class, "batch_request_validation_exception")
.inc();
throw new ValidationException(collectMetrics(exceptions).toString());
}

MetricUtils.counter(EntityServiceImpl.class, "batch_consumer_validation_exception")
.inc();
log.error("mce-consumer batch exceptions: {}", collectMetrics(exceptions));
}

// Database Upsert results
// Database Upsert successfully validated results
log.info(
"Ingesting aspects batch to database: {}",
AspectsBatch.toAbbreviatedString(changeMCPs, 2048));
Timer.Context ingestToLocalDBTimer =
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
List<UpdateAspectResult> upsertResults =
changeMCPs.stream()
exceptions
.streamSuccessful(changeMCPs.stream())
.map(
writeItem -> {

Expand Down Expand Up @@ -1498,7 +1518,7 @@ public List<RestoreIndicesResult> restoreIndices(
List<SystemAspect> systemAspects =
EntityUtils.toSystemAspects(
opContext.getRetrieverContext().get(),
getLatestAspect(opContext, entityBatch.getValue(), aspectNames).values());
getLatestAspect(opContext, entityBatch.getValue(), aspectNames, false).values());
long timeSqlQueryMs = System.currentTimeMillis() - startTime;

RestoreIndicesResult result = restoreIndices(opContext, systemAspects, s -> {});
Expand Down Expand Up @@ -2168,7 +2188,8 @@ public Set<Urn> exists(
@Nonnull OperationContext opContext,
@Nonnull final Collection<Urn> urns,
@Nullable String aspectName,
boolean includeSoftDeleted) {
boolean includeSoftDeleted,
boolean forUpdate) {
final Set<EntityAspectIdentifier> dbKeys =
urns.stream()
.map(
Expand All @@ -2184,11 +2205,11 @@ public Set<Urn> exists(
: aspectName,
ASPECT_LATEST_VERSION))
.collect(Collectors.toSet());
final Map<EntityAspectIdentifier, EntityAspect> aspects = aspectDao.batchGet(dbKeys);
final Map<EntityAspectIdentifier, EntityAspect> aspects = aspectDao.batchGet(dbKeys, forUpdate);
final Set<String> existingUrnStrings =
aspects.values().stream()
.filter(aspect -> aspect != null)
.map(aspect -> aspect.getUrn())
.filter(Objects::nonNull)
.map(EntityAspect::getUrn)
.collect(Collectors.toSet());

Set<Urn> existing =
Expand Down Expand Up @@ -2444,7 +2465,8 @@ protected AuditStamp createSystemAuditStamp() {
private Map<EntityAspectIdentifier, EntityAspect> getLatestAspect(
@Nonnull OperationContext opContext,
@Nonnull final Set<Urn> urns,
@Nonnull final Set<String> aspectNames) {
@Nonnull final Set<String> aspectNames,
boolean forUpdate) {

log.debug("Invoked getLatestAspects with urns: {}, aspectNames: {}", urns, aspectNames);

Expand All @@ -2468,7 +2490,8 @@ private Map<EntityAspectIdentifier, EntityAspect> getLatestAspect(
Map<EntityAspectIdentifier, EntityAspect> batchGetResults = new HashMap<>();
Iterators.partition(dbKeys.iterator(), MAX_KEYS_PER_QUERY)
.forEachRemaining(
batch -> batchGetResults.putAll(aspectDao.batchGet(ImmutableSet.copyOf(batch))));
batch ->
batchGetResults.putAll(aspectDao.batchGet(ImmutableSet.copyOf(batch), forUpdate)));
return batchGetResults;
}

Expand All @@ -2487,7 +2510,7 @@ private long calculateVersionNumber(

private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(
@Nonnull OperationContext opContext, final Set<EntityAspectIdentifier> dbKeys) {
final Map<EntityAspectIdentifier, EntityAspect> dbEntries = aspectDao.batchGet(dbKeys);
final Map<EntityAspectIdentifier, EntityAspect> dbEntries = aspectDao.batchGet(dbKeys, false);

List<SystemAspect> envelopedAspects =
EntityUtils.toSystemAspects(opContext.getRetrieverContext().get(), dbEntries.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void saveAspect(
@Override
@Nonnull
public Map<EntityAspectIdentifier, EntityAspect> batchGet(
@Nonnull final Set<EntityAspectIdentifier> keys) {
@Nonnull final Set<EntityAspectIdentifier> keys, boolean forUpdate) {
validateConnection();
return keys.stream()
.map(this::getAspect)
Expand Down
Loading

0 comments on commit 87b06ec

Please sign in to comment.