Skip to content

Commit

Permalink
Merge branch 'master' into feat(ingestion/neo4j)
Browse files Browse the repository at this point in the history
  • Loading branch information
k-bartlett authored Oct 22, 2024
2 parents 44fad3a + 6c55511 commit bde184b
Show file tree
Hide file tree
Showing 58 changed files with 1,026 additions and 302 deletions.
2 changes: 2 additions & 0 deletions datahub-frontend/app/auth/AuthModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.util.Configuration;
import config.ConfigurationProvider;
import controllers.SsoCallbackController;
import io.datahubproject.metadata.context.ValidationContext;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

Expand Down Expand Up @@ -187,6 +188,7 @@ protected OperationContext provideOperationContext(
.authorizationContext(AuthorizationContext.builder().authorizer(Authorizer.EMPTY).build())
.searchContext(SearchContext.EMPTY)
.entityRegistryContext(EntityRegistryContext.builder().build(EmptyEntityRegistry.EMPTY))
.validationContext(ValidationContext.builder().alternateValidation(false).build())
.build(systemAuthentication);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.datahubproject.metadata.context.OperationContextConfig;
import io.datahubproject.metadata.context.RetrieverContext;
import io.datahubproject.metadata.context.ServicesRegistryContext;
import io.datahubproject.metadata.context.ValidationContext;
import io.datahubproject.metadata.services.RestrictedService;
import java.util.List;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -161,7 +162,8 @@ protected OperationContext javaSystemOperationContext(
@Nonnull final GraphService graphService,
@Nonnull final SearchService searchService,
@Qualifier("baseElasticSearchComponents")
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) {
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components,
@Nonnull final ConfigurationProvider configurationProvider) {

EntityServiceAspectRetriever entityServiceAspectRetriever =
EntityServiceAspectRetriever.builder()
Expand All @@ -186,6 +188,10 @@ protected OperationContext javaSystemOperationContext(
.aspectRetriever(entityServiceAspectRetriever)
.graphRetriever(systemGraphRetriever)
.searchRetriever(searchServiceSearchRetriever)
.build(),
ValidationContext.builder()
.alternateValidation(
configurationProvider.getFeatureFlags().isAlternateMCPValidation())
.build());

entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);
Expand Down
2 changes: 2 additions & 0 deletions docker/profiles/docker-compose.gms.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ x-datahub-gms-service: &datahub-gms-service
environment: &datahub-gms-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml}
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}
healthcheck:
test: curl -sS --fail http://datahub-gms:${DATAHUB_GMS_PORT:-8080}/health
start_period: 90s
Expand Down Expand Up @@ -182,6 +183,7 @@ x-datahub-mce-consumer-service: &datahub-mce-consumer-service
- ${DATAHUB_LOCAL_MCE_ENV:-empty2.env}
environment: &datahub-mce-consumer-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}

x-datahub-mce-consumer-service-dev: &datahub-mce-consumer-service-dev
<<: *datahub-mce-consumer-service
Expand Down
2 changes: 1 addition & 1 deletion docs-website/docusaurus.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ module.exports = {
announcementBar: {
id: "announcement-2",
content:
'<div style="display: flex; justify-content: center; align-items: center;width: 100%;"><!--img src="/img/acryl-logo-white-mark.svg" / --><div style="font-size: .8rem; font-weight: 600; background-color: white; color: #111; padding: 0px 8px; border-radius: 4px; margin-right:12px;">NEW</div><p><span>Join us at Metadata & AI Summit, Oct. 29 & 30!</span></p><a href="http://www.acryldata.io/conference?utm_source=datahub_web&utm_medium=metadata_ai_2024&utm_campaign=home_banner" target="_blank" class="button">Register</a></div>',
'<div style="display: flex; justify-content: center; align-items: center;width: 100%;"><!--img src="/img/acryl-logo-white-mark.svg" / --><div style="font-size: .8rem; font-weight: 600; background-color: white; color: #111; padding: 0px 8px; border-radius: 4px; margin-right:12px;">NEW</div><p>Join us at Metadata & AI Summit, Oct. 29 & 30!</p><a href="http://www.acryldata.io/conference?utm_source=datahub_web&utm_medium=metadata_ai_2024&utm_campaign=home_banner" target="_blank" class="button">Register<span> →</span></a></div>',
backgroundColor: "#111",
textColor: "#ffffff",
isCloseable: false,
Expand Down
29 changes: 21 additions & 8 deletions docs-website/src/styles/global.scss
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,34 @@ div[class^="announcementBar"] {
>div {
display: flex;
align-items: center;
> div {
@media (max-width: 580px) {
display: none;
}
}
a>span {
@media (max-width: 580px) {
display: none;
}
}

>p {
text-align: left;
line-height: 1.1rem;
margin: 0;

>span {
@media (max-width: 780px) {
display: none;
}
}

@media (max-width: 480px) {
display: none;
@media (max-width: 580px) {
font-size: .9rem;
}
// >span {
// @media (max-width: 780px) {
// display: none;
// }
// }

// @media (max-width: 480px) {
// display: none;
// }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@AllArgsConstructor
@EqualsAndHashCode
public abstract class PluginSpec {
protected static String ENTITY_WILDCARD = "*";
protected static String WILDCARD = "*";

@Nonnull
public abstract AspectPluginConfig getConfig();
Expand Down Expand Up @@ -50,7 +50,7 @@ protected boolean isEntityAspectSupported(
return (getConfig().getSupportedEntityAspectNames().stream()
.anyMatch(
supported ->
ENTITY_WILDCARD.equals(supported.getEntityName())
WILDCARD.equals(supported.getEntityName())
|| supported.getEntityName().equals(entityName)))
&& isAspectSupported(aspectName);
}
Expand All @@ -59,13 +59,16 @@ protected boolean isAspectSupported(@Nonnull String aspectName) {
return getConfig().getSupportedEntityAspectNames().stream()
.anyMatch(
supported ->
ENTITY_WILDCARD.equals(supported.getAspectName())
WILDCARD.equals(supported.getAspectName())
|| supported.getAspectName().equals(aspectName));
}

protected boolean isChangeTypeSupported(@Nullable ChangeType changeType) {
return (changeType == null && getConfig().getSupportedOperations().isEmpty())
|| getConfig().getSupportedOperations().stream()
.anyMatch(supported -> supported.equalsIgnoreCase(String.valueOf(changeType)));
.anyMatch(
supported ->
WILDCARD.equals(supported)
|| supported.equalsIgnoreCase(String.valueOf(changeType)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,11 @@ async def _get_flow_run_graph(self, flow_run_id: str) -> Optional[List[Dict]]:
The flow run graph in json format.
"""
try:
response = orchestration.get_client()._client.get(
response_coroutine = orchestration.get_client()._client.get(
f"/flow_runs/{flow_run_id}/graph"
)

if asyncio.iscoroutine(response):
response = await response
response = await response_coroutine

if hasattr(response, "json"):
response_json = response.json()
Expand Down Expand Up @@ -410,10 +409,9 @@ async def get_flow_run(flow_run_id: UUID) -> FlowRun:
if not hasattr(client, "read_flow_run"):
raise ValueError("Client does not support async read_flow_run method")

response = client.read_flow_run(flow_run_id=flow_run_id)
response_coroutine = client.read_flow_run(flow_run_id=flow_run_id)

if asyncio.iscoroutine(response):
response = await response
response = await response_coroutine

return FlowRun.parse_obj(response)

Expand Down Expand Up @@ -477,10 +475,9 @@ async def get_task_run(task_run_id: UUID) -> TaskRun:
if not hasattr(client, "read_task_run"):
raise ValueError("Client does not support async read_task_run method")

response = client.read_task_run(task_run_id=task_run_id)
response_coroutine = client.read_task_run(task_run_id=task_run_id)

if asyncio.iscoroutine(response):
response = await response
response = await response_coroutine

return TaskRun.parse_obj(response)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -65,7 +66,7 @@ public static class EntitySystemAspect implements SystemAspect {
@Nullable private final RecordTemplate recordTemplate;

@Nonnull private final EntitySpec entitySpec;
@Nonnull private final AspectSpec aspectSpec;
@Nullable private final AspectSpec aspectSpec;

@Nonnull
public String getUrnRaw() {
Expand Down Expand Up @@ -151,15 +152,19 @@ private EntityAspect.EntitySystemAspect build() {

public EntityAspect.EntitySystemAspect build(
@Nonnull EntitySpec entitySpec,
@Nonnull AspectSpec aspectSpec,
@Nullable AspectSpec aspectSpec,
@Nonnull EntityAspect entityAspect) {
this.entityAspect = entityAspect;
this.urn = UrnUtils.getUrn(entityAspect.getUrn());
this.aspectSpec = aspectSpec;
if (entityAspect.getMetadata() != null) {
this.recordTemplate =
RecordUtils.toRecordTemplate(
aspectSpec.getDataTemplateClass(), entityAspect.getMetadata());
(Class<? extends RecordTemplate>)
(aspectSpec == null
? GenericAspect.class
: aspectSpec.getDataTemplateClass()),
entityAspect.getMetadata());
}

return new EntitySystemAspect(entityAspect, urn, recordTemplate, entitySpec, aspectSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
Expand All @@ -11,6 +12,7 @@
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.Collection;
Expand Down Expand Up @@ -114,39 +116,33 @@ private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem
proposedItem.getChangeType(),
proposedItem.getUrn(),
proposedItem.getAspectName())))
.map(
mcpItem -> {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever().getEntityRegistry());
}
return ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever());
});
.map(mcpItem -> patchDiscriminator(mcpItem, retrieverContext.getAspectRetriever()));
List<MCPItem> mutatedItems =
applyProposalMutationHooks(proposedItems, retrieverContext).collect(Collectors.toList());
Stream<? extends BatchItem> proposedItemsToChangeItems =
mutatedItems.stream()
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
// Filter on proposed items again to avoid applying builder to Patch Item side effects
.filter(mcpItem -> mcpItem instanceof ProposedItem)
.map(
mcpItem ->
ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever()));
.map(mcpItem -> patchDiscriminator(mcpItem, retrieverContext.getAspectRetriever()));
Stream<? extends BatchItem> sideEffectItems =
mutatedItems.stream().filter(mcpItem -> !(mcpItem instanceof ProposedItem));
Stream<? extends BatchItem> combinedChangeItems =
Stream.concat(proposedItemsToChangeItems, unmutatedItems);
return Stream.concat(combinedChangeItems, sideEffectItems);
}

private static BatchItem patchDiscriminator(MCPItem mcpItem, AspectRetriever aspectRetriever) {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
}
return ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(), mcpItem.getAuditStamp(), aspectRetriever);
}

public static class AspectsBatchImplBuilder {
/**
* Just one aspect record template
Expand All @@ -164,13 +160,33 @@ public AspectsBatchImplBuilder mcps(
Collection<MetadataChangeProposal> mcps,
AuditStamp auditStamp,
RetrieverContext retrieverContext) {
return mcps(mcps, auditStamp, retrieverContext, false);
}

public AspectsBatchImplBuilder mcps(
Collection<MetadataChangeProposal> mcps,
AuditStamp auditStamp,
RetrieverContext retrieverContext,
boolean alternateMCPValidation) {

retrieverContext(retrieverContext);
items(
mcps.stream()
.map(
mcp -> {
try {
if (alternateMCPValidation) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(mcp.getEntityType());
return ProposedItem.builder()
.metadataChangeProposal(mcp)
.entitySpec(entitySpec)
.auditStamp(auditStamp)
.build();
}
if (mcp.getChangeType().equals(ChangeType.PATCH)) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public static PatchItemImpl build(
.build(entityRegistry);
}

private static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
JsonNode json;
try {
return Json.createPatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
Expand Down Expand Up @@ -83,4 +85,18 @@ public SystemMetadata getSystemMetadata() {
public ChangeType getChangeType() {
return metadataChangeProposal.getChangeType();
}

public static class ProposedItemBuilder {
public ProposedItem build() {
// Ensure systemMetadata
return new ProposedItem(
Objects.requireNonNull(this.metadataChangeProposal)
.setSystemMetadata(
SystemMetadataUtils.generateSystemMetadataIfEmpty(
this.metadataChangeProposal.getSystemMetadata())),
this.auditStamp,
this.entitySpec,
this.aspectSpec);
}
}
}
Loading

0 comments on commit bde184b

Please sign in to comment.