Skip to content

Commit

Permalink
Hive: Add View support for HIVE catalog (#9852)
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 authored Sep 13, 2024
1 parent a2b8008 commit e449d34
Show file tree
Hide file tree
Showing 9 changed files with 1,508 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.exceptions;

import com.google.errorprone.annotations.FormatMethod;

/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */
public class NoSuchIcebergViewException extends NoSuchViewException {
@FormatMethod
public NoSuchIcebergViewException(String message, Object... args) {
super(message, args);
}

@FormatMethod
public static void check(boolean test, String message, Object... args) {
if (!test) {
throw new NoSuchIcebergViewException(message, args);
}
}
}
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.ViewMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,6 +138,23 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

/**
* Drops view metadata files referenced by ViewMetadata.
*
* <p>This should be called by dropView implementations
*
* @param io a FileIO to use for deletes
* @param metadata the last valid ViewMetadata instance for a dropped view.
*/
public static void dropViewMetadata(FileIO io, ViewMetadata metadata) {
boolean gcEnabled =
PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT);

if (gcEnabled) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}
}

@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
Expand Down
200 changes: 193 additions & 7 deletions hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,29 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
Expand All @@ -56,13 +59,21 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewOperations;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
public class HiveCatalog extends BaseMetastoreViewCatalog
implements SupportsNamespaces, Configurable {
public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";

Expand Down Expand Up @@ -117,6 +128,16 @@ public void initialize(String inputName, Map<String, String> properties) {
this.fileIOTracker = new FileIOTracker();
}

@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return new ViewAwareTableBuilder(identifier, schema);
}

@Override
public ViewBuilder buildView(TableIdentifier identifier) {
return new TableAwareViewBuilder(identifier);
}

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -156,6 +177,38 @@ public List<TableIdentifier> listTables(Namespace namespace) {
}
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
Preconditions.checkArgument(
isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);

try {
String database = namespace.level(0);
List<String> viewNames =
clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW));

// Retrieving the Table objects from HMS in batches to avoid OOM
List<TableIdentifier> filteredTableIdentifiers = Lists.newArrayList();
Iterable<List<String>> viewNameSets = Iterables.partition(viewNames, 100);

for (List<String> viewNameSet : viewNameSets) {
filteredTableIdentifiers.addAll(
listIcebergTables(viewNameSet, namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE));
}

return filteredTableIdentifiers;
} catch (UnknownDBException e) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);

} catch (TException e) {
throw new RuntimeException("Failed to list all views under namespace " + namespace, e);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted in call to listViews", e);
}
}

@Override
public String name() {
return name;
Expand Down Expand Up @@ -213,11 +266,57 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
}

@Override
public boolean dropView(TableIdentifier identifier) {
if (!isValidIdentifier(identifier)) {
return false;
}

try {
String database = identifier.namespace().level(0);
String viewName = identifier.name();

HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier);
ViewMetadata lastViewMetadata = null;
try {
lastViewMetadata = ops.current();
} catch (NotFoundException e) {
LOG.warn("Failed to load view metadata for view: {}", identifier, e);
}

clients.run(
client -> {
client.dropTable(database, viewName, false, false);
return null;
});

if (lastViewMetadata != null) {
CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata);
}

LOG.info("Dropped view: {}", identifier);
return true;
} catch (NoSuchObjectException e) {
LOG.info("Skipping drop, view does not exist: {}", identifier, e);
return false;
} catch (TException e) {
throw new RuntimeException("Failed to drop view " + identifier, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted in call to dropView", e);
}
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE);
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW);
}

private List<TableIdentifier> listIcebergTables(
List<String> tableNames, Namespace namespace, String tableTypeProp)
throws TException, InterruptedException {
Expand All @@ -233,13 +332,12 @@ private List<TableIdentifier> listIcebergTables(
.collect(Collectors.toList());
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void renameTableOrView(
TableIdentifier from,
TableIdentifier originalTo,
HiveOperationsBase.ContentType contentType) {
if (!isValidIdentifier(from)) {
throw new NoSuchTableException("Invalid identifier: %s", from);
}
Preconditions.checkArgument(isValidIdentifier(from), "Invalid identifier: %s", from);

TableIdentifier to = removeCatalogName(originalTo);
Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
Expand All @@ -248,6 +346,16 @@ private void renameTableOrView(
"Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
}

if (tableExists(to)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Cannot rename %s to %s. Table already exists", from, to);
}

if (viewExists(to)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Cannot rename %s to %s. View already exists", from, to);
}

String toDatabase = to.namespace().level(0);
String fromDatabase = from.namespace().level(0);
String fromName = from.name();
Expand All @@ -268,7 +376,12 @@ private void renameTableOrView(
LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to);

} catch (NoSuchObjectException e) {
throw new NoSuchTableException("Table does not exist: %s", from);
switch (contentType) {
case TABLE:
throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
case VIEW:
throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
}

} catch (InvalidOperationException e) {
if (e.getMessage() != null
Expand All @@ -295,7 +408,7 @@ private void validateTableIsIcebergTableOrView(
HiveOperationsBase.validateTableIsIceberg(table, fullName);
break;
case VIEW:
throw new UnsupportedOperationException("View is not supported.");
HiveOperationsBase.validateTableIsIcebergView(table, fullName);
}
}

Expand Down Expand Up @@ -522,6 +635,11 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) {
return ops;
}

@Override
protected ViewOperations newViewOps(TableIdentifier identifier) {
return new HiveViewOperations(conf, clients, fileIO, name, identifier);
}

@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
// This is a little edgy since we basically duplicate the HMS location generation logic.
Expand Down Expand Up @@ -660,4 +778,72 @@ void setListAllTables(boolean listAllTables) {
ClientPool<IMetaStoreClient, TException> clientPool() {
return clients;
}

/**
* The purpose of this class is to add view detection only for Hive-Specific tables. Hive catalog
* follows checks at different levels: 1. During refresh, it validates if the table is an iceberg
* table or not. 2. During commit, it validates if there is any concurrent commit with table or
* table-name already exists. This class helps to do the validation on an early basis.
*/
private class ViewAwareTableBuilder extends BaseMetastoreViewCatalogTableBuilder {

private final TableIdentifier identifier;

private ViewAwareTableBuilder(TableIdentifier identifier, Schema schema) {
super(identifier, schema);
this.identifier = identifier;
}

@Override
public Transaction createOrReplaceTransaction() {
if (viewExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"View with same name already exists: %s", identifier);
}
return super.createOrReplaceTransaction();
}

@Override
public org.apache.iceberg.Table create() {
if (viewExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"View with same name already exists: %s", identifier);
}
return super.create();
}
}

/**
* The purpose of this class is to add table detection only for Hive-Specific view. Hive catalog
* follows checks at different levels: 1. During refresh, it validates if the view is an iceberg
* view or not. 2. During commit, it validates if there is any concurrent commit with view or
* view-name already exists. This class helps to do the validation on an early basis.
*/
private class TableAwareViewBuilder extends BaseViewBuilder {

private final TableIdentifier identifier;

private TableAwareViewBuilder(TableIdentifier identifier) {
super(identifier);
this.identifier = identifier;
}

@Override
public View createOrReplace() {
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Table with same name already exists: %s", identifier);
}
return super.createOrReplace();
}

@Override
public View create() {
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Table with same name already exists: %s", identifier);
}
return super.create();
}
}
}
Loading

0 comments on commit e449d34

Please sign in to comment.