Skip to content

Commit

Permalink
feat(entityVersioning): initial implementation (#12166)
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien authored Jan 14, 2025
1 parent 90fe5b6 commit 94b9da0
Show file tree
Hide file tree
Showing 68 changed files with 4,063 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@
import com.linkedin.datahub.graphql.resolvers.embed.UpdateEmbedResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityExistsResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.LinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.UnlinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchAssignFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchRemoveFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.CreateDynamicFormAssignmentResolver;
Expand Down Expand Up @@ -391,6 +393,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -476,6 +479,7 @@ public class GmsGraphQLEngine {
private final RestrictedService restrictedService;
private ConnectionService connectionService;
private AssertionService assertionService;
private final EntityVersioningService entityVersioningService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -599,6 +603,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;
this.assertionService = args.assertionService;
this.entityVersioningService = args.entityVersioningService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -1392,6 +1397,16 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
"removeBusinessAttribute",
new RemoveBusinessAttributeResolver(this.entityService));
}
if (featureFlags.isEntityVersioning()) {
typeWiring
.dataFetcher(
"linkAssetVersion",
new LinkAssetVersionResolver(this.entityVersioningService, this.featureFlags))
.dataFetcher(
"unlinkAssetVersion",
new UnlinkAssetVersionResolver(
this.entityVersioningService, this.featureFlags));
}
return typeWiring;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GmsGraphQLEngineArgs {
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;
AssertionService assertionService;
EntityVersioningService entityVersioningService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.StringUtils;

/**
* Currently only supports linking the latest version, but may be modified later to support inserts
*/
public class LinkAssetVersionResolver implements DataFetcher<CompletableFuture<String>> {

private final EntityVersioningService entityVersioningService;
private final FeatureFlags featureFlags;

public LinkAssetVersionResolver(
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
this.entityVersioningService = entityVersioningService;
this.featureFlags = featureFlags;
}

@Override
public CompletableFuture<String> get(DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final LinkVersionInput input =
bindArgument(environment.getArgument("input"), LinkVersionInput.class);
if (!featureFlags.isEntityVersioning()) {
throw new IllegalAccessError(
"Entity Versioning is not configured, please enable before attempting to use this feature.");
}
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
}
Urn entityUrn = UrnUtils.getUrn(input.getLinkedEntity());
OperationContext opContext = context.getOperationContext();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
throw new AuthorizationException(
String.format(
"%s is unauthorized to %s entities %s and %s",
opContext.getAuthentication().getActor().toUrnStr(),
UPDATE,
input.getVersionSet(),
input.getLinkedEntity()));
}
VersionPropertiesInput versionPropertiesInput =
new VersionPropertiesInput(
input.getComment(),
input.getVersion(),
input.getSourceTimestamp(),
input.getSourceCreator());
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
List<IngestResult> linkResults =
entityVersioningService.linkLatestVersion(
opContext, versionSetUrn, entityUrn, versionPropertiesInput);

return linkResults.stream()
.filter(
ingestResult -> input.getLinkedEntity().equals(ingestResult.getUrn().toString()))
.map(ingestResult -> ingestResult.getUrn().toString())
.findAny()
.orElse(StringUtils.EMPTY);
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.UnlinkVersionInput;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.concurrent.CompletableFuture;

public class UnlinkAssetVersionResolver implements DataFetcher<CompletableFuture<Boolean>> {

private final EntityVersioningService entityVersioningService;
private final FeatureFlags featureFlags;

public UnlinkAssetVersionResolver(
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
this.entityVersioningService = entityVersioningService;
this.featureFlags = featureFlags;
}

@Override
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
if (!featureFlags.isEntityVersioning()) {
throw new IllegalAccessError(
"Entity Versioning is not configured, please enable before attempting to use this feature.");
}
final QueryContext context = environment.getContext();
final UnlinkVersionInput input =
bindArgument(environment.getArgument("input"), UnlinkVersionInput.class);
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
}
Urn entityUrn = UrnUtils.getUrn(input.getUnlinkedEntity());
OperationContext opContext = context.getOperationContext();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
throw new AuthorizationException(
String.format(
"%s is unauthorized to %s entities %s and %s",
opContext.getAuthentication().getActor(),
UPDATE,
input.getVersionSet(),
input.getUnlinkedEntity()));
}
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
entityVersioningService.unlinkVersion(opContext, versionSetUrn, entityUrn);
return true;
},
this.getClass().getSimpleName(),
"get");
}
}
60 changes: 60 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,16 @@ type Mutation {
Remove Business Attribute
"""
removeBusinessAttribute(input: AddBusinessAttributeInput!): Boolean

"""
Link the latest versioned entity to a Version Set
"""
linkAssetVersion(input: LinkVersionInput!): String

"""
Unlink a versioned entity from a Version Set
"""
unlinkAssetVersion(input: UnlinkVersionInput!): Boolean
}

"""
Expand Down Expand Up @@ -12911,6 +12921,56 @@ input ListBusinessAttributesInput {
query: String
}

"""
Input for linking a versioned entity to a Version Set
"""
input LinkVersionInput {
"""
The target version set
"""
versionSet: String!

"""
The target versioned entity to link
"""
linkedEntity: String!

"""
Version Tag label for the version, should be unique within a Version Set
"""
version: String!

"""
Optional timestamp from the source system
"""
sourceTimestamp: Long

"""
Optional creator from the source system, will be converted to an Urn
"""
sourceCreator: String

"""
Optional comment about the version
"""
comment: String
}

"""
Input for unlinking a versioned entity from a Version Set
"""
input UnlinkVersionInput {
"""
The target version set
"""
versionSet: String

"""
The target versioned entity to unlink
"""
unlinkedEntity: String
}

"""
The result obtained when listing Business Attribute
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.TestUtils.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.testng.Assert.*;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
import graphql.schema.DataFetchingEnvironment;
import org.mockito.Mockito;
import org.testng.annotations.Test;

public class LinkAssetVersionResolverTest {

private static final String TEST_VERSION_SET_URN = "urn:li:versionSet:test-version-set";
private static final String TEST_ENTITY_URN =
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)";

@Test
public void testGetSuccessful() throws Exception {
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);

Mockito.when(mockFlags.isEntityVersioning()).thenReturn(true);

IngestResult mockResult =
IngestResult.builder().urn(Urn.createFromString(TEST_ENTITY_URN)).build();

Mockito.when(
mockService.linkLatestVersion(
any(),
eq(UrnUtils.getUrn(TEST_VERSION_SET_URN)),
eq(UrnUtils.getUrn(TEST_ENTITY_URN)),
any(VersionPropertiesInput.class)))
.thenReturn(ImmutableList.of(mockResult));

LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
LinkVersionInput input = new LinkVersionInput();
input.setVersionSet(TEST_VERSION_SET_URN);
input.setLinkedEntity(TEST_ENTITY_URN);
input.setComment("Test comment");
input.setVersion("v1");

Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

String result = resolver.get(mockEnv).get();
assertEquals(result, TEST_ENTITY_URN);
}

@Test
public void testGetFeatureFlagDisabled() throws Exception {
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);

Mockito.when(mockFlags.isEntityVersioning()).thenReturn(false);

LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);

// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
LinkVersionInput input = new LinkVersionInput();
input.setVersionSet(TEST_VERSION_SET_URN);
input.setLinkedEntity(TEST_ENTITY_URN);

Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);

assertThrows(IllegalAccessError.class, () -> resolver.get(mockEnv));
}

@Test
public void testGetInvalidVersionSetUrn() throws Exception {
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);

Mockito.when(mockFlags.isEntityVersioning()).thenReturn(true);

LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);

// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
LinkVersionInput input = new LinkVersionInput();
input.setVersionSet("urn:li:dataset:invalid-version-set"); // Invalid URN type
input.setLinkedEntity(TEST_ENTITY_URN);

Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);

assertThrows(IllegalArgumentException.class, () -> resolver.get(mockEnv));
}
}
Loading

0 comments on commit 94b9da0

Please sign in to comment.