diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index ca87d20885c0..d67911bbc4c3 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -40,10 +40,7 @@ import org.apache.flink.table.hive.LegacyHiveClasses; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -61,9 +58,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -72,7 +67,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -551,58 +545,8 @@ private Lock lock(Identifier identifier) { return Lock.fromCatalog(lock, identifier); } - private static final List[]> GET_PROXY_PARAMS = - Arrays.asList( - // for hive 2.x - new Class[] { - HiveConf.class, - HiveMetaHookLoader.class, - ConcurrentHashMap.class, - String.class, - Boolean.TYPE - }, - // for hive 3.x - new Class[] { - Configuration.class, - HiveMetaHookLoader.class, - ConcurrentHashMap.class, - String.class, - Boolean.TYPE - }); - static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) { - Method getProxy = null; - RuntimeException methodNotFound = - new RuntimeException( - "Failed to find desired getProxy method from RetryingMetaStoreClient"); - for (Class[] classes : GET_PROXY_PARAMS) { - try { - getProxy = RetryingMetaStoreClient.class.getMethod("getProxy", classes); - } catch (NoSuchMethodException e) { - methodNotFound.addSuppressed(e); - } - } - if (getProxy == null) { - throw methodNotFound; - } - - IMetaStoreClient client; - try { - client = - (IMetaStoreClient) - getProxy.invoke( - null, - hiveConf, - (HiveMetaHookLoader) (tbl -> null), - new ConcurrentHashMap<>(), - clientClassName, - true); - } catch (Exception e) { - throw new RuntimeException(e); - } - return isNullOrWhitespaceOnly(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)) - ? client - : HiveMetaStoreClient.newSynchronizedClient(client); + return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName); } public static HiveConf createHiveConf( diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/RetryingMetaStoreClientFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/RetryingMetaStoreClientFactory.java new file mode 100644 index 000000000000..abf3598e3cbf --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/RetryingMetaStoreClientFactory.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; + +/** Factory to create {@link RetryingMetaStoreClient}. */ +public class RetryingMetaStoreClientFactory { + + private static final Map[], HiveMetastoreProxySupplier> PROXY_SUPPLIERS = + ImmutableMap.[], HiveMetastoreProxySupplier>builder() + // for hive 1.x + .put( + new Class[] { + HiveConf.class, + HiveMetaHookLoader.class, + ConcurrentHashMap.class, + String.class + }, + (getProxyMethod, hiveConf, clientClassName) -> + (IMetaStoreClient) + getProxyMethod.invoke( + null, + hiveConf, + (HiveMetaHookLoader) (tbl -> null), + new ConcurrentHashMap<>(), + clientClassName)) + // for hive 2.x + .put( + new Class[] { + HiveConf.class, + HiveMetaHookLoader.class, + ConcurrentHashMap.class, + String.class, + Boolean.TYPE + }, + (getProxyMethod, hiveConf, clientClassName) -> + (IMetaStoreClient) + getProxyMethod.invoke( + null, + hiveConf, + (HiveMetaHookLoader) (tbl -> null), + new ConcurrentHashMap<>(), + clientClassName, + true)) + // for hive 3.x + .put( + new Class[] { + Configuration.class, + HiveMetaHookLoader.class, + ConcurrentHashMap.class, + String.class, + Boolean.TYPE + }, + (getProxyMethod, hiveConf, clientClassName) -> + (IMetaStoreClient) + getProxyMethod.invoke( + null, + hiveConf, + (HiveMetaHookLoader) (tbl -> null), + new ConcurrentHashMap<>(), + clientClassName, + true)) + .build(); + + // If clientClassName is HiveMetaStoreClient, + // we can revert to the simplest creation method, + // which allows us to use shaded Hive packages to avoid dependency conflicts, + // such as using apache-hive2.jar in Presto and Trino. + private static final Map[], HiveMetastoreProxySupplier> PROXY_SUPPLIERS_SHADED = + ImmutableMap.[], HiveMetastoreProxySupplier>builder() + .put( + new Class[] {HiveConf.class}, + (getProxyMethod, hiveConf, clientClassName) -> + (IMetaStoreClient) getProxyMethod.invoke(null, hiveConf)) + .put( + new Class[] {HiveConf.class, Boolean.TYPE}, + (getProxyMethod, hiveConf, clientClassName) -> + (IMetaStoreClient) getProxyMethod.invoke(null, hiveConf, true)) + .put( + new Class[] {Configuration.class, Boolean.TYPE}, + (getProxyMethod, hiveConf, clientClassName) -> + (IMetaStoreClient) getProxyMethod.invoke(null, hiveConf, true)) + .build(); + + public IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) { + Method getProxy = null; + HiveMetastoreProxySupplier supplier = null; + RuntimeException methodNotFound = + new RuntimeException( + "Failed to find desired getProxy method from RetryingMetaStoreClient"); + Map[], HiveMetastoreProxySupplier> suppliers = + new LinkedHashMap<>(PROXY_SUPPLIERS); + if (HiveMetaStoreClient.class.getName().equals(clientClassName)) { + suppliers.putAll(PROXY_SUPPLIERS_SHADED); + } + for (Entry[], HiveMetastoreProxySupplier> entry : suppliers.entrySet()) { + Class[] classes = entry.getKey(); + try { + getProxy = RetryingMetaStoreClient.class.getMethod("getProxy", classes); + supplier = entry.getValue(); + } catch (NoSuchMethodException e) { + methodNotFound.addSuppressed(e); + } + } + if (getProxy == null) { + throw methodNotFound; + } + + IMetaStoreClient client; + try { + client = supplier.get(getProxy, hiveConf, clientClassName); + } catch (Exception e) { + throw new RuntimeException(e); + } + return isNullOrWhitespaceOnly(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)) + ? client + : HiveMetaStoreClient.newSynchronizedClient(client); + } + + /** Function interface for creating hive metastore proxy. */ + public interface HiveMetastoreProxySupplier { + IMetaStoreClient get(Method getProxyMethod, Configuration conf, String clientClassName) + throws IllegalAccessException, IllegalArgumentException, InvocationTargetException; + } +}