Skip to content

Commit

Permalink
add hive client pool, co-author: @XuQianJin-Stars
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong committed Mar 26, 2024
1 parent 13de145 commit 47409b1
Show file tree
Hide file tree
Showing 11 changed files with 803 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,24 @@

/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
/** Action interface for client. */
/** Action interface with return object for client. */
interface Action<R, C, E extends Exception> {
R run(C client) throws E;
}

/** Action interface with return void for client. */
interface ExecuteAction<C, E extends Exception> {
void run(C client) throws E;
}

<R> R run(Action<R, C, E> action) throws E, InterruptedException;

<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
Expand Down Expand Up @@ -93,6 +102,36 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
}
}

@Override
public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
execute(action, retryByDefault);
}

@Override
public void execute(ExecuteAction<C, E> action, boolean retry)
throws E, InterruptedException {
C client = get();
try {
action.run(client);
} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
// if reconnection throws any exception, rethrow the original failure
throw reconnectExc.cast(exc);
}

action.run(client);
}

throw exc;

} finally {
release(client);
}
}

protected abstract C newClient();

protected abstract C reconnect(C client);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,11 @@
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;

/** Factory to create {@link HiveCatalog}. */
public class HiveCatalogFactory implements CatalogFactory {

private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class);

public static final ConfigOption<String> METASTORE_CLIENT_CLASS =
ConfigOptions.key("metastore.client.class")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.paimon.hive;

import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.TimeUtils;

import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -46,12 +49,15 @@ public class HiveCatalogLock implements CatalogLock {

static final String LOCK_IDENTIFIER = "hive";

private final IMetaStoreClient client;
private final ClientPool<IMetaStoreClient, TException> clients;
private final long checkMaxSleep;
private final long acquireTimeout;

public HiveCatalogLock(IMetaStoreClient client, long checkMaxSleep, long acquireTimeout) {
this.client = client;
public HiveCatalogLock(
ClientPool<IMetaStoreClient, TException> clients,
long checkMaxSleep,
long acquireTimeout) {
this.clients = clients;
this.checkMaxSleep = checkMaxSleep;
this.acquireTimeout = acquireTimeout;
}
Expand All @@ -77,7 +83,7 @@ private long lock(String database, String table)
Collections.singletonList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = this.client.lock(lockRequest);
LockResponse lockResponse = clients.run(client -> client.lock(lockRequest));

long nextSleep = 50;
long startRetry = System.currentTimeMillis();
Expand All @@ -88,7 +94,8 @@ private long lock(String database, String table)
}
Thread.sleep(nextSleep);

lockResponse = client.checkLock(lockResponse.getLockid());
final LockResponse tempLockResponse = lockResponse;
lockResponse = clients.run(client -> client.checkLock(tempLockResponse.getLockid()));
if (System.currentTimeMillis() - startRetry > acquireTimeout) {
break;
}
Expand All @@ -97,21 +104,22 @@ private long lock(String database, String table)

if (lockResponse.getState() != LockState.ACQUIRED) {
if (lockResponse.getState() == LockState.WAITING) {
client.unlock(lockResponse.getLockid());
final LockResponse tempLockResponse = lockResponse;
clients.execute(client -> client.unlock(tempLockResponse.getLockid()));
}
throw new RuntimeException(
"Acquire lock failed with time: " + Duration.ofMillis(retryDuration));
}
return lockResponse.getLockid();
}

private void unlock(long lockId) throws TException {
client.unlock(lockId);
private void unlock(long lockId) throws TException, InterruptedException {
clients.execute(client -> client.unlock(lockId));
}

@Override
public void close() {
this.client.close();
// do nothing
}

/** Catalog lock factory for hive. */
Expand All @@ -125,7 +133,8 @@ public CatalogLock create(LockContext context) {
HiveLockContext hiveLockContext = (HiveLockContext) context;
HiveConf conf = hiveLockContext.hiveConf.conf();
return new HiveCatalogLock(
HiveCatalog.createClient(conf, hiveLockContext.clientClassName),
new CachedClientPool(
conf, hiveLockContext.options, hiveLockContext.clientClassName),
checkMaxSleep(conf),
acquireTimeout(conf));
}
Expand Down Expand Up @@ -155,10 +164,13 @@ public static long acquireTimeout(HiveConf conf) {
static class HiveLockContext implements LockContext {
private final SerializableHiveConf hiveConf;
private final String clientClassName;
private final Options options;

public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) {
public HiveLockContext(
SerializableHiveConf hiveConf, String clientClassName, Options options) {
this.hiveConf = hiveConf;
this.clientClassName = clientClassName;
this.options = options;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.description.Description;

import java.util.concurrent.TimeUnit;

import static org.apache.paimon.options.description.TextElement.text;

/** Options for hive catalog. */
public final class HiveCatalogOptions {
Expand Down Expand Up @@ -52,5 +57,33 @@ public final class HiveCatalogOptions {
+ "If you don't want to access the location by the filesystem of hive when using a object storage such as s3,oss\n"
+ "you can set this option to true.\n");

public static final ConfigOption<Long> CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
ConfigOptions.key("client-pool-cache.eviction-interval-ms")
.longType()
.defaultValue(TimeUnit.MINUTES.toMillis(5))
.withDescription("Setting the client's pool cache eviction interval(ms).\n");

public static final ConfigOption<String> CLIENT_POOL_CACHE_KEYS =
ConfigOptions.key("client-pool-cache.keys")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Specify client cache key, multiple elements separated by commas.")
.linebreak()
.list(
text(
"\"ugi\": the Hadoop UserGroupInformation instance that represents the current user using the cache."))
.list(
text(
"\"user_name\" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName."))
.list(
text(
"\"conf\": name of an arbitrary configuration. "
+ "The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a \"conf:\" prefix which is followed by the configuration name. "
+ "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified."))
.build());

private HiveCatalogOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.paimon.utils.RowDataPartitionComputer;
Expand All @@ -31,6 +34,7 @@
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -42,20 +46,30 @@ public class HiveMetastoreClient implements MetastoreClient {
private final Identifier identifier;
private final RowDataPartitionComputer partitionComputer;

private final IMetaStoreClient client;
private final ClientPool<IMetaStoreClient, TException> clients;
private final StorageDescriptor sd;

private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client)
throws Exception {
private HiveMetastoreClient(
Identifier identifier,
TableSchema schema,
ClientPool<IMetaStoreClient, TException> clients)
throws TException, InterruptedException {
this.identifier = identifier;
this.partitionComputer =
new RowDataPartitionComputer(
new CoreOptions(schema.options()).partitionDefaultName(),
schema.logicalPartitionType(),
schema.partitionKeys().toArray(new String[0]));

this.client = client;
this.sd = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()).getSd();
this.clients = clients;
this.sd =
this.clients
.run(
client ->
client.getTable(
identifier.getDatabaseName(),
identifier.getObjectName()))
.getSd();
}

@Override
Expand All @@ -67,8 +81,12 @@ public void addPartition(BinaryRow partition) throws Exception {
public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
client.getPartition(
identifier.getDatabaseName(), identifier.getObjectName(), partitionValues);
clients.execute(
client ->
client.getPartition(
identifier.getDatabaseName(),
identifier.getObjectName(),
partitionValues));
// do nothing if the partition already exists
} catch (NoSuchObjectException e) {
// partition not found, create new partition
Expand All @@ -87,27 +105,29 @@ public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exc
hivePartition.setCreateTime(currentTime);
hivePartition.setLastAccessTime(currentTime);

client.add_partition(hivePartition);
clients.execute(client -> client.add_partition(hivePartition));
}
}

@Override
public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
client.dropPartition(
identifier.getDatabaseName(),
identifier.getObjectName(),
partitionValues,
false);
clients.execute(
client ->
client.dropPartition(
identifier.getDatabaseName(),
identifier.getObjectName(),
partitionValues,
false));
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
}
}

@Override
public void close() throws Exception {
client.close();
// do nothing
}

/** Factory to create {@link HiveMetastoreClient}. */
Expand All @@ -119,26 +139,35 @@ public static class Factory implements MetastoreClient.Factory {
private final TableSchema schema;
private final SerializableHiveConf hiveConf;
private final String clientClassName;
private final Options options;

public Factory(
Identifier identifier,
TableSchema schema,
HiveConf hiveConf,
String clientClassName) {
String clientClassName,
Options options) {
this.identifier = identifier;
this.schema = schema;
this.hiveConf = new SerializableHiveConf(hiveConf);
this.clientClassName = clientClassName;
this.options = options;
}

@Override
public MetastoreClient create() {
HiveConf conf = hiveConf.conf();
try {
return new HiveMetastoreClient(
identifier, schema, HiveCatalog.createClient(conf, clientClassName));
} catch (Exception e) {
throw new RuntimeException(e);
identifier, schema, new CachedClientPool(conf, options, clientClassName));
} catch (TException e) {
throw new RuntimeException(
"Can not get table " + identifier + " info from metastore.", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Interrupted in call to new HiveMetastoreClient for table " + identifier,
e);
}
}
}
Expand Down
Loading

0 comments on commit 47409b1

Please sign in to comment.