Refactor HiveTableOperations with common code for View.
nk1506 committed Nov 9, 2023
1 parent 1fb8e4f commit b26cc38
Showing 3 changed files with 213 additions and 119 deletions.
@@ -235,7 +235,7 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {

    try {
      Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
-     HiveTableOperations.validateTableIsIceberg(table, fullTableName(name, from));
+     HiveMetastoreConnector.validateTableIsIceberg(table, fullTableName(name, from));

      table.setDbName(toDatabase);
      table.setTableName(to.name());
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.hive;

import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Common contract that HMS operations for tables, views, and materialized views should implement. */
interface HiveMetastoreConnector {

  Logger LOG = LoggerFactory.getLogger(HiveMetastoreConnector.class);
  // The max size is based on the HMS backend database. For Hive versions below 2.3, the max table
  // parameter size is 4000 characters, see https://issues.apache.org/jira/browse/HIVE-12274
  // Set to 0 to avoid exposing Iceberg metadata in HMS table properties.
  String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
  long HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT = 32672;

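  /** HMS {@link TableType} to use for the backing HMS object, e.g. EXTERNAL_TABLE for tables. */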
  TableType tableType();

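  /** Whether Iceberg metadata such as the current schema should be mirrored into HMS properties. */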
  boolean exposeInHmsProperties();

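  /** Pool of Hive Metastore clients used to perform HMS calls. */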
  ClientPool<IMetaStoreClient, TException> getMetaClients();

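  /** Maximum length of a property value that will be stored in HMS; longer values are skipped. */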
  long getMaxHiveTablePropertySize();

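  /** Name of the HMS database that contains the table or view. */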
  String database();

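  /** Name of the HMS table or view being operated on. */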
  String table();

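  /** Environment context entries to pass to HMS when persisting changes for the given metadata location. */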
  Map<String, String> hmsEnvContext(String metadataLocation);

  default void setSchema(TableMetadata metadata, Map<String, String> parameters) {
    parameters.remove(TableProperties.CURRENT_SCHEMA);
    if (exposeInHmsProperties() && metadata.schema() != null) {
      String schema = SchemaParser.toJson(metadata.schema());
      setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
    }
  }

  default void setField(Map<String, String> parameters, String key, String value) {
    if (value.length() <= getMaxHiveTablePropertySize()) {
      parameters.put(key, value);
    } else {
      LOG.warn(
          "Not exposing {} in HMS since it exceeds {} characters",
          key,
          getMaxHiveTablePropertySize());
    }
  }

  static void validateTableIsIceberg(Table table, String fullName) {
    String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
    NoSuchIcebergTableException.check(
        tableType != null
            && tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE),
        "Not an iceberg table: %s (type=%s)",
        fullName,
        tableType);
  }

  default void persistTable(Table hmsTable, boolean updateHiveTable, String metadataLocation)
      throws TException, InterruptedException {
    if (updateHiveTable) {
      getMetaClients()
          .run(
              client -> {
                MetastoreUtil.alterTable(
                    client, database(), table(), hmsTable, hmsEnvContext(metadataLocation));
                return null;
              });
    } else {
      getMetaClients()
          .run(
              client -> {
                client.createTable(hmsTable);
                return null;
              });
    }
  }

  default StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {

    final StorageDescriptor storageDescriptor = new StorageDescriptor();
    storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema()));
    storageDescriptor.setLocation(metadata.location());
    SerDeInfo serDeInfo = new SerDeInfo();
    serDeInfo.setParameters(Maps.newHashMap());
    if (hiveEngineEnabled) {
      storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
      storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
      serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
    } else {
      storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
      storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
      serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
    }
    storageDescriptor.setSerdeInfo(serDeInfo);
    return storageDescriptor;
  }

  default void cleanupMetadata(FileIO fileIO, String commitStatus, String metadataLocation) {
    try {
      if (commitStatus.equalsIgnoreCase("FAILURE")) {
        // If we are sure the commit failed, clean up the uncommitted metadata file
        fileIO.deleteFile(metadataLocation);
      }
    } catch (RuntimeException e) {
      LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
    }
  }

  default Table newHmsTable(String hmsTableOwner) {
    Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be null");
    final long currentTimeMillis = System.currentTimeMillis();

    Table newTable =
        new Table(
            table(),
            database(),
            hmsTableOwner,
            (int) (currentTimeMillis / 1000),
            (int) (currentTimeMillis / 1000),
            Integer.MAX_VALUE,
            null,
            Collections.emptyList(),
            Maps.newHashMap(),
            null,
            null,
            tableType().name());

    if (tableType().equals(TableType.EXTERNAL_TABLE)) {
      newTable
          .getParameters()
          .put("EXTERNAL", "TRUE"); // using the external table type also requires this
    }

    return newTable;
  }
}
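To show how the extracted interface is intended to be reused beyond HiveTableOperations, below is a minimal sketch of a hypothetical implementer. The class name ExampleHiveViewConnector, its constructor wiring, and the choice of VIRTUAL_VIEW are illustrative assumptions only; they are not part of this commit.

// Hypothetical sketch only; not part of this commit. Assumes the same package as
// HiveMetastoreConnector above so the package-private interface is visible.
package org.apache.iceberg.hive;

import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.iceberg.ClientPool;
import org.apache.thrift.TException;

class ExampleHiveViewConnector implements HiveMetastoreConnector {

  private final ClientPool<IMetaStoreClient, TException> metaClients;
  private final String database;
  private final String viewName;
  private final long maxHiveTablePropertySize;

  ExampleHiveViewConnector(
      ClientPool<IMetaStoreClient, TException> metaClients,
      String database,
      String viewName,
      long maxHiveTablePropertySize) {
    this.metaClients = metaClients;
    this.database = database;
    this.viewName = viewName;
    this.maxHiveTablePropertySize = maxHiveTablePropertySize;
  }

  @Override
  public TableType tableType() {
    // A view-backed implementer would use VIRTUAL_VIEW; the newHmsTable() default only adds
    // the EXTERNAL=TRUE parameter when the type is EXTERNAL_TABLE.
    return TableType.VIRTUAL_VIEW;
  }

  @Override
  public boolean exposeInHmsProperties() {
    // A max size of 0 disables mirroring Iceberg metadata into HMS properties.
    return maxHiveTablePropertySize > 0;
  }

  @Override
  public ClientPool<IMetaStoreClient, TException> getMetaClients() {
    return metaClients;
  }

  @Override
  public long getMaxHiveTablePropertySize() {
    return maxHiveTablePropertySize;
  }

  @Override
  public String database() {
    return database;
  }

  @Override
  public String table() {
    return viewName;
  }

  @Override
  public Map<String, String> hmsEnvContext(String metadataLocation) {
    // Kept empty here; a real implementation could pass the expected metadata location so that
    // HMS-side checks can detect concurrent modifications.
    return Collections.emptyMap();
  }
}

With only these accessors supplied, the implementer inherits persistTable, storageDescriptor, newHmsTable, and the other default methods that are now shared with the table-side operations.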
