Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mysql): index gap lock deadlock #12119

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/advanced/mcp-mcl.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@ Another form of conditional writes which considers the existence of an aspect or

`CREATE_ENTITY` - Create the aspect if no aspects exist for the entity.

By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write operation
should be dropped without considering it an exception, then add the following header: `If-None-Match: *` to the MCP.

Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,39 @@ public static AspectValidationException forItem(BatchItem item, String msg) {
}

public static AspectValidationException forItem(BatchItem item, String msg, Exception e) {
return new AspectValidationException(
item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.VALIDATION, e);
return new AspectValidationException(item, msg, SubType.VALIDATION, e);
}

public static AspectValidationException forPrecondition(BatchItem item, String msg) {
return forPrecondition(item, msg, null);
}

public static AspectValidationException forFilter(BatchItem item, String msg) {
return new AspectValidationException(item, msg, SubType.FILTER);
}

public static AspectValidationException forPrecondition(BatchItem item, String msg, Exception e) {
return new AspectValidationException(
item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.PRECONDITION, e);
return new AspectValidationException(item, msg, SubType.PRECONDITION, e);
}

@Nonnull BatchItem item;
@Nonnull ChangeType changeType;
@Nonnull Urn entityUrn;
@Nonnull String aspectName;
@Nonnull SubType subType;
@Nullable String msg;

public AspectValidationException(
@Nonnull ChangeType changeType,
@Nonnull Urn entityUrn,
@Nonnull String aspectName,
String msg,
SubType subType) {
this(changeType, entityUrn, aspectName, msg, subType, null);
public AspectValidationException(@Nonnull BatchItem item, String msg, SubType subType) {
this(item, msg, subType, null);
}

public AspectValidationException(
@Nonnull ChangeType changeType,
@Nonnull Urn entityUrn,
@Nonnull String aspectName,
@Nonnull String msg,
@Nullable SubType subType,
Exception e) {
@Nonnull BatchItem item, @Nonnull String msg, @Nullable SubType subType, Exception e) {
super(msg, e);
this.changeType = changeType;
this.entityUrn = entityUrn;
this.aspectName = aspectName;
this.item = item;
this.changeType = item.getChangeType();
this.entityUrn = item.getUrn();
this.aspectName = item.getAspectName();
this.msg = msg;
this.subType = subType != null ? subType : SubType.VALIDATION;
}
Expand All @@ -65,8 +59,12 @@ public Pair<Urn, String> getAspectGroup() {
return Pair.of(entityUrn, aspectName);
}

public static enum SubType {
public enum SubType {
// A validation exception is thrown
VALIDATION,
PRECONDITION
// A failed precondition is thrown if the header constraints are not met
PRECONDITION,
// Exclude from processing further
FILTER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,38 @@
public class ValidationExceptionCollection
extends HashMap<Pair<Urn, String>, Set<AspectValidationException>> {

private final Set<Integer> failedHashCodes;
private final Set<Integer> filteredHashCodes;

public ValidationExceptionCollection() {
super();
this.failedHashCodes = new HashSet<>();
this.filteredHashCodes = new HashSet<>();
}

public boolean hasFatalExceptions() {
return !failedHashCodes.isEmpty();
}

public static ValidationExceptionCollection newCollection() {
return new ValidationExceptionCollection();
}

public void addException(AspectValidationException exception) {
super.computeIfAbsent(exception.getAspectGroup(), key -> new HashSet<>()).add(exception);
if (!AspectValidationException.SubType.FILTER.equals(exception.getSubType())) {
failedHashCodes.add(exception.getItem().hashCode());
} else {
filteredHashCodes.add(exception.getItem().hashCode());
}
}

public void addException(BatchItem item, String message) {
addException(item, message, null);
}

public void addException(BatchItem item, String message, Exception ex) {
super.computeIfAbsent(Pair.of(item.getUrn(), item.getAspectName()), key -> new HashSet<>())
.add(AspectValidationException.forItem(item, message, ex));
addException(AspectValidationException.forItem(item, message, ex));
}

public Stream<AspectValidationException> streamAllExceptions() {
Expand All @@ -41,15 +58,16 @@ public <T extends BatchItem> Collection<T> successful(Collection<T> items) {
}

public <T extends BatchItem> Stream<T> streamSuccessful(Stream<T> items) {
return items.filter(i -> !this.containsKey(Pair.of(i.getUrn(), i.getAspectName())));
return items.filter(
i -> !failedHashCodes.contains(i.hashCode()) && !filteredHashCodes.contains(i.hashCode()));
}

public <T extends BatchItem> Collection<T> exceptions(Collection<T> items) {
return streamExceptions(items.stream()).collect(Collectors.toList());
}

public <T extends BatchItem> Stream<T> streamExceptions(Stream<T> items) {
return items.filter(i -> this.containsKey(Pair.of(i.getUrn(), i.getAspectName())));
return items.filter(i -> failedHashCodes.contains(i.hashCode()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
@Getter
@Accessors(chain = true)
public class CreateIfNotExistsValidator extends AspectPayloadValidator {
public static final String FILTER_EXCEPTION_HEADER = "If-None-Match";
public static final String FILTER_EXCEPTION_VALUE = "*";

@Nonnull private AspectPluginConfig config;

Expand All @@ -49,22 +51,34 @@ protected Stream<AspectValidationException> validatePreCommitAspects(
.filter(item -> ChangeType.CREATE_ENTITY.equals(item.getChangeType()))
.collect(Collectors.toSet())) {
// if the key aspect is missing in the batch, the entity exists and CREATE_ENTITY should be
// denied
// denied or dropped
if (!entityKeyMap.containsKey(createEntityItem.getUrn())) {
exceptions.addException(
createEntityItem,
"Cannot perform CREATE_ENTITY if not exists since the entity key already exists.");
if (isPrecondition(createEntityItem)) {
exceptions.addException(
AspectValidationException.forFilter(
createEntityItem, "Dropping write per precondition header If-None-Match: *"));
} else {
exceptions.addException(
createEntityItem,
"Cannot perform CREATE_ENTITY if not exists since the entity key already exists.");
}
}
}

for (ChangeMCP createItem :
changeMCPs.stream()
.filter(item -> ChangeType.CREATE.equals(item.getChangeType()))
.collect(Collectors.toSet())) {
// if a CREATE item has a previous value, should be denied
// if a CREATE item has a previous value, should be denied or dropped
if (createItem.getPreviousRecordTemplate() != null) {
exceptions.addException(
createItem, "Cannot perform CREATE since the aspect already exists.");
if (isPrecondition(createItem)) {
exceptions.addException(
AspectValidationException.forFilter(
createItem, "Dropping write per precondition header If-None-Match: *"));
} else {
exceptions.addException(
createItem, "Cannot perform CREATE since the aspect already exists.");
}
}
}

Expand All @@ -77,4 +91,10 @@ protected Stream<AspectValidationException> validateProposedAspects(
@Nonnull RetrieverContext retrieverContext) {
return Stream.empty();
}

private static boolean isPrecondition(ChangeMCP item) {
return item.getHeader(FILTER_EXCEPTION_HEADER)
.map(FILTER_EXCEPTION_VALUE::equals)
.orElse(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -140,7 +141,7 @@ public Map<String, String> getHeaders() {
mcp ->
mcp.getHeaders().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(headers);
.orElse(headers != null ? headers : Collections.emptyMap());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataplatform.DataPlatformInfo;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator;
import com.linkedin.metadata.entity.EntityApiUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
Expand Down Expand Up @@ -98,7 +100,8 @@ public static List<MCPItem> getAdditionalChanges(
.filter(item -> SUPPORTED_TYPES.contains(item.getChangeType()))
.collect(Collectors.groupingBy(BatchItem::getUrn));

Set<Urn> urnsWithExistingKeyAspects = entityService.exists(opContext, itemsByUrn.keySet());
Set<Urn> urnsWithExistingKeyAspects =
entityService.exists(opContext, itemsByUrn.keySet(), true, true);

// create default aspects when key aspect is missing
return itemsByUrn.entrySet().stream()
Expand Down Expand Up @@ -126,7 +129,7 @@ public static List<MCPItem> getAdditionalChanges(
// pick the first item as a template (use entity information)
MCPItem templateItem = aspectsEntry.getValue().get(0);

// generate default aspects (including key aspect, always upserts)
// generate default aspects (including key aspect)
return defaultAspects.stream()
.map(
entry ->
Expand Down Expand Up @@ -215,7 +218,7 @@ private static List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissin
if (!fetchAspects.isEmpty()) {

Set<String> latestAspects =
entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects).keySet();
entityService.getLatestAspectsForUrn(opContext, urn, fetchAspects, true).keySet();

return fetchAspects.stream()
.filter(aspectName -> !latestAspects.contains(aspectName))
Expand Down Expand Up @@ -347,6 +350,11 @@ public static MetadataChangeProposal getProposalFromAspectForDefault(
proposal.setAspectName(aspectName);
// already checked existence, default aspects should be changeType CREATE
proposal.setChangeType(ChangeType.CREATE);
proposal.setHeaders(
new StringMap(
Map.of(
CreateIfNotExistsValidator.FILTER_EXCEPTION_HEADER,
CreateIfNotExistsValidator.FILTER_EXCEPTION_VALUE)));

// Set fields determined from original
if (templateItem.getSystemMetadata() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ EntityAspect getAspect(

@Nonnull
Map<EntityAspectIdentifier, EntityAspect> batchGet(
@Nonnull final Set<EntityAspectIdentifier> keys);
@Nonnull final Set<EntityAspectIdentifier> keys, boolean forUpdate);

@Nonnull
List<EntityAspect> getAspectsInRange(
Expand Down
Loading
Loading