Skip to content

Commit

Permalink
[Hive] HiveCatalog support hive 1.x (apache#2354)
Browse files Browse the repository at this point in the history
  • Loading branch information
humengyu2012 authored Nov 21, 2023
1 parent 116789c commit 3a35b74
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@
import org.apache.flink.table.hive.LegacyHiveClasses;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
Expand All @@ -61,9 +58,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -72,7 +67,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -551,58 +545,8 @@ private Lock lock(Identifier identifier) {
return Lock.fromCatalog(lock, identifier);
}

// Candidate parameter lists for RetryingMetaStoreClient.getProxy. createClient looks each
// entry up reflectively with Class#getMethod and records a NoSuchMethodException for every
// entry that does not resolve, so this list is effectively the set of getProxy overloads this
// class knows how to call. The two entries differ only in the type of the first parameter
// (HiveConf vs. Configuration).
private static final List<Class<?>[]> GET_PROXY_PARAMS =
Arrays.asList(
// for hive 2.x
new Class<?>[] {
HiveConf.class,
HiveMetaHookLoader.class,
ConcurrentHashMap.class,
String.class,
Boolean.TYPE
},
// for hive 3.x
new Class<?>[] {
Configuration.class,
HiveMetaHookLoader.class,
ConcurrentHashMap.class,
String.class,
Boolean.TYPE
});

/**
 * Creates an {@link IMetaStoreClient} for the given client class by reflectively calling a
 * {@code getProxy} overload of {@link RetryingMetaStoreClient}.
 *
 * @param hiveConf configuration handed to {@code getProxy}; also read for the metastore URIs
 * @param clientClassName class name passed through to {@code getProxy}
 * @return the created client, wrapped by {@code HiveMetaStoreClient.newSynchronizedClient}
 *     when a metastore URI is configured
 * @throws RuntimeException if no candidate {@code getProxy} overload exists, or if invoking
 *     it fails
 */
static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
Method getProxy = null;
// Created up front so every failed lookup can be attached to it as a suppressed exception.
RuntimeException methodNotFound =
new RuntimeException(
"Failed to find desired getProxy method from RetryingMetaStoreClient");
for (Class<?>[] classes : GET_PROXY_PARAMS) {
try {
// No break on success: if more than one signature resolves, the last one is used.
getProxy = RetryingMetaStoreClient.class.getMethod("getProxy", classes);
} catch (NoSuchMethodException e) {
methodNotFound.addSuppressed(e);
}
}
if (getProxy == null) {
throw methodNotFound;
}

IMetaStoreClient client;
try {
// getProxy is static, hence the null receiver. The hook loader passed here returns null
// for every table, and a fresh ConcurrentHashMap is supplied for each client.
client =
(IMetaStoreClient)
getProxy.invoke(
null,
hiveConf,
(HiveMetaHookLoader) (tbl -> null),
new ConcurrentHashMap<>(),
clientClassName,
true);
} catch (Exception e) {
throw new RuntimeException(e);
}
// The proxy is returned as-is when no metastore URI is configured; otherwise it is wrapped
// by HiveMetaStoreClient.newSynchronizedClient.
return isNullOrWhitespaceOnly(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
? client
: HiveMetaStoreClient.newSynchronizedClient(client);
// NOTE(review): this statement follows an unconditional return. The hunk header for this
// region (-551,58 +545,8) suggests the page shows removed and added diff lines without their
// +/- markers, i.e. everything between the signature and this line is the old body and this
// delegation is its replacement - confirm against the repository before relying on it.
return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
}

public static HiveConf createHiveConf(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.hive;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;

/**
 * Factory to create {@link RetryingMetaStoreClient}.
 *
 * <p>The {@code getProxy} overloads of {@link RetryingMetaStoreClient} are looked up
 * reflectively from a fixed list of candidate signatures, so the same code can run against
 * whichever of those overloads is present at runtime.
 */
public class RetryingMetaStoreClientFactory {

    // Candidate getProxy signatures paired with the code that invokes them. The map is only
    // iterated, never queried by key (array keys compare by identity), and iteration follows
    // insertion order.
    private static final Map<Class<?>[], HiveMetastoreProxySupplier> PROXY_SUPPLIERS =
            ImmutableMap.<Class<?>[], HiveMetastoreProxySupplier>builder()
                    // for hive 1.x
                    .put(
                            new Class<?>[] {
                                HiveConf.class,
                                HiveMetaHookLoader.class,
                                ConcurrentHashMap.class,
                                String.class
                            },
                            RetryingMetaStoreClientFactory::invokeWithHookLoader)
                    // for hive 2.x
                    .put(
                            new Class<?>[] {
                                HiveConf.class,
                                HiveMetaHookLoader.class,
                                ConcurrentHashMap.class,
                                String.class,
                                Boolean.TYPE
                            },
                            RetryingMetaStoreClientFactory::invokeWithHookLoaderAndFlag)
                    // for hive 3.x
                    .put(
                            new Class<?>[] {
                                Configuration.class,
                                HiveMetaHookLoader.class,
                                ConcurrentHashMap.class,
                                String.class,
                                Boolean.TYPE
                            },
                            RetryingMetaStoreClientFactory::invokeWithHookLoaderAndFlag)
                    .build();

    // If clientClassName is HiveMetaStoreClient,
    // we can revert to the simplest creation method,
    // which allows us to use shaded Hive packages to avoid dependency conflicts,
    // such as using apache-hive2.jar in Presto and Trino.
    private static final Map<Class<?>[], HiveMetastoreProxySupplier> PROXY_SUPPLIERS_SHADED =
            ImmutableMap.<Class<?>[], HiveMetastoreProxySupplier>builder()
                    .put(
                            new Class<?>[] {HiveConf.class},
                            (getProxyMethod, hiveConf, clientClassName) ->
                                    (IMetaStoreClient) getProxyMethod.invoke(null, hiveConf))
                    .put(
                            new Class<?>[] {HiveConf.class, Boolean.TYPE},
                            RetryingMetaStoreClientFactory::invokeWithConfAndFlag)
                    .put(
                            new Class<?>[] {Configuration.class, Boolean.TYPE},
                            RetryingMetaStoreClientFactory::invokeWithConfAndFlag)
                    .build();

    /**
     * Creates a metastore client through a reflectively resolved {@code getProxy} overload of
     * {@link RetryingMetaStoreClient}.
     *
     * <p>Every candidate signature is tried in order and the last one that resolves is used.
     * The shaded (configuration-only) candidates are appended after the regular ones only when
     * {@code clientClassName} is exactly the name of {@link HiveMetaStoreClient}.
     *
     * @param hiveConf configuration handed to {@code getProxy}; also read for the metastore
     *     URIs
     * @param clientClassName class name of the metastore client implementation, passed through
     *     to the overloads that accept it
     * @return the created client, wrapped by {@link HiveMetaStoreClient#newSynchronizedClient}
     *     when a metastore URI is configured
     * @throws RuntimeException if none of the candidate overloads exists (the individual
     *     lookup failures are attached as suppressed exceptions) or if the invocation fails
     */
    public IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) {
        Map<Class<?>[], HiveMetastoreProxySupplier> candidates = PROXY_SUPPLIERS;
        if (HiveMetaStoreClient.class.getName().equals(clientClassName)) {
            // Copy only when the shaded overloads have to be appended; order is preserved.
            candidates = new LinkedHashMap<>(PROXY_SUPPLIERS);
            candidates.putAll(PROXY_SUPPLIERS_SHADED);
        }

        Method getProxy = null;
        HiveMetastoreProxySupplier supplier = null;
        // Created up front so every failed lookup can be attached as a suppressed exception.
        RuntimeException methodNotFound =
                new RuntimeException(
                        "Failed to find desired getProxy method from RetryingMetaStoreClient");
        for (Entry<Class<?>[], HiveMetastoreProxySupplier> entry : candidates.entrySet()) {
            try {
                // Deliberately no break: a later resolvable signature overrides an earlier one.
                getProxy = RetryingMetaStoreClient.class.getMethod("getProxy", entry.getKey());
                supplier = entry.getValue();
            } catch (NoSuchMethodException e) {
                methodNotFound.addSuppressed(e);
            }
        }
        if (getProxy == null) {
            throw methodNotFound;
        }

        IMetaStoreClient client;
        try {
            client = supplier.get(getProxy, hiveConf, clientClassName);
        } catch (InvocationTargetException e) {
            // The reflection wrapper carries no message of its own, so surface the underlying
            // failure in the message while keeping the full cause chain intact.
            throw new RuntimeException(
                    "Failed to create metastore client '"
                            + clientClassName
                            + "' via "
                            + getProxy
                            + ": "
                            + e.getCause(),
                    e);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to invoke "
                            + getProxy
                            + " for metastore client '"
                            + clientClassName
                            + "'",
                    e);
        }
        return isNullOrWhitespaceOnly(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
                ? client
                : HiveMetaStoreClient.newSynchronizedClient(client);
    }

    /** Calls {@code getProxy(conf, hookLoader, map, clientClassName)}. */
    private static IMetaStoreClient invokeWithHookLoader(
            Method getProxyMethod, Configuration conf, String clientClassName)
            throws IllegalAccessException, InvocationTargetException {
        return (IMetaStoreClient)
                getProxyMethod.invoke(
                        null,
                        conf,
                        (HiveMetaHookLoader) (tbl -> null),
                        new ConcurrentHashMap<>(),
                        clientClassName);
    }

    /** Calls {@code getProxy(conf, hookLoader, map, clientClassName, true)}. */
    private static IMetaStoreClient invokeWithHookLoaderAndFlag(
            Method getProxyMethod, Configuration conf, String clientClassName)
            throws IllegalAccessException, InvocationTargetException {
        return (IMetaStoreClient)
                getProxyMethod.invoke(
                        null,
                        conf,
                        (HiveMetaHookLoader) (tbl -> null),
                        new ConcurrentHashMap<>(),
                        clientClassName,
                        true);
    }

    /** Calls {@code getProxy(conf, true)}; the client class name is not passed on. */
    private static IMetaStoreClient invokeWithConfAndFlag(
            Method getProxyMethod, Configuration conf, String clientClassName)
            throws IllegalAccessException, InvocationTargetException {
        return (IMetaStoreClient) getProxyMethod.invoke(null, conf, true);
    }

    /** Function interface for creating hive metastore proxy. */
    @FunctionalInterface
    public interface HiveMetastoreProxySupplier {

        /**
         * Invokes the given static {@code getProxy} method with the arguments its signature
         * expects.
         *
         * @param getProxyMethod the resolved {@code getProxy} overload
         * @param conf configuration passed as the first argument of {@code getProxy}
         * @param clientClassName metastore client class name, for overloads that accept it
         * @return the client returned by {@code getProxy}
         */
        IMetaStoreClient get(Method getProxyMethod, Configuration conf, String clientClassName)
                throws IllegalAccessException, IllegalArgumentException, InvocationTargetException;
    }
}

0 comments on commit 3a35b74

Please sign in to comment.