Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core]HiveCatalog supports client pool #3106

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>catalog-key</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom catalog key.</td>
</tr>
<tr>
<td><h5>client-pool-cache-eviction-interval-ms</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>Long</td>
<td>Client pool cache eviction interval ms.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>catalog-key</h5></td>
<td style="word-wrap: break-word;">"jdbc"</td>
<td>String</td>
<td>Custom jdbc catalog store key.</td>
</tr>
<tr>
<td><h5>lock-key-max-length</h5></td>
<td style="word-wrap: break-word;">255</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@

package org.apache.paimon.client;

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkState;

Expand All @@ -43,7 +52,8 @@ interface Action<R, C, E extends Exception> {
<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
abstract class ClientPoolImpl<C, E extends Exception>
implements Closeable, Serializable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);

private final int poolSize;
Expand Down Expand Up @@ -169,4 +179,60 @@ public boolean isClosed() {
return closed;
}
}

/**
 * Skeleton of a {@link ClientPool} that owns no clients itself: every {@code run} call is
 * forwarded to the pool returned by {@link #clientPool()}. Subclasses decide how that pool is
 * created and cached, and may append extra elements to the generated {@link #key}.
 */
abstract class CachedClientPool<C, E extends Exception, CP extends ClientPoolImpl>
        implements Closeable, Serializable, ClientPool<C, E> {

    /** Prefix placed in front of every generated key. */
    protected static final String CONF_KEY_PREFIX = "confKey:";

    /** Value of {@link CatalogOptions#CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS}. */
    protected final long evictionInterval;

    /** Key derived from the options by {@link #extractKey(Options)}. */
    protected final String key;

    /** Value of {@link CatalogOptions#METASTORE}. */
    protected final String metadata;

    private final Options options;

    /**
     * Stores the options, derives the key and then invokes the {@link #init()} hook.
     *
     * @param options catalog options the interval, metastore and key are read from
     */
    public CachedClientPool(Options options) {
        // The subclass hooks extractKeyElement() and init() are invoked from here, so the
        // fields they can read are assigned before the hooks run.
        this.options = options;
        this.evictionInterval =
                options.get(CatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
        this.metadata = options.get(CatalogOptions.METASTORE);
        this.key = extractKey(options);
        init();
    }

    /** Returns the options this pool was created with. */
    protected Options options() {
        return options;
    }

    /** Hook invoked at the end of the constructor. */
    protected abstract void init();

    /** Returns the pool that {@code run} calls are delegated to. */
    protected abstract ClientPool<C, E> clientPool();

    /** Returns additional elements appended to the key after uri, metastore and catalog key. */
    protected abstract List<String> extractKeyElement();

    @Override
    public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
        ClientPool<C, E> delegate = clientPool();
        return delegate.run(action);
    }

    @Override
    public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
        ClientPool<C, E> delegate = clientPool();
        return delegate.run(action, retry);
    }

    /**
     * Builds the key as {@code confKey:} followed by the dot-joined uri, metastore, catalog key
     * (the metastore value when no catalog key is set) and the subclass elements.
     */
    private String extractKey(Options options) {
        String metastore = options.get(CatalogOptions.METASTORE);

        List<Object> parts = Lists.newArrayList();
        parts.add(options.get(CatalogOptions.URI));
        parts.add(metastore);
        parts.add(options.getOptional(CatalogOptions.CATALOG_KEY).orElse(metastore));
        parts.addAll(extractKeyElement());

        String joined = StringUtils.join(parts, ".");
        return CONF_KEY_PREFIX.concat(joined);
    }

    @Override
    public void close() throws IOException {
        // Do nothing, will automatically clean up
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,16 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

/**
 * String option {@code catalog-key}: a user-supplied key for a catalog. It has no default
 * value, so readers of this option must handle the case where it is absent.
 */
public static final ConfigOption<String> CATALOG_KEY =
ConfigOptions.key("catalog-key")
.stringType()
.noDefaultValue()
.withDescription("Custom catalog key.");

/**
 * Long option {@code client-pool-cache-eviction-interval-ms}: eviction interval of the client
 * pool cache, expressed in milliseconds. Defaults to {@code 5 * 60 * 1000} ms (five minutes).
 */
public static final ConfigOption<Long> CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
ConfigOptions.key("client-pool-cache-eviction-interval-ms")
.longType()
.defaultValue(5 * 60 * 1000L)
.withDescription("Client pool cache eviction interval ms.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class JdbcCatalog extends AbstractCatalog {

protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) {
super(fileIO, options);
this.catalogKey = catalogKey;
this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" : catalogKey;
this.options = options;
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties: null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Factory to create {@link JdbcCatalog}. */
Expand All @@ -38,7 +39,7 @@ public String identifier() {
/**
 * Creates a {@link JdbcCatalog} from the options carried by the given context.
 *
 * <p>The {@code CATALOG_KEY} option is read from the context options and passed to the catalog
 * together with the file IO, the options themselves and the warehouse path as a string.
 *
 * @param fileIO file IO handed to the catalog
 * @param warehouse warehouse path; passed on via {@code toString()}
 * @param context catalog context whose options configure the catalog
 * @return the new catalog
 */
@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
String catalogKey = options.get(CatalogOptions.CATALOG_KEY);
return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@
/** Options for jdbc catalog. */
public final class JdbcCatalogOptions {

public static final ConfigOption<String> CATALOG_KEY =
ConfigOptions.key("catalog-key")
.stringType()
.defaultValue("jdbc")
.withDescription("Custom jdbc catalog store key.");

public static final ConfigOption<Integer> LOCK_KEY_MAX_LENGTH =
ConfigOptions.key("lock-key-max-length")
.intType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.hive;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.thrift.TException;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Caches {@link HiveClientPool} instances in a static, JVM-wide cache so that several users of
 * the same key share one connection pool instead of each opening their own metastore clients.
 *
 * <p>The cache is created lazily under a class-level lock. Entries expire after not being
 * accessed for the configured eviction interval and are closed when removed.
 */
public class HiveCachedClientPool
        extends ClientPool.CachedClientPool<IMetaStoreClient, TException, HiveClientPool> {

    /**
     * Pools shared by all instances in this JVM, keyed by the inherited {@code key}. Declared
     * volatile so the lazily built cache is safely published to threads that read it without
     * holding the class lock.
     */
    protected static volatile Cache<String, ClientPoolImpl> clientPoolCache;

    private final SerializableHiveConf hiveConf;
    private final String clientClassName;
    private final int poolSize;

    /**
     * @param poolSize size passed to each {@link HiveClientPool} that gets created
     * @param hiveConf Hive configuration passed to each created pool
     * @param clientClassName metastore client class name passed to each created pool
     * @param options catalog options used by the parent class to derive the cache key
     */
    public HiveCachedClientPool(
            int poolSize, SerializableHiveConf hiveConf, String clientClassName, Options options) {
        super(options);
        this.poolSize = poolSize;
        this.hiveConf = hiveConf;
        this.clientClassName = clientClassName;
    }

    /**
     * Creates the shared cache if it does not exist yet.
     *
     * <p>The guarded field is static, so the lock must be the class and not the instance: with
     * an instance lock two pools constructed concurrently could each build a cache (and a
     * scheduler thread), with one of them being overwritten and leaked.
     */
    @Override
    protected void init() {
        if (clientPoolCache != null) {
            return;
        }
        synchronized (HiveCachedClientPool.class) {
            if (clientPoolCache == null) {
                clientPoolCache =
                        Caffeine.newBuilder()
                                .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
                                .removalListener(
                                        (ignored, value, cause) ->
                                                ((ClientPoolImpl) value).close())
                                .scheduler(
                                        Scheduler.forScheduledExecutorService(
                                                new ScheduledThreadPoolExecutor(
                                                        1, daemonThreadFactory())))
                                .build();
            }
        }
    }

    /** Returns a factory whose threads are daemons, so the eviction scheduler never blocks exit. */
    private static ThreadFactory daemonThreadFactory() {
        ThreadFactory delegate = Executors.defaultThreadFactory();
        return runnable -> {
            Thread thread = delegate.newThread(runnable);
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Returns the shared pool for this instance's key, creating it on first use.
     *
     * @return the cached (or newly created) {@link HiveClientPool}
     */
    @Override
    public HiveClientPool clientPool() {
        // This class is Serializable and deserialization does not run the constructor, so in a
        // fresh JVM the static cache may not exist yet; make sure it does before using it.
        init();
        return (HiveClientPool)
                clientPoolCache.get(
                        key, k -> new HiveClientPool(poolSize, hiveConf, clientClassName));
    }

    /** Adds the configured metastore client class to the cache key. */
    @Override
    protected List<String> extractKeyElement() {
        List<String> elements = Lists.newArrayList();
        elements.add(options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS));
        return elements;
    }
}
Loading
Loading