diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java index 3b2cf1540b4e..d8c4201c9050 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/CachedClientPool.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -64,6 +65,7 @@ public class CachedClientPool implements ClientPool { private static final String CONF_ELEMENT_PREFIX = "conf:"; + private static final String CONF_OPTIONS_ALL = "*"; private static Cache clientPoolCache; @@ -77,7 +79,7 @@ public CachedClientPool(Configuration conf, Options options, String clientClassN this.conf = conf; this.clientPoolSize = options.get(CLIENT_POOL_SIZE); this.evictionInterval = options.get(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS); - this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf); + this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf, options); this.clientClassName = clientClassName; init(); } @@ -85,7 +87,27 @@ public CachedClientPool(Configuration conf, Options options, String clientClassN @VisibleForTesting HiveClientPool clientPool() { return clientPoolCache.get( - key, k -> new HiveClientPool(clientPoolSize, conf, clientClassName)); + key, + k -> + new HiveClientPool( + clientPoolSize, conf, clientClassName, this.getCurrentUser(k))); + } + + private UserGroupInformation getCurrentUser(Key key) { + try { + for (Object elem : key.elements) { + if (elem instanceof UserGroupInformationConf) { + return ((UserGroupInformationConf) elem).ugi; + } + if (elem instanceof UserNameConf) { + return UserGroupInformation.createProxyUser( + ((UserNameConf) elem).userName, UserGroupInformation.getCurrentUser()); + } + } + return UserGroupInformation.getCurrentUser(); + } catch (Exception e) { + return null; + } } private synchronized void init() { @@ -142,7 +164,7 @@ public void execute(ExecuteAction action, boolean } @VisibleForTesting - static Key extractKey(String cacheKeys, Configuration conf) { + static Key extractKey(String cacheKeys, Configuration conf, Options options) { // generate key elements in a certain order, so that the Key instances are comparable List elements = Lists.newArrayList(); elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "")); @@ -157,12 +179,15 @@ static Key extractKey(String cacheKeys, Configuration conf) { String trimmed = element.trim(); if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) { String key = trimmed.substring(CONF_ELEMENT_PREFIX.length()); - - Preconditions.checkArgument( - !confElements.containsKey(key), - "Conf key element %s already specified", - key); - confElements.put(key, conf.get(key)); + String value = CONF_OPTIONS_ALL; + if (!CONF_OPTIONS_ALL.equals(key)) { + Preconditions.checkArgument( + !confElements.containsKey(key), + "Conf key element %s already specified", + key); + value = confElements.get(key); + } + confElements.put(key, value); } else { KeyElementType type = KeyElementType.valueOf(trimmed.toUpperCase()); switch (type) { @@ -183,7 +208,8 @@ static Key extractKey(String cacheKeys, Configuration conf) { switch (type) { case UGI: try { - elements.add(UserGroupInformation.getCurrentUser()); + elements.add( + UserGroupInformationConf.of(UserGroupInformation.getCurrentUser())); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -200,7 +226,11 @@ static Key extractKey(String cacheKeys, Configuration conf) { } } for (String key : confElements.keySet()) { - elements.add(ConfElement.of(key, confElements.get(key))); + if (CONF_OPTIONS_ALL.equals(confElements.get(key))) { + elements.add(options); + } else { + elements.add(ConfElement.of(key, confElements.get(key))); + } } return Key.of(elements); } @@ -219,6 +249,38 @@ public List elements() { public static Key of(List elements) { return new Key(elements); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Key that = (Key) o; + if (this.elements.size() != that.elements.size()) { + return false; + } + for (int i = 0; i < elements.size(); i++) { + if (!Objects.equals(this.elements.get(i), that.elements.get(i))) { + return false; + } + } + return true; + } + + @Override + public int hashCode() { + int hashCode = 0; + synchronized (elements) { + for (Object p : elements) { + hashCode ^= p.hashCode(); + } + } + return hashCode; + } } static class ConfElement { @@ -239,11 +301,87 @@ public String value() { return value; } + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ConfElement other = (ConfElement) obj; + return Objects.equals(key, other.key) && Objects.equals(value, other.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + public static ConfElement of(String key, String value) { return new ConfElement(key, value); } } + static class UserGroupInformationConf { + + private final UserGroupInformation ugi; + + private UserGroupInformationConf(UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + UserGroupInformationConf other = (UserGroupInformationConf) obj; + return Objects.equals(other.ugi.getUserName(), ugi.getUserName()); + } + + @Override + public int hashCode() { + return Objects.hash(ugi.getUserName()); + } + + public static UserGroupInformationConf of(UserGroupInformation ugi) { + return new UserGroupInformationConf(ugi); + } + } + + static class UserNameConf { + private final String userName; + + private UserNameConf(String userName) { + this.userName = userName; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + UserNameConf other = (UserNameConf) obj; + return Objects.equals(other.userName, userName); + } + + @Override + public int hashCode() { + return Objects.hash(userName); + } + + public static UserNameConf of(String userName) { + return new UserNameConf(userName); + } + } + private enum KeyElementType { UGI, USER_NAME, diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java index 6dd086950771..cf5f67a75fd0 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/pool/HiveClientPool.java @@ -26,9 +26,14 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + /** * Pool of Hive Metastore clients. * @@ -38,26 +43,49 @@ public class HiveClientPool extends ClientPool.ClientPoolImpl) + () -> + new RetryingMetaStoreClientFactory() + .createClient(hiveConf, clientClassName)) + : new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName); } @Override protected IMetaStoreClient reconnect(IMetaStoreClient client) { try { - client.close(); - client.reconnect(); - } catch (MetaException e) { + if (this.ugi != null) { + this.ugi.doAs( + (PrivilegedExceptionAction) + () -> { + client.close(); + client.reconnect(); + return null; + }); + } else { + client.close(); + client.reconnect(); + } + } catch (MetaException | IOException | InterruptedException e) { throw new RuntimeException("Failed to reconnect to Hive Metastore", e); } return client; diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java new file mode 100644 index 000000000000..ab703cda213e --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive.pool; + +import org.apache.paimon.options.Options; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; +import org.junit.Test; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CachedClientPool}. */ +public class TestCachedClientPool { + + @Test + public void testCacheKeyNotSame() { + // client1 use cache key type:paimon + Options options1 = new Options(); + options1.set("type", "paimon"); + options1.set("paimon.catalog.type", "hive"); + options1.set("hive.metastore.uris", "thrift://localhost:9083"); + options1.set("client-pool-cache.keys", "conf:type"); + CachedClientPool cache1 = + new CachedClientPool( + new Configuration(), options1, HiveMetaStoreClient.class.getName()); + + // client2 use cache key type:paimon + Options options2 = new Options(); + options2.set("type", "paimon"); + options2.set("paimon.catalog.type", "hive"); + options2.set("hive.metastore.uris", "thrift://localhost:9083"); + options2.set("client-pool-cache.keys", "conf:paimon.catalog.type"); + CachedClientPool cache2 = + new CachedClientPool( + new Configuration(), options2, HiveMetaStoreClient.class.getName()); + + // assert return the different object instance + assertThat(cache1.clientPool()).isNotEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeySame() { + Options options = new Options(); + options.set("type", "paimon"); + options.set("paimon.catalog.type", "hive"); + options.set("hive.metastore.uris", "thrift://localhost:9083"); + options.set("client-pool-cache.keys", "conf:type"); + + // client1 & client2 use the same cache key paimon.catalog.type:hive + CachedClientPool cache1 = + new CachedClientPool( + new Configuration(), options, HiveMetaStoreClient.class.getName()); + CachedClientPool cache2 = + new CachedClientPool( + new Configuration(), options, HiveMetaStoreClient.class.getName()); + + // assert return the same object instance + assertThat(cache1.clientPool()).isEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyUGINotSame() { + // user paimon1 login + Options options1 = new Options(); + options1.set("type", "paimon"); + options1.set("paimon.catalog.type", "hive"); + options1.set("hive.metastore.uris", "thrift://localhost:9083"); + options1.set("client-pool-cache.keys", "ugi,conf:type"); + + CachedClientPool cache1 = + UserGroupInformation.createRemoteUser("paimon1") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options1, + HiveMetaStoreClient.class.getName())); + + // user paimon2 login + Options options2 = new Options(); + options2.set("type", "paimon"); + options2.set("paimon.catalog.type", "hive"); + options2.set("hive.metastore.uris", "thrift://localhost:9083"); + options2.set("client-pool-cache.keys", "ugi,conf:type"); + + CachedClientPool cache2 = + UserGroupInformation.createRemoteUser("paimon2") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options2, + HiveMetaStoreClient.class.getName())); + + assertThat(cache1.clientPool()).isNotEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyUGISame() { + Options options = new Options(); + options.set("type", "paimon"); + options.set("paimon.catalog.type", "hive"); + options.set("hive.metastore.uris", "thrift://localhost:9083"); + options.set("client-pool-cache.keys", "ugi,conf:type"); + + // user paimon login + CachedClientPool cache1 = + UserGroupInformation.createRemoteUser("paimon") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options, + HiveMetaStoreClient.class.getName())); + + // user paimon login again + CachedClientPool cache2 = + UserGroupInformation.createRemoteUser("paimon") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options, + HiveMetaStoreClient.class.getName())); + + assertThat(cache1.clientPool()).isEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyOptionNotSame() { + // client1 use option1 instance + Options options1 = new Options(); + options1.set("type", "paimon"); + options1.set("paimon.catalog.type", "hive"); + options1.set("hive.metastore.uris", "thrift://localhost:9083"); + options1.set("client-pool-cache.keys", "conf:*"); + CachedClientPool cache1 = + new CachedClientPool( + new Configuration(), options1, HiveMetaStoreClient.class.getName()); + + // client1 use option2 instance + Options options2 = new Options(); + options2.set("type", "hive"); + options2.set("paimon.catalog.type", "hive"); + options2.set("hive.metastore.uris", "thrift://localhost:9083"); + options1.set("client-pool-cache.keys", "conf:*"); + CachedClientPool cache2 = + new CachedClientPool( + new Configuration(), options2, HiveMetaStoreClient.class.getName()); + + // assert return the different object instance + assertThat(cache1.clientPool()).isNotEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyOptionSame1() { + // need use the same option instance + Options options = new Options(); + options.set("type", "paimon"); + options.set("paimon.catalog.type", "hive"); + options.set("hive.metastore.uris", "thrift://localhost:9083"); + options.set("client-pool-cache.keys", "conf:*"); + + // client1 & client2 use the same option instance + CachedClientPool cache1 = + new CachedClientPool( + new Configuration(), options, HiveMetaStoreClient.class.getName()); + CachedClientPool cache2 = + new CachedClientPool( + new Configuration(), options, HiveMetaStoreClient.class.getName()); + + // assert return the same object instance + assertThat(cache1.clientPool()).isEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyOptionSame2() { + // client1 use option1 instance + Options options1 = new Options(); + options1.set("type", "paimon"); + options1.set("paimon.catalog.type", "hive"); + options1.set("hive.metastore.uris", "thrift://localhost:9083"); + options1.set("client-pool-cache.keys", "conf:*"); + CachedClientPool cache1 = + new CachedClientPool( + new Configuration(), options1, HiveMetaStoreClient.class.getName()); + + // client1 use option2 instance + Options options2 = new Options(); + options2.set("type", "paimon"); + options2.set("paimon.catalog.type", "hive"); + options2.set("hive.metastore.uris", "thrift://localhost:9083"); + options2.set("client-pool-cache.keys", "conf:*"); + CachedClientPool cache2 = + new CachedClientPool( + new Configuration(), options2, HiveMetaStoreClient.class.getName()); + + // assert return the same object instance + assertThat(cache1.clientPool()).isEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyOptionAllNotSame() { + // user paimon1 login + Options options1 = new Options(); + options1.set("type", "paimon"); + options1.set("paimon.catalog.type", "hive"); + options1.set("hive.metastore.uris", "thrift://localhost:9083"); + options1.set("client-pool-cache.keys", "ugi,conf:*"); + + CachedClientPool cache1 = + UserGroupInformation.createRemoteUser("paimon1") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options1, + HiveMetaStoreClient.class.getName())); + + // user paimon2 login + Options options2 = new Options(); + options2.set("type", "hive"); + options2.set("paimon.catalog.type", "hive"); + options2.set("hive.metastore.uris", "thrift://localhost:9083"); + options2.set("client-pool-cache.keys", "ugi,conf:*"); + + CachedClientPool cache2 = + UserGroupInformation.createRemoteUser("paimon2") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options2, + HiveMetaStoreClient.class.getName())); + + // assert return the different object instance + assertThat(cache1.clientPool()).isNotEqualTo(cache2.clientPool()); + } + + @Test + public void testCacheKeyOptionAllSame() { + Options options = new Options(); + options.set("type", "paimon"); + options.set("paimon.catalog.type", "hive"); + options.set("hive.metastore.uris", "thrift://localhost:9083"); + options.set("client-pool-cache.keys", "ugi,conf:type"); + + // user paimon login + CachedClientPool cache1 = + UserGroupInformation.createRemoteUser("paimon") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options, + HiveMetaStoreClient.class.getName())); + + // user paimon login again + CachedClientPool cache2 = + UserGroupInformation.createRemoteUser("paimon") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + new Configuration(), + options, + HiveMetaStoreClient.class.getName())); + + assertThat(cache1.clientPool()).isEqualTo(cache2.clientPool()); + } + + @Test + public void testLoginPaimon() throws TException { + // user paimon login + Options options = new Options(); + options.set("type", "paimon"); + options.set("paimon.catalog.type", "hive"); + options.set("hive.metastore.uris", "thrift://30.150.24.155:9083"); + options.set("client-pool-cache.keys", "ugi,conf:*"); + options.set( + "hive.metastore.filter.hook", MockAuthorizationMetaStoreFilterHook.class.getName()); + + Configuration config = new Configuration(); + config.set("current.user", "paimon"); + config.set( + "hive.metastore.filter.hook", MockAuthorizationMetaStoreFilterHook.class.getName()); + CachedClientPool cache = + UserGroupInformation.createRemoteUser("paimon") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + config, + options, + HiveMetaStoreClient.class.getName())); + + // contain default database + assertThat(cache.clientPool().newClient().getAllDatabases()).contains("default"); + } + + @Test + public void testLoginRoot() throws TException { + // user paimon login + Options options = new Options(); + options.set("type", "paimon"); + options.set("paimon.catalog.type", "hive"); + options.set("hive.metastore.uris", "thrift://30.150.24.155:9083"); + options.set("client-pool-cache.keys", "ugi,conf:*"); + + Configuration config = new Configuration(); + config.set("current.user", "root"); + config.set( + "hive.metastore.filter.hook", MockAuthorizationMetaStoreFilterHook.class.getName()); + CachedClientPool cache = + UserGroupInformation.createRemoteUser("root") + .doAs( + (PrivilegedAction) + () -> + new CachedClientPool( + config, + options, + HiveMetaStoreClient.class.getName())); + + // contain default database + assertThat(cache.clientPool().newClient().getAllDatabases()).contains("default"); + } + + /** Tests for {@link CachedClientPool}. */ + public static class MockAuthorizationMetaStoreFilterHook + extends AuthorizationMetaStoreFilterHook { + HiveConf conf; + UserGroupInformation ugi; + + public MockAuthorizationMetaStoreFilterHook(HiveConf conf) throws IOException { + super(conf); + this.conf = conf; + this.ugi = UserGroupInformation.getCurrentUser(); + this.checkCurrentUser(this.conf); + } + + public List filterDatabases(List dbList) throws MetaException { + return this.ugi.doAs( + (PrivilegedAction>) + () -> { + this.checkCurrentUser(this.conf); + return dbList; + }); + } + + private void checkCurrentUser(HiveConf conf) { + try { + assertThat(UserGroupInformation.getCurrentUser().getUserName()) + .isEqualTo(conf.get("current.user")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +}