Skip to content

Commit

Permalink
Add initial implementation for integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Jun 2, 2024
1 parent a0e51dc commit 5c801b9
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.netflix.metacat.common.server.usermetadata.TagService;
import com.netflix.metacat.common.server.usermetadata.UserMetadataService;
import com.netflix.metacat.common.server.util.ThreadServiceManager;
import com.netflix.metacat.common.server.usermetadata.ParentChildRelMetadataService;
import com.netflix.metacat.main.manager.CatalogManager;
import com.netflix.metacat.main.manager.ConnectorManager;
import com.netflix.metacat.main.manager.PluginManager;
Expand Down Expand Up @@ -228,6 +229,7 @@ public DatabaseService databaseService(
* @param converterUtil converter utilities
* @param authorizationService authorization Service
* @param ownerValidationService owner validation service
* @param parentChildRelMetadataService parentChildRelMetadataService
*
* @return The table service bean
*/
Expand All @@ -244,7 +246,8 @@ public TableService tableService(
final Config config,
final ConverterUtil converterUtil,
final AuthorizationService authorizationService,
final OwnerValidationService ownerValidationService) {
final OwnerValidationService ownerValidationService,
final ParentChildRelMetadataService parentChildRelMetadataService) {
return new TableServiceImpl(
connectorManager,
connectorTableServiceProxy,
Expand All @@ -257,7 +260,8 @@ public TableService tableService(
config,
converterUtil,
authorizationService,
ownerValidationService
ownerValidationService,
parentChildRelMetadataService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package com.netflix.metacat.main.services.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -43,9 +45,12 @@
import com.netflix.metacat.common.server.events.MetacatUpdateTablePostEvent;
import com.netflix.metacat.common.server.events.MetacatUpdateTablePreEvent;
import com.netflix.metacat.common.server.monitoring.Metrics;
import com.netflix.metacat.common.server.model.ChildInfo;
import com.netflix.metacat.common.server.model.ParentInfo;
import com.netflix.metacat.common.server.properties.Config;
import com.netflix.metacat.common.server.spi.MetacatCatalogConfig;
import com.netflix.metacat.common.server.usermetadata.AuthorizationService;
import com.netflix.metacat.common.server.usermetadata.ParentChildRelMetadataService;
import com.netflix.metacat.common.server.usermetadata.GetMetadataInterceptorParameters;
import com.netflix.metacat.common.server.usermetadata.MetacatOperation;
import com.netflix.metacat.common.server.usermetadata.TagService;
Expand Down Expand Up @@ -89,6 +94,7 @@ public class TableServiceImpl implements TableService {
private final ConverterUtil converterUtil;
private final AuthorizationService authorizationService;
private final OwnerValidationService ownerValidationService;
private final ParentChildRelMetadataService parentChildRelMetadataService;

/**
* {@inheritDoc}
Expand All @@ -109,6 +115,7 @@ public TableDto create(final QualifiedName name, final TableDto tableDto) {
connectorTableServiceProxy.create(name, converterUtil.fromTableDto(tableDto));

if (tableDto.getDataMetadata() != null || tableDto.getDefinitionMetadata() != null) {
saveParentChildRelationship(name, tableDto);
log.info("Saving user metadata for table {}", name);
final long start = registry.clock().wallTime();
userMetadataService.saveMetadata(metacatRequestContext.getUserName(), tableDto, true);
Expand Down Expand Up @@ -139,6 +146,81 @@ public TableDto create(final QualifiedName name, final TableDto tableDto) {
return dto;
}

private ObjectNode createParentChildObjectNode(@Nullable final ParentInfo parentInfo,
final Set<ChildInfo> childInfos) {
final ObjectMapper objectMapper = new ObjectMapper();
// Create the root ObjectNode
final ObjectNode rootNode = objectMapper.createObjectNode();

if (parentInfo != null) {
// Convert ParentInfo to an ObjectNode
final ObjectNode parentNode = objectMapper.createObjectNode();
parentNode.put("name", parentInfo.getName());
parentNode.put("relationType", parentInfo.getRelationType());

// Add ParentInfo to root node
rootNode.set("parentInfo", parentNode);
}

if (childInfos.isEmpty()) {
// Convert Set<ChildInfo> to an ArrayNode
final ArrayNode childrenArrayNode = objectMapper.createArrayNode();
for (ChildInfo childInfo : childInfos) {
final ObjectNode childNode = objectMapper.createObjectNode();
childNode.put("name", childInfo.getName());
childNode.put("relationType", childInfo.getRelationType());
childrenArrayNode.add(childNode);
}
// Add ChildInfo array to root node
rootNode.set("childInfos", childrenArrayNode);
}
return rootNode;
}

private void saveParentChildRelationship(final QualifiedName child, final TableDto tableDto) {
if (tableDto.getDefinitionMetadata() != null) {
final ObjectNode definitionMetadata = tableDto.getDefinitionMetadata();
if (definitionMetadata.has("parent")) {
// fetch parent name
final String parentFullName = definitionMetadata.path("parent").asText();
final String[] splits = parentFullName.split("\\.");
if (splits.length != 3) {
throw new RuntimeException("Parent table identifier should pass in the following format "
+ "{catalog}.{db}.{parentTable}");
}
final QualifiedName parent = QualifiedName.ofTable(
splits[0],
splits[1],
splits[2]
);
validate(parent);

// fetch relationshipType
String relationType = "unknown";
if (definitionMetadata.has("relationType")) {
relationType = definitionMetadata.path("relationType").asText();
}
try {
log.info("Saving parent child relationship for child={}, parent={}, relationType={}",
parent, child, relationType);
parentChildRelMetadataService.createParentChildRelation(parent, child, relationType);
} catch (Exception e) {
try {
log.info("Attempting to delete child={}, after fail to store parent child relation ship for "
+ "parent={}, relationType={}",
child, child, parent);
delete(child);
} catch (Exception deleteException) {
log.error("Fail to deleteTable = {} after failing to save the parent child relationship "
+ "for child={}, parent={}",
child, child, parent, deleteException);
}
throw e;
}
}
}
}

private void setDefaultAttributes(final TableDto tableDto) {
setDefaultSerdeIfNull(tableDto);
setDefaultDefinitionMetadataIfNull(tableDto);
Expand Down Expand Up @@ -270,6 +352,11 @@ public TableDto deleteAndReturn(final QualifiedName name, final boolean isMView)
// Try to delete the table even if get above fails
try {
connectorTableServiceProxy.delete(name);
try {
parentChildRelMetadataService.drop(name);
} catch (Exception e) {
log.error("Fail to drop parent child relation for table = {}", name, e);
}

// If this is a common view, the storage_table if present
// should also be deleted.
Expand Down Expand Up @@ -373,11 +460,20 @@ public Optional<TableDto> get(final QualifiedName name, final GetTableServicePar
}

if (getTableServiceParameters.isIncludeDefinitionMetadata()) {
final Optional<ObjectNode> definitionMetadata =
Optional<ObjectNode> definitionMetadata =
(getTableServiceParameters.isDisableOnReadMetadataIntercetor())
? userMetadataService.getDefinitionMetadata(name)
: userMetadataService.getDefinitionMetadataWithInterceptor(name,
GetMetadataInterceptorParameters.builder().hasMetadata(tableInternal).build());
// Always get the source of truth for parent child relation from the parentChildRelMetadataService
final ParentInfo parentInfo = parentChildRelMetadataService.getParent(name);
final Set<ChildInfo> childInfos = parentChildRelMetadataService.getChildren(name);
final ObjectNode parentChildRelObjectNode = createParentChildObjectNode(parentInfo, childInfos);
if (definitionMetadata.isPresent()) {
definitionMetadata.get().set("parentChildRelationInfo", parentChildRelObjectNode);
} else {
definitionMetadata = Optional.of(parentChildRelObjectNode);
}
definitionMetadata.ifPresent(table::setDefinitionMetadata);
}

Expand Down Expand Up @@ -438,6 +534,18 @@ public void rename(
//Ignore if the operation is not supported, so that we can at least go ahead and save the user metadata
eventBus.post(new MetacatRenameTablePreEvent(oldName, metacatRequestContext, this, newName));
connectorTableServiceProxy.rename(oldName, newName, isMView);
try {
parentChildRelMetadataService.rename(oldName, newName);
} catch (Exception e) {
try {
// rename the parent back if fail to update parent child relation
connectorTableServiceProxy.rename(newName, oldName, isMView);
} catch (Exception renameException) {
log.error("Fail to rename from {} to {} after failing to save the parent child relationship",
oldName, newName, renameException);
}
throw e;
}
userMetadataService.renameDefinitionMetadataKey(oldName, newName);
tagService.renameTableTags(oldName, newName.getTableName());

Expand Down

0 comments on commit 5c801b9

Please sign in to comment.