diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 22f5b0b5cf37..7542fd58e19f 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -235,7 +235,7 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) { try { Table table = clients.run(client -> client.getTable(fromDatabase, fromName)); - HiveTableOperations.validateTableIsIceberg(table, fullTableName(name, from)); + HiveMetastoreConnector.validateTableIsIceberg(table, fullTableName(name, from)); table.setDbName(toDatabase); table.setTableName(to.name()); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetastoreConnector.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetastoreConnector.java new file mode 100644 index 000000000000..be142a6e78de --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetastoreConnector.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hive; + +import java.util.Collections; +import java.util.Map; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** All the HMS operations like table,view,materialized_view should implement this. */ +interface HiveMetastoreConnector { + + Logger LOG = LoggerFactory.getLogger(HiveMetastoreConnector.class); + // The max size is based on HMS backend database. For Hive versions below 2.3, the max table + // parameter size is 4000 + // characters, see https://issues.apache.org/jira/browse/HIVE-12274 + // set to 0 to not expose Iceberg metadata in HMS Table properties. + String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size"; + long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672; + + TableType tableType(); + + boolean exposeInHmsProperties(); + + default void setSchema(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.CURRENT_SCHEMA); + if (exposeInHmsProperties() && metadata.schema() != null) { + String schema = SchemaParser.toJson(metadata.schema()); + setField(parameters, TableProperties.CURRENT_SCHEMA, schema); + } + } + + default void setField(Map parameters, String key, String value) { + if (value.length() <= getMaxHiveTablePropertySize()) { + parameters.put(key, value); + } else { + LOG.warn( + "Not exposing {} in HMS since it exceeds {} characters", + key, + getMaxHiveTablePropertySize()); + } + } + + static void validateTableIsIceberg(Table table, String fullName) { + String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP); + NoSuchIcebergTableException.check( + tableType != null + && tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE), + "Not an iceberg table: %s (type=%s)", + fullName, + tableType); + } + + default void persistTable(Table hmsTable, boolean updateHiveTable, Map extraEnv) + throws TException, InterruptedException { + if (updateHiveTable) { + getMetaClients() + .run( + client -> { + MetastoreUtil.alterTable(client, database(), table(), hmsTable, extraEnv); + return null; + }); + } else { + getMetaClients() + .run( + client -> { + client.createTable(hmsTable); + return null; + }); + } + } + + default StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) { + + final StorageDescriptor storageDescriptor = new StorageDescriptor(); + storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema())); + storageDescriptor.setLocation(metadata.location()); + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setParameters(Maps.newHashMap()); + if (hiveEngineEnabled) { + storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat"); + storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"); + serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe"); + } else { + storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat"); + storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat"); + serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); + } + storageDescriptor.setSerdeInfo(serDeInfo); + return storageDescriptor; + } + + default void cleanupMetadata(FileIO fileIO, String commitStatus, String metadataLocation) { + try { + if (commitStatus.equalsIgnoreCase("FAILURE")) { + // If we are sure the commit failed, clean up the uncommitted metadata file + fileIO.deleteFile(metadataLocation); + } + } catch (RuntimeException e) { + LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e); + } + } + + default Table newHmsTable(String hmsTableOwner) { + Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be null"); + final long currentTimeMillis = System.currentTimeMillis(); + + Table newTable = + new Table( + table(), + database(), + hmsTableOwner, + (int) currentTimeMillis / 1000, + (int) currentTimeMillis / 1000, + Integer.MAX_VALUE, + null, + Collections.emptyList(), + Maps.newHashMap(), + null, + null, + tableType().name()); + + if (tableType().equals(TableType.EXTERNAL_TABLE)) { + newTable + .getParameters() + .put("EXTERNAL", "TRUE"); // using the external table type also requires this + } + + return newTable; + } + + ClientPool getMetaClients(); + + long getMaxHiveTablePropertySize(); + + String database(); + + String table(); +} diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index f4b96822d42c..0afd21a0b8d5 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -34,14 +34,11 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.ClientPool; import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SortOrderParser; @@ -50,13 +47,11 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; -import org.apache.iceberg.exceptions.NoSuchIcebergTableException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.BiMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -70,19 +65,15 @@ * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to * avoid code duplication between this class and Metacat Tables. */ -public class HiveTableOperations extends BaseMetastoreTableOperations { +public class HiveTableOperations extends BaseMetastoreTableOperations + implements HiveMetastoreConnector { private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries"; - // the max size is based on HMS backend database. For Hive versions below 2.3, the max table - // parameter size is 4000 - // characters, see https://issues.apache.org/jira/browse/HIVE-12274 - // set to 0 to not expose Iceberg metadata in HMS Table properties. - private static final String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size"; - private static final String NO_LOCK_EXPECTED_KEY = "expected_parameter_key"; - private static final String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value"; - private static final long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672; + + @VisibleForTesting static final String NO_LOCK_EXPECTED_KEY = "expected_parameter_key"; + @VisibleForTesting static final String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value"; private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2; private static final BiMap ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of( @@ -156,7 +147,7 @@ protected void doRefresh() { String metadataLocation = null; try { Table table = metaClients.run(client -> client.getTable(database, tableName)); - validateTableIsIceberg(table, fullName); + HiveMetastoreConnector.validateTableIsIceberg(table, fullName); metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); @@ -207,7 +198,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { updateHiveTable = true; LOG.debug("Committing existing table: {}", fullName); } else { - tbl = newHmsTable(metadata); + tbl = + newHmsTable( + metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); LOG.debug("Committing new table: {}", fullName); } @@ -245,7 +238,15 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { try { persistTable( - tbl, updateHiveTable, hiveLockEnabled(metadata, conf) ? null : baseMetadataLocation); + tbl, + updateHiveTable, + hiveLockEnabled(metadata, conf) || baseMetadataLocation == null + ? ImmutableMap.of() + : ImmutableMap.of( + NO_LOCK_EXPECTED_KEY, + METADATA_LOCATION_PROP, + NO_LOCK_EXPECTED_VALUE, + baseMetadataLocation)); lock.ensureActive(); commitStatus = CommitStatus.SUCCESS; @@ -319,35 +320,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); } - @VisibleForTesting - void persistTable(Table hmsTable, boolean updateHiveTable, String expectedMetadataLocation) - throws TException, InterruptedException { - if (updateHiveTable) { - metaClients.run( - client -> { - MetastoreUtil.alterTable( - client, - database, - tableName, - hmsTable, - expectedMetadataLocation != null - ? ImmutableMap.of( - NO_LOCK_EXPECTED_KEY, - METADATA_LOCATION_PROP, - NO_LOCK_EXPECTED_VALUE, - expectedMetadataLocation) - : ImmutableMap.of()); - return null; - }); - } else { - metaClients.run( - client -> { - client.createTable(hmsTable); - return null; - }); - } - } - @VisibleForTesting Table loadHmsTable() throws TException, InterruptedException { try { @@ -358,31 +330,6 @@ Table loadHmsTable() throws TException, InterruptedException { } } - private Table newHmsTable(TableMetadata metadata) { - Preconditions.checkNotNull(metadata, "'metadata' parameter can't be null"); - final long currentTimeMillis = System.currentTimeMillis(); - - Table newTable = - new Table( - tableName, - database, - metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()), - (int) currentTimeMillis / 1000, - (int) currentTimeMillis / 1000, - Integer.MAX_VALUE, - null, - Collections.emptyList(), - Maps.newHashMap(), - null, - null, - TableType.EXTERNAL_TABLE.toString()); - - newTable - .getParameters() - .put("EXTERNAL", "TRUE"); // using the external table type also requires this - return newTable; - } - private void setHmsTableParameters( String newMetadataLocation, Table tbl, @@ -484,15 +431,6 @@ void setSnapshotSummary(Map parameters, Snapshot currentSnapshot } } - @VisibleForTesting - void setSchema(TableMetadata metadata, Map parameters) { - parameters.remove(TableProperties.CURRENT_SCHEMA); - if (exposeInHmsProperties() && metadata.schema() != null) { - String schema = SchemaParser.toJson(metadata.schema()); - setField(parameters, TableProperties.CURRENT_SCHEMA, schema); - } - } - @VisibleForTesting void setPartitionSpec(TableMetadata metadata, Map parameters) { parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC); @@ -513,62 +451,45 @@ void setSortOrder(TableMetadata metadata, Map parameters) { } } - private void setField(Map parameters, String key, String value) { - if (value.length() <= maxHiveTablePropertySize) { - parameters.put(key, value); - } else { - LOG.warn( - "Not exposing {} in HMS since it exceeds {} characters", key, maxHiveTablePropertySize); - } + @Override + public long getMaxHiveTablePropertySize() { + return maxHiveTablePropertySize; } - private boolean exposeInHmsProperties() { - return maxHiveTablePropertySize > 0; + @Override + public String database() { + return database; } - private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) { + @Override + public String table() { + return tableName; + } - final StorageDescriptor storageDescriptor = new StorageDescriptor(); - storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema())); - storageDescriptor.setLocation(metadata.location()); - SerDeInfo serDeInfo = new SerDeInfo(); - serDeInfo.setParameters(Maps.newHashMap()); - if (hiveEngineEnabled) { - storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat"); - storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"); - serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe"); - } else { - storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat"); - storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat"); - serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); - } - storageDescriptor.setSerdeInfo(serDeInfo); - return storageDescriptor; + @Override + public TableType tableType() { + return TableType.EXTERNAL_TABLE; + } + + @Override + public boolean exposeInHmsProperties() { + return maxHiveTablePropertySize > 0; + } + + @Override + public ClientPool getMetaClients() { + return metaClients; } private void cleanupMetadataAndUnlock( CommitStatus commitStatus, String metadataLocation, HiveLock lock) { try { - if (commitStatus == CommitStatus.FAILURE) { - // If we are sure the commit failed, clean up the uncommitted metadata file - io().deleteFile(metadataLocation); - } - } catch (RuntimeException e) { - LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e); + cleanupMetadata(io(), commitStatus.name(), metadataLocation); } finally { lock.unlock(); } } - static void validateTableIsIceberg(Table table, String fullName) { - String tableType = table.getParameters().get(TABLE_TYPE_PROP); - NoSuchIcebergTableException.check( - tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), - "Not an iceberg table: %s (type=%s)", - fullName, - tableType); - } - /** * Returns if the hive engine related values should be enabled on the table, or not. * diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java index aaa659042118..7646649afae5 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.hive; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.hive.HiveTableOperations.NO_LOCK_EXPECTED_KEY; +import static org.apache.iceberg.hive.HiveTableOperations.NO_LOCK_EXPECTED_VALUE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.any; @@ -39,6 +42,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; import org.apache.thrift.TException; import org.junit.jupiter.api.Test; @@ -377,7 +381,16 @@ public void testLockExceptionUnknownSuccessCommit() throws TException, Interrupt org.apache.hadoop.hive.metastore.api.Table tbl = i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class); String location = i.getArgument(2, String.class); - ops.persistTable(tbl, true, location); + ops.persistTable( + tbl, + true, + location == null + ? ImmutableMap.of() + : ImmutableMap.of( + NO_LOCK_EXPECTED_KEY, + METADATA_LOCATION_PROP, + NO_LOCK_EXPECTED_VALUE, + location)); throw new LockException("Datacenter on fire"); }) .when(spyOps) @@ -406,7 +419,16 @@ private void commitAndThrowException( org.apache.hadoop.hive.metastore.api.Table tbl = i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class); String location = i.getArgument(2, String.class); - realOperations.persistTable(tbl, true, location); + realOperations.persistTable( + tbl, + true, + location == null + ? ImmutableMap.of() + : ImmutableMap.of( + NO_LOCK_EXPECTED_KEY, + METADATA_LOCATION_PROP, + NO_LOCK_EXPECTED_VALUE, + location)); throw new TException("Datacenter on fire"); }) .when(spyOperations) @@ -425,7 +447,16 @@ private void concurrentCommitAndThrowException( org.apache.hadoop.hive.metastore.api.Table tbl = i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class); String location = i.getArgument(2, String.class); - realOperations.persistTable(tbl, true, location); + realOperations.persistTable( + tbl, + true, + location == null + ? ImmutableMap.of() + : ImmutableMap.of( + NO_LOCK_EXPECTED_KEY, + METADATA_LOCATION_PROP, + NO_LOCK_EXPECTED_VALUE, + location)); // Simulate lock expiration or removal lock.get().unlock(); table.refresh();