Skip to content

Commit

Permalink
feat(entityVersioning): conditional writes and review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien committed Dec 18, 2024
1 parent bfa2bde commit 0d49b56
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import static com.linkedin.metadata.Constants.INITIAL_VERSION_SORT_ID;
import static com.linkedin.metadata.Constants.VERSION_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.VERSION_SET_KEY_ASPECT_NAME;
import static com.linkedin.metadata.Constants.VERSION_SET_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.VERSION_SORT_ID_FIELD_NAME;
import static com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.HTTP_HEADER_IF_VERSION_MATCH;

import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableList;
Expand All @@ -18,9 +18,10 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.Aspect;
import com.linkedin.data.template.StringMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.RollbackResult;
Expand Down Expand Up @@ -75,48 +76,52 @@ public List<IngestResult> linkLatestVersion(
List<MetadataChangeProposal> proposals = new ArrayList<>();
AspectRetriever aspectRetriever = opContext.getAspectRetriever();
String sortId;
Long versionSetConstraint;
Long versionPropertiesConstraint;
VersionSetKey versionSetKey =
(VersionSetKey)
EntityKeyUtils.convertUrnToEntityKey(
versionSet, opContext.getEntityRegistryContext().getKeyAspectSpec(versionSet));
if (!versionSetKey.getEntityType().equals(newLatestVersion.getEntityType())) {
throw new IllegalArgumentException(
"Entity type must match Version Set's specified type: "
+ versionSetKey.getEntityType()
+ " invalid type: "
+ newLatestVersion.getEntityType());
}
if (!aspectRetriever.entityExists(ImmutableSet.of(versionSet)).get(versionSet)) {
VersionSetKey versionSetKey =
(VersionSetKey)
EntityKeyUtils.convertUrnToEntityKey(
versionSet, opContext.getEntityRegistryContext().getKeyAspectSpec(versionSet));
if (!versionSetKey.getEntityType().equals(newLatestVersion.getEntityType())) {
throw new IllegalArgumentException(
"Entity type must match Version Set's specified type: "
+ versionSetKey.getEntityType()
+ " invalid type: "
+ newLatestVersion.getEntityType());
}
MetadataChangeProposal versionSetKeyProposal = new MetadataChangeProposal();
versionSetKeyProposal.setEntityUrn(versionSet);
versionSetKeyProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetKeyProposal.setAspectName(VERSION_SET_KEY_ASPECT_NAME);
versionSetKeyProposal.setAspect(GenericRecordUtils.serializeAspect(versionSetKey));
versionSetKeyProposal.setChangeType(ChangeType.CREATE_ENTITY);
proposals.add(versionSetKeyProposal);
sortId = INITIAL_VERSION_SORT_ID;
versionSetConstraint = -1L;
versionPropertiesConstraint = -1L;
} else {
Aspect versionSetPropertiesAspect =
aspectRetriever.getLatestAspectObject(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME);
SystemAspect versionSetPropertiesAspect =
aspectRetriever.getLatestSystemAspect(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME);
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(
VersionSetProperties.class, versionSetPropertiesAspect.data());
Aspect latestVersion =
aspectRetriever.getLatestAspectObject(
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
versionSetConstraint =
versionSetPropertiesAspect
.getSystemMetadataVersion()
.orElse(versionSetPropertiesAspect.getVersion());
SystemAspect latestVersion =
aspectRetriever.getLatestSystemAspect(
versionSetProperties.getLatest(), VERSION_PROPERTIES_ASPECT_NAME);
VersionProperties latestVersionProperties =
RecordUtils.toRecordTemplate(VersionProperties.class, latestVersion.data());
RecordUtils.toRecordTemplate(
VersionProperties.class, latestVersion.getRecordTemplate().data());
versionPropertiesConstraint =
latestVersion.getSystemMetadataVersion().orElse(latestVersion.getVersion());
// When more impls for versioning scheme are set up, this will need to be resolved to the
// correct scheme generation strategy
sortId = AlphanumericSortIdGenerator.increment(latestVersionProperties.getSortId());
}

Aspect currentVersionPropertiesAspect =
aspectRetriever.getLatestAspectObject(newLatestVersion, VERSION_PROPERTIES_ASPECT_NAME);
SystemAspect currentVersionPropertiesAspect =
aspectRetriever.getLatestSystemAspect(newLatestVersion, VERSION_PROPERTIES_ASPECT_NAME);
if (currentVersionPropertiesAspect != null) {
VersionProperties currentVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class, currentVersionPropertiesAspect.data());
VersionProperties.class, currentVersionPropertiesAspect.getRecordTemplate().data());
if (currentVersionProperties.getVersionSet().equals(versionSet)) {
return new ArrayList<>();
} else {
Expand Down Expand Up @@ -157,7 +162,9 @@ public List<IngestResult> linkLatestVersion(
versionPropertiesProposal.setEntityType(newLatestVersion.getEntityType());
versionPropertiesProposal.setAspectName(VERSION_PROPERTIES_ASPECT_NAME);
versionPropertiesProposal.setAspect(GenericRecordUtils.serializeAspect(versionProperties));
// Error if properties already exist
versionPropertiesProposal.setChangeType(ChangeType.UPSERT);
StringMap headerMap = new StringMap();
headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionPropertiesConstraint.toString());
versionPropertiesProposal.setChangeType(ChangeType.UPSERT);
proposals.add(versionPropertiesProposal);

Expand All @@ -178,6 +185,9 @@ public List<IngestResult> linkLatestVersion(
versionSetPropertiesProposal.setAspect(
GenericRecordUtils.serializeAspect(versionSetProperties));
versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT);
StringMap versionSetHeaderMap = new StringMap();
versionSetHeaderMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionSetConstraint.toString());
versionSetPropertiesProposal.setHeaders(versionSetHeaderMap);
proposals.add(versionSetPropertiesProposal);

return entityService.ingestProposal(
Expand All @@ -201,14 +211,15 @@ public List<RollbackResult> unlinkVersion(
OperationContext opContext, Urn versionSet, Urn linkedVersion) {
List<RollbackResult> deletedAspects = new ArrayList<>();
AspectRetriever aspectRetriever = opContext.getAspectRetriever();
Aspect linkedVersionPropertiesAspect =
aspectRetriever.getLatestAspectObject(linkedVersion, VERSION_PROPERTIES_ASPECT_NAME);
SystemAspect linkedVersionPropertiesAspect =
aspectRetriever.getLatestSystemAspect(linkedVersion, VERSION_PROPERTIES_ASPECT_NAME);
// Not currently versioned, do nothing
if (linkedVersionPropertiesAspect == null) {
return deletedAspects;
}
VersionProperties linkedVersionProperties =
RecordUtils.toRecordTemplate(VersionProperties.class, linkedVersionPropertiesAspect.data());
RecordUtils.toRecordTemplate(
VersionProperties.class, linkedVersionPropertiesAspect.getRecordTemplate().data());
Urn versionSetUrn = linkedVersionProperties.getVersionSet();
if (!versionSet.equals(versionSetUrn)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -250,15 +261,20 @@ public List<RollbackResult> unlinkVersion(
String updatedLatestVersionUrn = null;

SearchEntityArray linkedEntities = linkedVersions.getEntities();
Aspect versionSetPropertiesAspect =
aspectRetriever.getLatestAspectObject(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME);
SystemAspect versionSetPropertiesAspect =
aspectRetriever.getLatestSystemAspect(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME);
if (versionSetPropertiesAspect == null) {
throw new IllegalStateException(
String.format(
"Version Set Properties must exist if entity version exists: %s", versionSetUrn));
}
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(VersionSetProperties.class, versionSetPropertiesAspect.data());
RecordUtils.toRecordTemplate(
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
long versionConstraint =
versionSetPropertiesAspect
.getSystemMetadataVersion()
.orElse(versionSetPropertiesAspect.getVersion());
boolean isLatest = linkedVersion.equals(versionSetProperties.getLatest());

if (linkedEntities.size() == 2 && isLatest) {
Expand All @@ -284,6 +300,7 @@ public List<RollbackResult> unlinkVersion(
updatedLatestVersionUrn = maybePriorLatestVersion.getEntity().toString();
} else {
// Delete Version Set if we are removing the last version
// TODO: Conditional deletes impl + only do the delete if version match
RollbackRunResult deleteResult = entityService.deleteUrn(opContext, versionSetUrn);
deletedAspects.addAll(deleteResult.getRollbackResults());
}
Expand All @@ -309,6 +326,9 @@ public List<RollbackResult> unlinkVersion(
versionSetPropertiesProposal.setAspect(
GenericRecordUtils.serializeAspect(newVersionSetProperties));
versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT);
StringMap headerMap = new StringMap();
headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, Long.toString(versionConstraint));
versionSetPropertiesProposal.setHeaders(headerMap);
entityService.ingestProposal(
opContext,
AspectsBatchImpl.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -35,12 +37,19 @@ public ScrollResult scroll(
@Nullable String scrollId,
int count,
List<SortCriterion> sortCriteria) {
List<SortCriterion> finalCriteria = new ArrayList<>(sortCriteria);
if (sortCriteria.stream().noneMatch(sortCriterion -> "urn".equals(sortCriterion.getField()))) {
SortCriterion urnSort = new SortCriterion();
urnSort.setField("urn");
urnSort.setOrder(SortOrder.ASCENDING);
finalCriteria.add(urnSort);
}
return searchService.scrollAcrossEntities(
systemOperationContext.withSearchFlags(flags -> RETRIEVER_SEARCH_FLAGS),
entities,
"*",
filters,
sortCriteria,
finalCriteria,
scrollId,
null,
count);
Expand Down
Loading

0 comments on commit 0d49b56

Please sign in to comment.