Skip to content

Commit

Permalink
fix(mysql): index gap lock deadlock (#12119)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Dec 14, 2024
1 parent 50a7560 commit ab15fb9
Show file tree
Hide file tree
Showing 32 changed files with 1,285 additions and 175 deletions.
3 changes: 3 additions & 0 deletions docs/advanced/mcp-mcl.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@ Another form of conditional writes which considers the existence of an aspect or

`CREATE_ENTITY` - Create the aspect if no aspects exist for the entity.

By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write operation
should be dropped without considering it an exception, then add the following header: `If-None-Match: *` to the MCP.

Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,39 @@ public static AspectValidationException forItem(BatchItem item, String msg) {
}

public static AspectValidationException forItem(BatchItem item, String msg, Exception e) {
return new AspectValidationException(
item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.VALIDATION, e);
return new AspectValidationException(item, msg, SubType.VALIDATION, e);
}

public static AspectValidationException forPrecondition(BatchItem item, String msg) {
return forPrecondition(item, msg, null);
}

public static AspectValidationException forFilter(BatchItem item, String msg) {
return new AspectValidationException(item, msg, SubType.FILTER);
}

public static AspectValidationException forPrecondition(BatchItem item, String msg, Exception e) {
return new AspectValidationException(
item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.PRECONDITION, e);
return new AspectValidationException(item, msg, SubType.PRECONDITION, e);
}

@Nonnull BatchItem item;
@Nonnull ChangeType changeType;
@Nonnull Urn entityUrn;
@Nonnull String aspectName;
@Nonnull SubType subType;
@Nullable String msg;

public AspectValidationException(
@Nonnull ChangeType changeType,
@Nonnull Urn entityUrn,
@Nonnull String aspectName,
String msg,
SubType subType) {
this(changeType, entityUrn, aspectName, msg, subType, null);
public AspectValidationException(@Nonnull BatchItem item, String msg, SubType subType) {
this(item, msg, subType, null);
}

public AspectValidationException(
@Nonnull ChangeType changeType,
@Nonnull Urn entityUrn,
@Nonnull String aspectName,
@Nonnull String msg,
@Nullable SubType subType,
Exception e) {
@Nonnull BatchItem item, @Nonnull String msg, @Nullable SubType subType, Exception e) {
super(msg, e);
this.changeType = changeType;
this.entityUrn = entityUrn;
this.aspectName = aspectName;
this.item = item;
this.changeType = item.getChangeType();
this.entityUrn = item.getUrn();
this.aspectName = item.getAspectName();
this.msg = msg;
this.subType = subType != null ? subType : SubType.VALIDATION;
}
Expand All @@ -65,8 +59,12 @@ public Pair<Urn, String> getAspectGroup() {
return Pair.of(entityUrn, aspectName);
}

public static enum SubType {
public enum SubType {
// A validation exception is thrown
VALIDATION,
PRECONDITION
// A failed precondition is thrown if the header constraints are not met
PRECONDITION,
// Exclude from processing further
FILTER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,38 @@
public class ValidationExceptionCollection
extends HashMap<Pair<Urn, String>, Set<AspectValidationException>> {

private final Set<Integer> failedHashCodes;
private final Set<Integer> filteredHashCodes;

public ValidationExceptionCollection() {
super();
this.failedHashCodes = new HashSet<>();
this.filteredHashCodes = new HashSet<>();
}

public boolean hasFatalExceptions() {
return !failedHashCodes.isEmpty();
}

public static ValidationExceptionCollection newCollection() {
return new ValidationExceptionCollection();
}

public void addException(AspectValidationException exception) {
super.computeIfAbsent(exception.getAspectGroup(), key -> new HashSet<>()).add(exception);
if (!AspectValidationException.SubType.FILTER.equals(exception.getSubType())) {
failedHashCodes.add(exception.getItem().hashCode());
} else {
filteredHashCodes.add(exception.getItem().hashCode());
}
}

public void addException(BatchItem item, String message) {
addException(item, message, null);
}

public void addException(BatchItem item, String message, Exception ex) {
super.computeIfAbsent(Pair.of(item.getUrn(), item.getAspectName()), key -> new HashSet<>())
.add(AspectValidationException.forItem(item, message, ex));
addException(AspectValidationException.forItem(item, message, ex));
}

public Stream<AspectValidationException> streamAllExceptions() {
Expand All @@ -41,15 +58,16 @@ public <T extends BatchItem> Collection<T> successful(Collection<T> items) {
}

public <T extends BatchItem> Stream<T> streamSuccessful(Stream<T> items) {
return items.filter(i -> !this.containsKey(Pair.of(i.getUrn(), i.getAspectName())));
return items.filter(
i -> !failedHashCodes.contains(i.hashCode()) && !filteredHashCodes.contains(i.hashCode()));
}

public <T extends BatchItem> Collection<T> exceptions(Collection<T> items) {
return streamExceptions(items.stream()).collect(Collectors.toList());
}

public <T extends BatchItem> Stream<T> streamExceptions(Stream<T> items) {
return items.filter(i -> this.containsKey(Pair.of(i.getUrn(), i.getAspectName())));
return items.filter(i -> failedHashCodes.contains(i.hashCode()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
@Getter
@Accessors(chain = true)
public class CreateIfNotExistsValidator extends AspectPayloadValidator {
public static final String FILTER_EXCEPTION_HEADER = "If-None-Match";
public static final String FILTER_EXCEPTION_VALUE = "*";

@Nonnull private AspectPluginConfig config;

Expand All @@ -49,22 +51,34 @@ protected Stream<AspectValidationException> validatePreCommitAspects(
.filter(item -> ChangeType.CREATE_ENTITY.equals(item.getChangeType()))
.collect(Collectors.toSet())) {
// if the key aspect is missing in the batch, the entity exists and CREATE_ENTITY should be
// denied
// denied or dropped
if (!entityKeyMap.containsKey(createEntityItem.getUrn())) {
exceptions.addException(
createEntityItem,
"Cannot perform CREATE_ENTITY if not exists since the entity key already exists.");
if (isPrecondition(createEntityItem)) {
exceptions.addException(
AspectValidationException.forFilter(
createEntityItem, "Dropping write per precondition header If-None-Match: *"));
} else {
exceptions.addException(
createEntityItem,
"Cannot perform CREATE_ENTITY if not exists since the entity key already exists.");
}
}
}

for (ChangeMCP createItem :
changeMCPs.stream()
.filter(item -> ChangeType.CREATE.equals(item.getChangeType()))
.collect(Collectors.toSet())) {
// if a CREATE item has a previous value, should be denied
// if a CREATE item has a previous value, should be denied or dropped
if (createItem.getPreviousRecordTemplate() != null) {
exceptions.addException(
createItem, "Cannot perform CREATE since the aspect already exists.");
if (isPrecondition(createItem)) {
exceptions.addException(
AspectValidationException.forFilter(
createItem, "Dropping write per precondition header If-None-Match: *"));
} else {
exceptions.addException(
createItem, "Cannot perform CREATE since the aspect already exists.");
}
}
}

Expand All @@ -77,4 +91,10 @@ protected Stream<AspectValidationException> validateProposedAspects(
@Nonnull RetrieverContext retrieverContext) {
return Stream.empty();
}

private static boolean isPrecondition(ChangeMCP item) {
return item.getHeader(FILTER_EXCEPTION_HEADER)
.map(FILTER_EXCEPTION_VALUE::equals)
.orElse(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -140,7 +141,7 @@ public Map<String, String> getHeaders() {
mcp ->
mcp.getHeaders().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(headers);
.orElse(headers != null ? headers : Collections.emptyMap());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataplatform.DataPlatformInfo;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
Expand Down Expand Up @@ -98,7 +100,8 @@ public static List<MCPItem> getAdditionalChanges(
.filter(item -> SUPPORTED_TYPES.contains(item.getChangeType()))
.collect(Collectors.groupingBy(BatchItem::getUrn));

Set<Urn> urnsWithExistingKeyAspects = entityService.exists(opContext, itemsByUrn.keySet());
Set<Urn> urnsWithExistingKeyAspects =
entityService.exists(opContext, itemsByUrn.keySet(), true, true);

// create default aspects when key aspect is missing
return itemsByUrn.entrySet().stream()
Expand Down Expand Up @@ -126,7 +129,7 @@ public static List<MCPItem> getAdditionalChanges(
// pick the first item as a template (use entity information)
MCPItem templateItem = aspectsEntry.getValue().get(0);

// generate default aspects (including key aspect, always upserts)
// generate default aspects (including key aspect)
return defaultAspects.stream()
.map(
entry ->
Expand Down Expand Up @@ -215,7 +218,7 @@ private static List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissin
if (!fetchAspects.isEmpty()) {

Set<String> latestAspects =
entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects).keySet();
entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects, true).keySet();

return fetchAspects.stream()
.filter(aspectName -> !latestAspects.contains(aspectName))
Expand Down Expand Up @@ -347,6 +350,11 @@ public static MetadataChangeProposal getProposalFromAspectForDefault(
proposal.setAspectName(aspectName);
// already checked existence, default aspects should be changeType CREATE
proposal.setChangeType(ChangeType.CREATE);
proposal.setHeaders(
new StringMap(
Map.of(
CreateIfNotExistsValidator.FILTER_EXCEPTION_HEADER,
CreateIfNotExistsValidator.FILTER_EXCEPTION_VALUE)));

// Set fields determined from original
if (templateItem.getSystemMetadata() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ EntityAspect getAspect(

@Nonnull
Map<EntityAspectIdentifier, EntityAspect> batchGet(
@Nonnull final Set<EntityAspectIdentifier> keys);
@Nonnull final Set<EntityAspectIdentifier> keys, boolean forUpdate);

@Nonnull
List<EntityAspect> getAspectsInRange(
Expand Down
Loading

0 comments on commit ab15fb9

Please sign in to comment.