diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java
new file mode 100644
index 000000000000..bc5da2aee280
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+
+/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */
+public class NoSuchIcebergViewException extends NoSuchViewException {
+ @FormatMethod
+ public NoSuchIcebergViewException(String message, Object... args) {
+ super(message, args);
+ }
+
+ @FormatMethod
+ public static void check(boolean test, String message, Object... args) {
+ if (!test) {
+ throw new NoSuchIcebergViewException(message, args);
+ }
+ }
+}
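
A minimal usage sketch of the guard-style helper above (the identifiers in it are illustrative, not from this patch):

    import org.apache.iceberg.exceptions.NoSuchIcebergViewException;

    public class NoSuchIcebergViewExceptionDemo {
      public static void main(String[] args) {
        // A passing check is a no-op.
        NoSuchIcebergViewException.check(true, "Not an iceberg view: %s", "db.v1");

        // A failing check throws with the formatted message.
        try {
          NoSuchIcebergViewException.check(false, "Not an iceberg view: %s", "db.v2");
        } catch (NoSuchIcebergViewException e) {
          System.out.println(e.getMessage()); // prints: Not an iceberg view: db.v2
        }
      }
    }
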
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index d4fcbda0686d..70b10cbaeb62 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -47,6 +47,7 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
+import org.apache.iceberg.view.ViewMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +138,23 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}
+ /**
+ * Drops view metadata files referenced by ViewMetadata.
+ *
+ * <p>This should be called by dropView implementations.
+ *
+ * @param io a FileIO to use for deletes
+ * @param metadata the last valid ViewMetadata instance for a dropped view
+ */
+ public static void dropViewMetadata(FileIO io, ViewMetadata metadata) {
+ boolean gcEnabled =
+ PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT);
+
+ if (gcEnabled) {
+ deleteFile(io, metadata.metadataFileLocation(), "metadata");
+ }
+ }
+
@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
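
For orientation, a hedged sketch of how a dropView implementation is expected to call this helper; the method and its parameters are illustrative assumptions (the real call site is in the HiveCatalog change below):

    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.view.ViewMetadata;

    class DropViewCleanupSketch {
      // `lastMetadata` is the last valid metadata of the already-dropped view; it may
      // be null when the metadata could not be loaded before the drop.
      static void cleanUp(FileIO io, ViewMetadata lastMetadata) {
        if (lastMetadata != null) {
          // Deletes the metadata file only when gc.enabled=true in the view properties;
          // with gc.enabled=false the file is intentionally left in place.
          CatalogUtil.dropViewMetadata(io, lastMetadata);
        }
      }
    }
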
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 5c58222f0c01..1cf738d736cb 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -34,19 +35,21 @@
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
@@ -56,13 +59,21 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.view.BaseMetastoreViewCatalog;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewBuilder;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewOperations;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreViewCatalog
+ implements SupportsNamespaces, Configurable {
public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";
@@ -117,6 +128,16 @@ public void initialize(String inputName, Map<String, String> properties) {
this.fileIOTracker = new FileIOTracker();
}
+ @Override
+ public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
+ return new ViewAwareTableBuilder(identifier, schema);
+ }
+
+ @Override
+ public ViewBuilder buildView(TableIdentifier identifier) {
+ return new TableAwareViewBuilder(identifier);
+ }
+
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(
@@ -156,6 +177,38 @@ public List<TableIdentifier> listTables(Namespace namespace) {
}
}
+ @Override
+ public List<TableIdentifier> listViews(Namespace namespace) {
+ Preconditions.checkArgument(
+ isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);
+
+ try {
+ String database = namespace.level(0);
+ List<String> viewNames =
+ clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW));
+
+ // Retrieving the Table objects from HMS in batches to avoid OOM
+ List<TableIdentifier> filteredTableIdentifiers = Lists.newArrayList();
+ Iterable<List<String>> viewNameSets = Iterables.partition(viewNames, 100);
+
+ for (List<String> viewNameSet : viewNameSets) {
+ filteredTableIdentifiers.addAll(
+ listIcebergTables(viewNameSet, namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE));
+ }
+
+ return filteredTableIdentifiers;
+ } catch (UnknownDBException e) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to list all views under namespace " + namespace, e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to listViews", e);
+ }
+ }
+
@Override
public String name() {
return name;
@@ -213,11 +266,57 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
}
+ @Override
+ public boolean dropView(TableIdentifier identifier) {
+ if (!isValidIdentifier(identifier)) {
+ return false;
+ }
+
+ try {
+ String database = identifier.namespace().level(0);
+ String viewName = identifier.name();
+
+ HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier);
+ ViewMetadata lastViewMetadata = null;
+ try {
+ lastViewMetadata = ops.current();
+ } catch (NotFoundException e) {
+ LOG.warn("Failed to load view metadata for view: {}", identifier, e);
+ }
+
+ clients.run(
+ client -> {
+ client.dropTable(database, viewName, false, false);
+ return null;
+ });
+
+ if (lastViewMetadata != null) {
+ CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata);
+ }
+
+ LOG.info("Dropped view: {}", identifier);
+ return true;
+ } catch (NoSuchObjectException e) {
+ LOG.info("Skipping drop, view does not exist: {}", identifier, e);
+ return false;
+ } catch (TException e) {
+ throw new RuntimeException("Failed to drop view " + identifier, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to dropView", e);
+ }
+ }
+
@Override
public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE);
}
+ @Override
+ public void renameView(TableIdentifier from, TableIdentifier to) {
+ renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW);
+ }
+
private List<TableIdentifier> listIcebergTables(
List<String> tableNames, Namespace namespace, String tableTypeProp)
throws TException, InterruptedException {
@@ -233,13 +332,12 @@ private List<TableIdentifier> listIcebergTables(
.collect(Collectors.toList());
}
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
private void renameTableOrView(
TableIdentifier from,
TableIdentifier originalTo,
HiveOperationsBase.ContentType contentType) {
- if (!isValidIdentifier(from)) {
- throw new NoSuchTableException("Invalid identifier: %s", from);
- }
+ Preconditions.checkArgument(isValidIdentifier(from), "Invalid identifier: %s", from);
TableIdentifier to = removeCatalogName(originalTo);
Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
@@ -248,6 +346,16 @@ private void renameTableOrView(
"Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
}
+ if (tableExists(to)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+ "Cannot rename %s to %s. Table already exists", from, to);
+ }
+
+ if (viewExists(to)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+ "Cannot rename %s to %s. View already exists", from, to);
+ }
+
String toDatabase = to.namespace().level(0);
String fromDatabase = from.namespace().level(0);
String fromName = from.name();
@@ -268,7 +376,12 @@ private void renameTableOrView(
LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to);
} catch (NoSuchObjectException e) {
- throw new NoSuchTableException("Table does not exist: %s", from);
+ switch (contentType) {
+ case TABLE:
+ throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
+ case VIEW:
+ throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
+ }
} catch (InvalidOperationException e) {
if (e.getMessage() != null
@@ -295,7 +408,7 @@ private void validateTableIsIcebergTableOrView(
HiveOperationsBase.validateTableIsIceberg(table, fullName);
break;
case VIEW:
- throw new UnsupportedOperationException("View is not supported.");
+ HiveOperationsBase.validateTableIsIcebergView(table, fullName);
}
}
@@ -522,6 +635,11 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) {
return ops;
}
+ @Override
+ protected ViewOperations newViewOps(TableIdentifier identifier) {
+ return new HiveViewOperations(conf, clients, fileIO, name, identifier);
+ }
+
@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
// This is a little edgy since we basically duplicate the HMS location generation logic.
@@ -660,4 +778,72 @@ void setListAllTables(boolean listAllTables) {
ClientPool<IMetaStoreClient, TException> clientPool() {
return clients;
}
+
+ /**
+ * Adds early view detection for Hive-specific tables. The Hive catalog normally checks at two
+ * levels: 1. During refresh, it validates whether the table is an Iceberg table. 2. During
+ * commit, it validates whether there is a concurrent commit or whether the table name already
+ * exists. This class surfaces a view-name collision earlier, when the builder runs.
+ */
+ private class ViewAwareTableBuilder extends BaseMetastoreViewCatalogTableBuilder {
+
+ private final TableIdentifier identifier;
+
+ private ViewAwareTableBuilder(TableIdentifier identifier, Schema schema) {
+ super(identifier, schema);
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Transaction createOrReplaceTransaction() {
+ if (viewExists(identifier)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+ "View with same name already exists: %s", identifier);
+ }
+ return super.createOrReplaceTransaction();
+ }
+
+ @Override
+ public org.apache.iceberg.Table create() {
+ if (viewExists(identifier)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+ "View with same name already exists: %s", identifier);
+ }
+ return super.create();
+ }
+ }
+
+ /**
+ * Adds early table detection for Hive-specific views. The Hive catalog normally checks at two
+ * levels: 1. During refresh, it validates whether the view is an Iceberg view. 2. During
+ * commit, it validates whether there is a concurrent commit or whether the view name already
+ * exists. This class surfaces a table-name collision earlier, when the builder runs.
+ */
+ private class TableAwareViewBuilder extends BaseViewBuilder {
+
+ private final TableIdentifier identifier;
+
+ private TableAwareViewBuilder(TableIdentifier identifier) {
+ super(identifier);
+ this.identifier = identifier;
+ }
+
+ @Override
+ public View createOrReplace() {
+ if (tableExists(identifier)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+ "Table with same name already exists: %s", identifier);
+ }
+ return super.createOrReplace();
+ }
+
+ @Override
+ public View create() {
+ if (tableExists(identifier)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(
+ "Table with same name already exists: %s", identifier);
+ }
+ return super.create();
+ }
+ }
}
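
Taken together, the catalog changes above enable a view lifecycle like the following sketch; the namespace, identifiers, and query text are illustrative, and the builder calls mirror the tests further down:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.view.View;

    class HiveViewLifecycleSketch {
      static void demo(HiveCatalog catalog) {
        Namespace ns = Namespace.of("db");
        TableIdentifier id = TableIdentifier.of(ns, "v1");
        Schema schema =
            new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));

        View view =
            catalog
                .buildView(id) // TableAwareViewBuilder: fails early on a table-name collision
                .withSchema(schema)
                .withDefaultNamespace(ns)
                .withQuery("hive", "select id from db.tbl")
                .create();

        catalog.listViews(ns); // contains db.v1
        catalog.renameView(id, TableIdentifier.of(ns, "v2"));
        catalog.dropView(TableIdentifier.of(ns, "v2"));
      }
    }
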
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
index 6500e724a4f0..4c78c43096fe 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -53,6 +54,7 @@ interface HiveOperationsBase {
long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;
String NO_LOCK_EXPECTED_KEY = "expected_parameter_key";
String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value";
+ String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view";
enum ContentType {
TABLE("Table"),
@@ -129,6 +131,17 @@ static void validateTableIsIceberg(Table table, String fullName) {
tableType);
}
+ static void validateTableIsIcebergView(Table table, String fullName) {
+ String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+ NoSuchIcebergViewException.check(
+ TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType())
+ && ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
+ "Not an iceberg view: %s (type=%s) (tableType=%s)",
+ fullName,
+ tableTypeProp,
+ table.getTableType());
+ }
+
default void persistTable(Table hmsTable, boolean updateHiveTable, String metadataLocation)
throws TException, InterruptedException {
if (updateHiveTable) {
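
As a reference point, the two HMS attributes that validateTableIsIcebergView inspects can be restated in a small sketch (the null guard on the parameter map is an illustrative addition):

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.TableType;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.iceberg.BaseMetastoreTableOperations;

    class IcebergViewMarkerSketch {
      // An HMS object counts as an Iceberg view only when it is a VIRTUAL_VIEW *and*
      // carries table_type=ICEBERG-VIEW in its parameters.
      static boolean looksLikeIcebergView(Table table) {
        Map<String, String> params = table.getParameters();
        String typeProp =
            params == null ? null : params.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
        return TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType())
            && "iceberg-view".equalsIgnoreCase(typeProp);
      }
    }
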
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 64f091385297..518daaf6acd1 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -167,7 +167,7 @@ protected void doRefresh() {
refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
}
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean newTable = base == null;
@@ -191,6 +191,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (newTable
&& tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
!= null) {
+ if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) {
+ throw new AlreadyExistsException(
+ "View with same name already exists: %s.%s", database, tableName);
+ }
throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
}
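
Combined with the builder-level check added in HiveCatalog above, a table/view name collision now surfaces as in this hedged, test-style sketch (the catalog, identifier, and schema are assumed to exist, with an Iceberg view already registered under the same name):

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.exceptions.AlreadyExistsException;
    import org.apache.iceberg.hive.HiveCatalog;

    class TableOverViewSketch {
      static void expectViewCollision(HiveCatalog catalog, TableIdentifier id, Schema schema) {
        // The ViewAwareTableBuilder rejects the create before a commit is ever attempted.
        assertThatThrownBy(() -> catalog.buildTable(id, schema).create())
            .isInstanceOf(AlreadyExistsException.class)
            .hasMessageStartingWith("View with same name already exists");
      }
    }
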
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
new file mode 100644
index 000000000000..4fc71299d457
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import static java.util.Collections.emptySet;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.SQLViewRepresentation;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewRepresentation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Hive implementation of Iceberg {@link org.apache.iceberg.view.ViewOperations}. */
+final class HiveViewOperations extends BaseViewOperations implements HiveOperationsBase {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveViewOperations.class);
+
+ private final String fullName;
+ private final String database;
+ private final String viewName;
+ private final FileIO fileIO;
+ private final ClientPool<IMetaStoreClient, TException> metaClients;
+ private final long maxHiveTablePropertySize;
+ private final Configuration conf;
+ private final String catalogName;
+
+ HiveViewOperations(
+ Configuration conf,
+ ClientPool<IMetaStoreClient, TException> metaClients,
+ FileIO fileIO,
+ String catalogName,
+ TableIdentifier viewIdentifier) {
+ this.conf = conf;
+ this.catalogName = catalogName;
+ this.metaClients = metaClients;
+ this.fileIO = fileIO;
+ this.fullName = CatalogUtil.fullTableName(catalogName, viewIdentifier);
+ this.database = viewIdentifier.namespace().level(0);
+ this.viewName = viewIdentifier.name();
+ this.maxHiveTablePropertySize =
+ conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+ }
+
+ @Override
+ public void doRefresh() {
+ String metadataLocation = null;
+ Table table;
+
+ try {
+ table = metaClients.run(client -> client.getTable(database, viewName));
+ HiveOperationsBase.validateTableIsIcebergView(table, fullName);
+
+ metadataLocation =
+ table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
+ } catch (NoSuchObjectException e) {
+ if (currentMetadataLocation() != null) {
+ throw new NoSuchViewException("View does not exist: %s.%s", database, viewName);
+ }
+ } catch (TException e) {
+ String errMsg =
+ String.format("Failed to get view info from metastore %s.%s", database, viewName);
+ throw new RuntimeException(errMsg, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during refresh", e);
+ }
+
+ refreshFromMetadataLocation(metadataLocation);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ @Override
+ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+ boolean newView = base == null;
+ String newMetadataLocation = writeNewMetadataIfRequired(metadata);
+ boolean hiveEngineEnabled = false;
+
+ CommitStatus commitStatus = CommitStatus.FAILURE;
+ boolean updateHiveView = false;
+
+ HiveLock lock = lockObject();
+ try {
+ lock.lock();
+
+ Table tbl = loadHmsTable();
+
+ if (tbl != null) {
+ // If we try to create the view but the metadata location is already set, then we had a
+ // concurrent commit
+ if (newView
+ && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
+ != null) {
+ throw new AlreadyExistsException(
+ "%s already exists: %s.%s",
+ TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())
+ ? ContentType.VIEW.value()
+ : ContentType.TABLE.value(),
+ database,
+ viewName);
+ }
+
+ updateHiveView = true;
+ LOG.debug("Committing existing view: {}", fullName);
+ } else {
+ tbl = newHMSView(metadata);
+ LOG.debug("Committing new view: {}", fullName);
+ }
+
+ tbl.setSd(
+ HiveOperationsBase.storageDescriptor(
+ metadata.schema(),
+ metadata.location(),
+ hiveEngineEnabled)); // set to pick up any schema changes
+
+ String metadataLocation =
+ tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+ String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+ if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+ throw new CommitFailedException(
+ "Cannot commit: Base metadata location '%s' is not same as the current view metadata location '%s' for %s.%s",
+ baseMetadataLocation, metadataLocation, database, viewName);
+ }
+
+ // get Iceberg props that have been removed
+ Set<String> removedProps = emptySet();
+ if (base != null) {
+ removedProps =
+ base.properties().keySet().stream()
+ .filter(key -> !metadata.properties().containsKey(key))
+ .collect(Collectors.toSet());
+ }
+
+ setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps);
+
+ lock.ensureActive();
+
+ try {
+ persistTable(tbl, updateHiveView, hiveLockEnabled(conf) ? null : baseMetadataLocation);
+ lock.ensureActive();
+
+ commitStatus = CommitStatus.SUCCESS;
+ } catch (LockException le) {
+ commitStatus = CommitStatus.UNKNOWN;
+ throw new CommitStateUnknownException(
+ "Failed to heartbeat for hive lock while "
+ + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. "
+ + "Please check the commit history. If you are running into this issue, try reducing "
+ + "iceberg.hive.lock-heartbeat-interval-ms.",
+ le);
+ } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+ throw new AlreadyExistsException(e, "View already exists: %s.%s", database, viewName);
+
+ } catch (InvalidObjectException e) {
+ throw new ValidationException(e, "Invalid Hive object for %s.%s", database, viewName);
+
+ } catch (CommitFailedException | CommitStateUnknownException e) {
+ throw e;
+
+ } catch (Throwable e) {
+ if (e.getMessage() != null
+ && e.getMessage()
+ .contains(
+ "The table has been modified. The parameter value for key '"
+ + BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+ + "' is")) {
+ throw new CommitFailedException(
+ e, "The view %s.%s has been modified concurrently", database, viewName);
+ }
+
+ if (e.getMessage() != null
+ && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
+ throw new RuntimeException(
+ "Failed to acquire locks from metastore because the underlying metastore "
+ + "view 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not "
+ + "support transactions. To fix this use an alternative metastore.",
+ e);
+ }
+
+ LOG.error(
+ "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
+ database,
+ viewName,
+ e);
+ commitStatus =
+ checkCommitStatus(
+ viewName,
+ newMetadataLocation,
+ metadata.properties(),
+ () -> checkCurrentMetadataLocation(newMetadataLocation));
+ switch (commitStatus) {
+ case SUCCESS:
+ break;
+ case FAILURE:
+ throw e;
+ case UNKNOWN:
+ throw new CommitStateUnknownException(e);
+ }
+ }
+ } catch (TException e) {
+ throw new RuntimeException(
+ String.format("Metastore operation failed for %s.%s", database, viewName), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during commit", e);
+
+ } catch (LockException e) {
+ throw new CommitFailedException(e);
+
+ } finally {
+ HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock);
+ }
+
+ LOG.info(
+ "Committed to view {} with the new metadata location {}", fullName, newMetadataLocation);
+ }
+
+ /**
+ * Validate if the new metadata location is the current metadata location.
+ *
+ * @param newMetadataLocation newly written metadata location
+ * @return true if the new metadata location is the current metadata location
+ */
+ private boolean checkCurrentMetadataLocation(String newMetadataLocation) {
+ ViewMetadata metadata = refresh();
+ return newMetadataLocation.equals(metadata.metadataFileLocation());
+ }
+
+ private void setHmsTableParameters(
+ String newMetadataLocation, Table tbl, ViewMetadata metadata, Set<String> obsoleteProps) {
+ Map<String, String> parameters =
+ Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
+
+ // push all Iceberg view properties into HMS
+ metadata.properties().entrySet().stream()
+ .filter(entry -> !entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
+ .forEach(entry -> parameters.put(entry.getKey(), entry.getValue()));
+ if (metadata.uuid() != null) {
+ parameters.put("uuid", metadata.uuid());
+ }
+
+ // remove any props from HMS that are no longer present in Iceberg view props
+ obsoleteProps.forEach(parameters::remove);
+
+ parameters.put(
+ BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+ ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+ parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation);
+
+ if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+ parameters.put(
+ BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+ }
+
+ setSchema(metadata.schema(), parameters);
+ tbl.setParameters(parameters);
+ }
+
+ private static boolean hiveLockEnabled(Configuration conf) {
+ return conf.getBoolean(ConfigProperties.LOCK_HIVE_ENABLED, true);
+ }
+
+ private Table newHMSView(ViewMetadata metadata) {
+ final long currentTimeMillis = System.currentTimeMillis();
+ String hmsTableOwner =
+ PropertyUtil.propertyAsString(
+ metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser());
+ String sqlQuery = sqlFor(metadata);
+
+ return new Table(
+ table(),
+ database(),
+ hmsTableOwner,
+ (int) (currentTimeMillis / 1000),
+ (int) (currentTimeMillis / 1000),
+ Integer.MAX_VALUE,
+ null,
+ Collections.emptyList(),
+ Maps.newHashMap(),
+ sqlQuery,
+ sqlQuery,
+ tableType().name());
+ }
+
+ private String sqlFor(ViewMetadata metadata) {
+ SQLViewRepresentation closest = null;
+ for (ViewRepresentation representation : metadata.currentVersion().representations()) {
+ if (representation instanceof SQLViewRepresentation) {
+ SQLViewRepresentation sqlViewRepresentation = (SQLViewRepresentation) representation;
+ if (sqlViewRepresentation.dialect().equalsIgnoreCase("hive")) {
+ return sqlViewRepresentation.sql();
+ } else if (closest == null) {
+ closest = sqlViewRepresentation;
+ }
+ }
+ }
+
+ return closest == null ? null : closest.sql();
+ }
+
+ @VisibleForTesting
+ HiveLock lockObject() {
+ if (hiveLockEnabled(conf)) {
+ return new MetastoreLock(conf, metaClients, catalogName, database, viewName);
+ } else {
+ return new NoLock();
+ }
+ }
+
+ @Override
+ protected String viewName() {
+ return fullName;
+ }
+
+ @Override
+ public TableType tableType() {
+ return TableType.VIRTUAL_VIEW;
+ }
+
+ @Override
+ public ClientPool<IMetaStoreClient, TException> metaClients() {
+ return metaClients;
+ }
+
+ @Override
+ public long maxHiveTablePropertySize() {
+ return maxHiveTablePropertySize;
+ }
+
+ @Override
+ public String database() {
+ return database;
+ }
+
+ @Override
+ public String table() {
+ return viewName;
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIO;
+ }
+}
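
The heart of doCommit above is a compare-and-swap on the metadata_location parameter stored in HMS; the check is isolated in this sketch (method name and parameters are illustrative):

    import java.util.Objects;
    import org.apache.iceberg.exceptions.CommitFailedException;

    class ViewCommitCasSketch {
      // The commit proceeds only if the location recorded in HMS still matches the base
      // metadata this writer refreshed from; otherwise another commit won the race and
      // the caller should refresh and retry.
      static void requireUnchanged(String baseLocation, String hmsLocation) {
        if (!Objects.equals(baseLocation, hmsLocation)) {
          throw new CommitFailedException(
              "Cannot commit: base metadata location '%s' no longer matches '%s'",
              baseLocation, hmsLocation);
        }
      }
    }
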
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 9249deb7598e..7d0eb641a385 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -155,6 +155,21 @@ private Schema getTestSchema() {
required(2, "data", Types.StringType.get()));
}
+ @Test
+ public void testInvalidIdentifiersWithRename() {
+ TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "table1");
+ TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedTable");
+ assertThatThrownBy(() -> catalog.renameTable(invalidFrom, validTo))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid identifier: " + invalidFrom);
+
+ TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "table1");
+ TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "renamedTable");
+ assertThatThrownBy(() -> catalog.renameTable(validFrom, invalidTo))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid identifier: " + invalidTo);
+ }
+
@Test
public void testCreateTableBuilder() throws Exception {
Schema schema = getTestSchema();
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java
new file mode 100644
index 000000000000..3c195e256520
--- /dev/null
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCatalog.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.ViewCatalogTests;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestHiveViewCatalog extends ViewCatalogTests<HiveCatalog> {
+
+ private HiveCatalog catalog;
+
+ @RegisterExtension
+ private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION =
+ HiveMetastoreExtension.builder().build();
+
+ @BeforeEach
+ public void before() throws TException {
+ catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
+ ImmutableMap.of(
+ CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+ String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+ HIVE_METASTORE_EXTENSION.hiveConf());
+ }
+
+ @AfterEach
+ public void cleanup() throws Exception {
+ HIVE_METASTORE_EXTENSION.metastore().reset();
+ }
+
+ @Override
+ protected HiveCatalog catalog() {
+ return catalog;
+ }
+
+ @Override
+ protected Catalog tableCatalog() {
+ return catalog;
+ }
+
+ @Override
+ protected boolean requiresNamespaceCreate() {
+ return true;
+ }
+
+ @Test
+ public void testHiveViewAndIcebergViewWithSameName() throws TException, IOException {
+ String dbName = "hivedb";
+ Namespace ns = Namespace.of(dbName);
+ String viewName = "test_hive_view";
+ TableIdentifier identifier = TableIdentifier.of(ns, viewName);
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog.listViews(ns)).isEmpty();
+ // create a plain (non-Iceberg) hive view
+ Table hiveTable =
+ createHiveView(
+ viewName, dbName, Files.createTempDirectory("hive-view-tests-name").toString());
+ HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable);
+
+ catalog.setListAllTables(true);
+ assertThat(catalog.listTables(ns)).containsExactly(identifier).hasSize(1);
+
+ assertThat(catalog.viewExists(identifier)).isFalse();
+
+ assertThatThrownBy(
+ () ->
+ catalog
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(ns)
+ .withQuery("hive", "select * from hivedb.tbl")
+ .create())
+ .isInstanceOf(NoSuchIcebergViewException.class)
+ .hasMessageStartingWith("Not an iceberg view: hive.hivedb.test_hive_view");
+ }
+
+ @Test
+ public void testListViewWithHiveView() throws TException, IOException {
+ String dbName = "hivedb";
+ Namespace ns = Namespace.of(dbName);
+ TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_view");
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog.viewExists(identifier)).isFalse();
+ assertThat(catalog.listViews(ns)).isEmpty();
+
+ String hiveViewName = "test_hive_view";
+ // create a plain (non-Iceberg) hive view
+ Table hiveTable =
+ createHiveView(
+ hiveViewName, dbName, Files.createTempDirectory("hive-view-tests-list").toString());
+ HIVE_METASTORE_EXTENSION.metastoreClient().createTable(hiveTable);
+
+ catalog.setListAllTables(true);
+
+ assertThat(catalog.listTables(ns))
+ .containsExactly(TableIdentifier.of(ns, hiveViewName))
+ .hasSize(1);
+
+ assertThat(catalog.listViews(ns)).hasSize(0);
+
+ catalog
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(ns)
+ .withQuery("hive", "select * from hivedb.tbl")
+ .create();
+ assertThat(catalog.viewExists(identifier)).isTrue();
+
+ assertThat(catalog.listViews(ns)).containsExactly(identifier).hasSize(1);
+ }
+
+ @Test
+ public void testViewWithHiveParameters() throws TException, IOException {
+ String dbName = "hivedb";
+ Namespace ns = Namespace.of(dbName);
+ TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_view");
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog.viewExists(identifier)).isFalse();
+ String tableQuery = "select * from hivedb.tbl";
+
+ catalog
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(ns)
+ .withQuery("hive", tableQuery)
+ .create();
+ assertThat(catalog.viewExists(identifier)).isTrue();
+
+ Table hiveTable =
+ HIVE_METASTORE_EXTENSION.metastoreClient().getTable(dbName, identifier.name());
+ assertThat(hiveTable.getViewOriginalText()).isEqualTo(tableQuery);
+ assertThat(hiveTable.getViewExpandedText()).isEqualTo(tableQuery);
+ }
+
+ @Test
+ public void testInvalidIdentifiersWithRename() {
+ TableIdentifier invalidFrom = TableIdentifier.of(Namespace.of("l1", "l2"), "view");
+ TableIdentifier validTo = TableIdentifier.of(Namespace.of("l1"), "renamedView");
+ assertThatThrownBy(() -> catalog.renameView(invalidFrom, validTo))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid identifier: " + invalidFrom);
+
+ TableIdentifier validFrom = TableIdentifier.of(Namespace.of("l1"), "view");
+ TableIdentifier invalidTo = TableIdentifier.of(Namespace.of("l1", "l2"), "renamedView");
+ assertThatThrownBy(() -> catalog.renameView(validFrom, invalidTo))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid identifier: " + invalidTo);
+ }
+
+ @Test
+ public void dropViewShouldNotDropMetadataFileIfGcNotEnabled() throws IOException {
+ String dbName = "hivedb";
+ Namespace ns = Namespace.of(dbName);
+ TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_drop_view_gc_disabled");
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(identifier.namespace());
+ }
+
+ BaseView view =
+ (BaseView)
+ catalog
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(ns)
+ .withQuery("hive", "select * from hivedb.tbl")
+ .withProperty(TableProperties.GC_ENABLED, "false")
+ .create();
+
+ assertThat(catalog.viewExists(identifier)).isTrue();
+
+ Path viewLocation = new Path(view.location());
+ String currentMetadataLocation = view.operations().current().metadataFileLocation();
+
+ catalog.dropView(identifier);
+
+ assertThat(
+ viewLocation
+ .getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf())
+ .exists(new Path(currentMetadataLocation)))
+ .isTrue();
+ assertThat(catalog.viewExists(identifier)).isFalse();
+ }
+
+ @Test
+ public void dropViewShouldDropMetadataFileIfGcEnabled() throws IOException {
+ String dbName = "hivedb";
+ Namespace ns = Namespace.of(dbName);
+ TableIdentifier identifier = TableIdentifier.of(ns, "test_iceberg_drop_view_gc_enabled");
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(identifier.namespace());
+ }
+
+ BaseView view =
+ (BaseView)
+ catalog
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(ns)
+ .withQuery("hive", "select * from hivedb.tbl")
+ .withProperty(TableProperties.GC_ENABLED, "true")
+ .create();
+
+ assertThat(catalog.viewExists(identifier)).isTrue();
+
+ Path viewLocation = new Path(view.location());
+ String currentMetadataLocation = view.operations().current().metadataFileLocation();
+
+ assertThat(
+ viewLocation
+ .getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf())
+ .exists(new Path(currentMetadataLocation)))
+ .isTrue();
+
+ catalog.dropView(identifier);
+
+ assertThat(
+ viewLocation
+ .getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf())
+ .exists(new Path(currentMetadataLocation)))
+ .isFalse();
+ assertThat(catalog.viewExists(identifier)).isFalse();
+ }
+
+ private Table createHiveView(String hiveViewName, String dbName, String location) {
+ Map<String, String> parameters = Maps.newHashMap();
+ parameters.put(
+ serdeConstants.SERIALIZATION_CLASS, "org.apache.hadoop.hive.serde2.thrift.test.IntString");
+ parameters.put(
+ serdeConstants.SERIALIZATION_FORMAT, "org.apache.thrift.protocol.TBinaryProtocol");
+
+ SerDeInfo serDeInfo =
+ new SerDeInfo(null, "org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer", parameters);
+
+ // StorageDescriptor has an empty list of fields - SerDe will report them.
+ StorageDescriptor sd =
+ new StorageDescriptor(
+ Lists.newArrayList(),
+ location,
+ "org.apache.hadoop.mapred.TextInputFormat",
+ "org.apache.hadoop.mapred.TextOutputFormat",
+ false,
+ -1,
+ serDeInfo,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ Maps.newHashMap());
+
+ Table hiveTable =
+ new Table(
+ hiveViewName,
+ dbName,
+ "test_owner",
+ 0,
+ 0,
+ 0,
+ sd,
+ Lists.newArrayList(),
+ Maps.newHashMap(),
+ "viewOriginalText",
+ "viewExpandedText",
+ TableType.VIRTUAL_VIEW.name());
+ return hiveTable;
+ }
+}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java
new file mode 100644
index 000000000000..47abb51602fa
--- /dev/null
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveViewCommits.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/** Test Hive locks and Hive errors and retry during commits. */
+public class TestHiveViewCommits {
+
+ private static final String VIEW_NAME = "test_iceberg_view";
+ private static final String DB_NAME = "hivedb";
+ private static final Namespace NS = Namespace.of(DB_NAME);
+ private static final Schema SCHEMA =
+ new Schema(
+ 5,
+ required(3, "id", Types.IntegerType.get(), "unique ID"),
+ required(4, "data", Types.StringType.get()));
+ private static final TableIdentifier VIEW_IDENTIFIER = TableIdentifier.of(NS, VIEW_NAME);
+
+ @RegisterExtension
+ protected static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION =
+ HiveMetastoreExtension.builder().withDatabase(DB_NAME).build();
+
+ private View view;
+ private Path viewLocation;
+
+ private static HiveCatalog catalog;
+
+ @BeforeAll
+ public static void initCatalog() {
+ catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
+ ImmutableMap.of(
+ CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+ String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+ HIVE_METASTORE_EXTENSION.hiveConf());
+ }
+
+ @BeforeEach
+ public void createTestView() {
+ view =
+ catalog
+ .buildView(VIEW_IDENTIFIER)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(NS)
+ .withQuery("hive", "select * from ns.tbl")
+ .create();
+ viewLocation = new Path(view.location());
+ }
+
+ @AfterEach
+ public void dropTestView() throws IOException {
+ viewLocation.getFileSystem(HIVE_METASTORE_EXTENSION.hiveConf()).delete(viewLocation, true);
+ catalog.dropView(VIEW_IDENTIFIER);
+ }
+
+ @Test
+ public void testSuppressUnlockExceptions() {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ AtomicReference<HiveLock> lockRef = new AtomicReference<>();
+
+ when(spyOps.lockObject())
+ .thenAnswer(
+ i -> {
+ HiveLock lock = (HiveLock) i.callRealMethod();
+ lockRef.set(lock);
+ return lock;
+ });
+
+ try {
+ spyOps.commit(metadataV2, metadataV1);
+ HiveLock spyLock = spy(lockRef.get());
+ doThrow(new RuntimeException()).when(spyLock).unlock();
+ } finally {
+ lockRef.get().unlock();
+ }
+
+ ops.refresh();
+
+ // the commit must succeed
+ assertThat(ops.current().properties()).hasSize(0).isEqualTo(metadataV1.properties());
+ }
+
+ /**
+ * Pretends we throw an error while persisting and the commit is then not found when the state
+ * is rechecked. The commit state should be treated as unknown, because in reality the persist
+ * may still succeed, just not yet by the time of the check.
+ */
+ @Test
+ public void testThriftExceptionUnknownStateIfNotInHistoryFailureOnCommit()
+ throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ failCommitAndThrowException(spyOps);
+
+ assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageStartingWith("Datacenter on fire");
+
+ ops.refresh();
+
+ assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2);
+ assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue();
+ assertThat(metadataFileCount(metadataV2))
+ .as(
+ "New metadata files should still exist, new location not in history but"
+ + " the commit may still succeed")
+ .isEqualTo(2);
+ }
+
+ /** Pretends we throw an error while persisting when the commit actually succeeded server-side. */
+ @Test
+ public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ // Simulate a communication error after a successful commit
+ commitAndThrowException(ops, spyOps);
+ spyOps.commit(metadataV2, metadataV1);
+
+ assertThat(ops.current()).as("Current metadata should have not changed").isEqualTo(metadataV2);
+ assertThat(metadataFileExists(metadataV2))
+ .as("Current metadata file should still exist")
+ .isTrue();
+ assertThat(metadataFileCount(metadataV2))
+ .as("Commit should have been successful and new metadata file should be made")
+ .isEqualTo(2);
+ }
+
+ /**
+ * Pretends we throw an exception while persisting and don't know what happened, can't check to
+ * find out, but in reality the commit failed
+ */
+ @Test
+ public void testThriftExceptionUnknownFailedCommit() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ failCommitAndThrowException(spyOps);
+ breakFallbackCatalogCommitCheck(spyOps);
+
+ assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageStartingWith("Datacenter on fire");
+
+ ops.refresh();
+
+ assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2);
+ assertThat(metadataFileExists(metadataV2))
+ .as("Current metadata file should still exist")
+ .isTrue();
+ assertThat(metadataFileCount(metadataV2))
+ .as("Client could not determine outcome so new metadata file should also exist")
+ .isEqualTo(2);
+ }
+
+ /**
+ * Pretends we throw an exception while persisting and don't know what happened, can't check to
+ * find out, but in reality the commit succeeded
+ */
+ @Test
+ public void testThriftExceptionsUnknownSuccessCommit() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+
+ ViewMetadata metadataV2 = ops.current();
+
+ assertThat(metadataV2.properties()).hasSize(1);
+
+ HiveViewOperations spyOps = spy(ops);
+
+ commitAndThrowException(ops, spyOps);
+ breakFallbackCatalogCommitCheck(spyOps);
+
+ assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageStartingWith("Datacenter on fire");
+
+ ops.refresh();
+ ViewMetadata metadataV3 = ops.current();
+
+ assertThat(metadataV3).as("Current metadata should have changed").isNotEqualTo(metadataV2);
+ assertThat(metadataFileExists(metadataV3))
+ .as("Current metadata file should still exist")
+ .isTrue();
+ assertThat(metadataFileCount(metadataV3))
+ .as("Commit should have been successful with updated properties at metadataV2")
+ .isEqualTo(2);
+ }
+
+ /**
+ * Pretends we threw an exception while persisting, the commit succeeded, the lock expired, and a
+ * second committer placed a commit on top of ours before the first committer was able to check if
+ * their commit succeeded or not
+ *
+ * <p>Timeline:
+ *
+ * <ul>
+ *   <li>Client 1 commits, which throws an exception but actually succeeds
+ *   <li>Client 1's lock expires while waiting to do the recheck for commit success
+ *   <li>Client 2 acquires a lock, commits successfully on top of client 1's commit, and
+ *       releases the lock
+ *   <li>Client 1 checks to see if its commit was successful
+ * </ul>
+ *
+ * <p>This tests to make sure a disconnected client 1 doesn't think its commit failed just
+ * because it isn't the current one during the recheck phase.
+ */
+ @Test
+ public void testThriftExceptionConcurrentCommit() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k0", "v0").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k0", "v0");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ AtomicReference<HiveLock> lock = new AtomicReference<>();
+ doAnswer(
+ l -> {
+ lock.set(ops.lockObject());
+ return lock.get();
+ })
+ .when(spyOps)
+ .lockObject();
+
+ concurrentCommitAndThrowException(ops, spyOps, (BaseView) view, lock);
+
+ // This commit should fail and concurrent commit should succeed even though this commit
+ // throws an exception after the persist operation succeeds
+ assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageContaining("Datacenter on fire");
+
+ ops.refresh();
+ ViewMetadata metadataV3 = ops.current();
+
+ assertThat(metadataV3).as("Current metadata should have changed").isNotEqualTo(metadataV2);
+ assertThat(metadataFileExists(metadataV3))
+ .as("Current metadata file should still exist")
+ .isTrue();
+ assertThat(metadataV3.properties())
+ .as("The new properties from the concurrent commit should have been successful")
+ .hasSize(2);
+ }
+
+ @Test
+ public void testInvalidObjectException() {
+ TableIdentifier badTi = TableIdentifier.of(DB_NAME, "`test_iceberg_view`");
+ assertThatThrownBy(
+ () ->
+ catalog
+ .buildView(badTi)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(NS)
+ .withQuery("hive", "select * from ns.tbl")
+ .create())
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Invalid Hive object for " + DB_NAME + "." + "`test_iceberg_view`");
+ }
+
+ /** Uses NoLock and pretends we throw an error because of a concurrent commit */
+ @Test
+ public void testNoLockThriftExceptionConcurrentCommit() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ // Sets NoLock
+ doReturn(new NoLock()).when(spyOps).lockObject();
+
+ // Simulate a concurrent view modification error
+ doThrow(
+ new RuntimeException(
+ "MetaException(message:The table has been modified. The parameter value for key 'metadata_location' is"))
+ .when(spyOps)
+ .persistTable(any(), anyBoolean(), any());
+
+ // Should throw a CommitFailedException so the commit could be retried
+ assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("The view hivedb.test_iceberg_view has been modified concurrently");
+
+ ops.refresh();
+
+ assertThat(ops.current()).as("Current metadata should not have changed").isEqualTo(metadataV2);
+ assertThat(metadataFileExists(metadataV2)).as("Current metadata should still exist").isTrue();
+ assertThat(metadataFileCount(metadataV2))
+ .as("New metadata files should not exist")
+ .isEqualTo(1);
+ }
+
+ @Test
+ public void testLockExceptionUnknownSuccessCommit() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+ ops.refresh();
+ ViewMetadata metadataV2 = ops.current();
+ assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");
+
+ HiveViewOperations spyOps = spy(ops);
+
+ // Simulate a communication error after a successful commit
+ doAnswer(
+ i -> {
+ org.apache.hadoop.hive.metastore.api.Table tbl =
+ i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
+ String location = i.getArgument(2, String.class);
+ ops.persistTable(tbl, true, location);
+ throw new LockException("Datacenter on fire");
+ })
+ .when(spyOps)
+ .persistTable(any(), anyBoolean(), any());
+
+ assertThatThrownBy(() -> spyOps.commit(metadataV2, metadataV1))
+ .hasMessageContaining("Failed to heartbeat for hive lock while")
+ .isInstanceOf(CommitStateUnknownException.class);
+
+ ops.refresh();
+
+ assertThat(metadataV2.location())
+ .as("Current metadata should have changed to metadata V1")
+ .isEqualTo(metadataV1.location());
+ assertThat(metadataFileExists(metadataV2))
+ .as("Current metadata file should still exist")
+ .isTrue();
+ assertThat(metadataFileCount(metadataV2)).as("New metadata file should exist").isEqualTo(2);
+ }
+
+ @Test
+ public void testCommitExceptionWithoutMessage() throws TException, InterruptedException {
+ HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
+ ViewMetadata metadataV1 = ops.current();
+ assertThat(metadataV1.properties()).hasSize(0);
+
+ view.updateProperties().set("k1", "v1").commit();
+
+ ops.refresh();
+
+ HiveViewOperations spyOps = spy(ops);
+ doThrow(new RuntimeException()).when(spyOps).persistTable(any(), anyBoolean(), any());
+
+ assertThatThrownBy(() -> spyOps.commit(ops.current(), metadataV1))
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageStartingWith("null\nCannot determine whether the commit was successful or not");
+ }
+
+ private void commitAndThrowException(
+ HiveViewOperations realOperations, HiveViewOperations spyOperations)
+ throws TException, InterruptedException {
+ // Simulate a communication error after a successful commit
+ doAnswer(
+ i -> {
+ org.apache.hadoop.hive.metastore.api.Table tbl =
+ i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
+ String location = i.getArgument(2, String.class);
+ realOperations.persistTable(tbl, true, location);
+ throw new TException("Datacenter on fire");
+ })
+ .when(spyOperations)
+ .persistTable(any(), anyBoolean(), any());
+ }
+
+ private void concurrentCommitAndThrowException(
+ HiveViewOperations realOperations,
+ HiveViewOperations spyOperations,
+ BaseView baseView,
+ AtomicReference lock)
+ throws TException, InterruptedException {
+ // Simulate a communication error after a successful commit
+ doAnswer(
+ i -> {
+ org.apache.hadoop.hive.metastore.api.Table tbl =
+ i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
+ String location = i.getArgument(2, String.class);
+ realOperations.persistTable(tbl, true, location);
+ // Simulate lock expiration or removal
+ lock.get().unlock();
+ baseView.operations().refresh();
+ baseView.updateProperties().set("k1", "v1").set("k2", "v2").commit();
+ throw new TException("Datacenter on fire");
+ })
+ .when(spyOperations)
+ .persistTable(any(), anyBoolean(), any());
+ }
+
+ private void failCommitAndThrowException(HiveViewOperations spyOperations)
+ throws TException, InterruptedException {
+ doThrow(new TException("Datacenter on fire"))
+ .when(spyOperations)
+ .persistTable(any(), anyBoolean(), any());
+ }
+
+ private void breakFallbackCatalogCommitCheck(HiveViewOperations spyOperations) {
+ when(spyOperations.refresh())
+ .thenThrow(new RuntimeException("Still on fire")); // Failure on commit check
+ }
+
+ private boolean metadataFileExists(ViewMetadata metadata) {
+ return new File(metadata.metadataFileLocation().replace("file:", "")).exists();
+ }
+
+ private int metadataFileCount(ViewMetadata metadata) {
+ return new File(metadata.metadataFileLocation().replace("file:", ""))
+ .getParentFile()
+ .listFiles(file -> file.getName().endsWith("metadata.json"))
+ .length;
+ }
+}