Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(entityVersioning): initial implementation #12166

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@
import com.linkedin.datahub.graphql.resolvers.embed.UpdateEmbedResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityExistsResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.LinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.UnlinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchAssignFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchRemoveFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.CreateDynamicFormAssignmentResolver;
Expand Down Expand Up @@ -391,6 +393,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -476,6 +479,7 @@ public class GmsGraphQLEngine {
private final RestrictedService restrictedService;
private ConnectionService connectionService;
private AssertionService assertionService;
private final EntityVersioningService entityVersioningService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -599,6 +603,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;
this.assertionService = args.assertionService;
this.entityVersioningService = args.entityVersioningService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -1392,6 +1397,16 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
"removeBusinessAttribute",
new RemoveBusinessAttributeResolver(this.entityService));
}
if (featureFlags.isEntityVersioning()) {
typeWiring
.dataFetcher(
"linkAssetVersion",
new LinkAssetVersionResolver(this.entityVersioningService, this.featureFlags))
.dataFetcher(
"unlinkAssetVersion",
new UnlinkAssetVersionResolver(
this.entityVersioningService, this.featureFlags));
}
return typeWiring;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GmsGraphQLEngineArgs {
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;
AssertionService assertionService;
EntityVersioningService entityVersioningService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
RyanHolstien marked this conversation as resolved.
Show resolved Hide resolved
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.StringUtils;

/**
* Currently only supports linking the latest version, but may be modified later to support inserts
*/
public class LinkAssetVersionResolver implements DataFetcher<CompletableFuture<String>> {

private final EntityVersioningService entityVersioningService;
private final FeatureFlags featureFlags;

public LinkAssetVersionResolver(
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
this.entityVersioningService = entityVersioningService;
this.featureFlags = featureFlags;
}

@Override
public CompletableFuture<String> get(DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final LinkVersionInput input =
bindArgument(environment.getArgument("input"), LinkVersionInput.class);
if (!featureFlags.isEntityVersioning()) {
throw new IllegalAccessError(
"Entity Versioning is not configured, please enable before attempting to use this feature.");
}
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
}
Urn entityUrn = UrnUtils.getUrn(input.getLinkedEntity());
OperationContext opContext = context.getOperationContext();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
throw new AuthorizationException(
String.format(
"%s is unauthorized to %s entities %s and %s",
opContext.getAuthentication().getActor().toUrnStr(),
UPDATE,
input.getVersionSet(),
input.getLinkedEntity()));
}
VersionPropertiesInput versionPropertiesInput =
new VersionPropertiesInput(
input.getComment(),
input.getVersion(),
input.getSourceTimestamp(),
input.getSourceCreator());
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
List<IngestResult> linkResults =
entityVersioningService.linkLatestVersion(
opContext, versionSetUrn, entityUrn, versionPropertiesInput);

return linkResults.stream()
.filter(
ingestResult -> input.getLinkedEntity().equals(ingestResult.getUrn().toString()))
.map(ingestResult -> ingestResult.getUrn().toString())
.findAny()
.orElse(StringUtils.EMPTY);
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.UnlinkVersionInput;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.concurrent.CompletableFuture;

public class UnlinkAssetVersionResolver implements DataFetcher<CompletableFuture<Boolean>> {

private final EntityVersioningService entityVersioningService;
private final FeatureFlags featureFlags;

public UnlinkAssetVersionResolver(
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
this.entityVersioningService = entityVersioningService;
this.featureFlags = featureFlags;
}

@Override
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
if (!featureFlags.isEntityVersioning()) {
throw new IllegalAccessError(
"Entity Versioning is not configured, please enable before attempting to use this feature.");
}
final QueryContext context = environment.getContext();
final UnlinkVersionInput input =
bindArgument(environment.getArgument("input"), UnlinkVersionInput.class);
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
throw new IllegalArgumentException(
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
}
Urn entityUrn = UrnUtils.getUrn(input.getUnlinkedEntity());
OperationContext opContext = context.getOperationContext();
if (!AuthUtil.isAPIAuthorizedEntityUrns(
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
throw new AuthorizationException(
String.format(
"%s is unauthorized to %s entities %s and %s",
opContext.getAuthentication().getActor(),
UPDATE,
input.getVersionSet(),
input.getUnlinkedEntity()));
}
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
entityVersioningService.unlinkVersion(opContext, versionSetUrn, entityUrn);
return true;
},
this.getClass().getSimpleName(),
"get");
}
}
60 changes: 60 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,16 @@ type Mutation {
Remove Business Attribute
"""
removeBusinessAttribute(input: AddBusinessAttributeInput!): Boolean

"""
Link the latest versioned entity to a Version Set
"""
linkAssetVersion(input: LinkVersionInput!): String

"""
Unlink a versioned entity from a Version Set
"""
unlinkAssetVersion(input: UnlinkVersionInput!): Boolean
}

"""
Expand Down Expand Up @@ -12911,6 +12921,56 @@ input ListBusinessAttributesInput {
query: String
}

"""
Input for linking a versioned entity to a Version Set
"""
input LinkVersionInput {
"""
The target version set
"""
versionSet: String!

"""
The target versioned entity to link
"""
linkedEntity: String!

"""
Version Tag label for the version, should be unique within a Version Set
"""
version: String!

"""
Optional timestamp from the source system
"""
sourceTimestamp: Long

"""
Optional creator from the source system, will be converted to an Urn
"""
sourceCreator: String

"""
Optional comment about the version
"""
comment: String
}

"""
Input for unlinking a versioned entity from a Version Set
"""
input UnlinkVersionInput {
"""
The target version set
"""
versionSet: String

"""
The target versioned entity to unlink
"""
unlinkedEntity: String
RyanHolstien marked this conversation as resolved.
Show resolved Hide resolved
}

"""
The result obtained when listing Business Attribute
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.TestUtils.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.testng.Assert.*;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
import graphql.schema.DataFetchingEnvironment;
import org.mockito.Mockito;
import org.testng.annotations.Test;

public class LinkAssetVersionResolverTest {

private static final String TEST_VERSION_SET_URN = "urn:li:versionSet:test-version-set";
private static final String TEST_ENTITY_URN =
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)";

@Test
public void testGetSuccessful() throws Exception {
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);

Mockito.when(mockFlags.isEntityVersioning()).thenReturn(true);

IngestResult mockResult =
IngestResult.builder().urn(Urn.createFromString(TEST_ENTITY_URN)).build();

Mockito.when(
mockService.linkLatestVersion(
any(),
eq(UrnUtils.getUrn(TEST_VERSION_SET_URN)),
eq(UrnUtils.getUrn(TEST_ENTITY_URN)),
any(VersionPropertiesInput.class)))
.thenReturn(ImmutableList.of(mockResult));

LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
LinkVersionInput input = new LinkVersionInput();
input.setVersionSet(TEST_VERSION_SET_URN);
input.setLinkedEntity(TEST_ENTITY_URN);
input.setComment("Test comment");
input.setVersion("v1");

Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

String result = resolver.get(mockEnv).get();
assertEquals(result, TEST_ENTITY_URN);
}

@Test
public void testGetFeatureFlagDisabled() throws Exception {
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);

Mockito.when(mockFlags.isEntityVersioning()).thenReturn(false);

LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);

// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
LinkVersionInput input = new LinkVersionInput();
input.setVersionSet(TEST_VERSION_SET_URN);
input.setLinkedEntity(TEST_ENTITY_URN);

Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);

assertThrows(IllegalAccessError.class, () -> resolver.get(mockEnv));
}

@Test
public void testGetInvalidVersionSetUrn() throws Exception {
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);

Mockito.when(mockFlags.isEntityVersioning()).thenReturn(true);

LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);

// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
LinkVersionInput input = new LinkVersionInput();
input.setVersionSet("urn:li:dataset:invalid-version-set"); // Invalid URN type
input.setLinkedEntity(TEST_ENTITY_URN);

Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);

assertThrows(IllegalArgumentException.class, () -> resolver.get(mockEnv));
}
}
Loading
Loading