Skip to content

Commit

Permalink
[core] Make ClientPool simple to reduce thread safe bugs (#4256)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Sep 26, 2024
1 parent d80f514 commit c293fc7
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 413 deletions.
183 changes: 35 additions & 148 deletions paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,12 @@

package org.apache.paimon.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;

import static org.apache.paimon.utils.Preconditions.checkState;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
Expand All @@ -45,167 +39,60 @@ interface ExecuteAction<C, E extends Exception> {

<R> R run(Action<R, C, E> action) throws E, InterruptedException;

<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);

private final int poolSize;
private final Deque<C> clients;
private final Class<? extends E> reconnectExc;
private final Object signal = new Object();
private final boolean retryByDefault;
private volatile int currentSize;
private boolean closed;
private volatile LinkedBlockingDeque<C> clients;

public ClientPoolImpl(
int poolSize, Class<? extends E> reconnectExc, boolean retryByDefault) {
this.poolSize = poolSize;
this.reconnectExc = reconnectExc;
this.clients = new ArrayDeque<>(poolSize);
this.currentSize = 0;
this.closed = false;
this.retryByDefault = retryByDefault;
protected ClientPoolImpl(int poolSize, Supplier<C> supplier) {
this.clients = new LinkedBlockingDeque<>();
for (int i = 0; i < poolSize; i++) {
this.clients.add(supplier.get());
}
}

@Override
public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
return run(action, retryByDefault);
}

@Override
public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
C client = get();
try {
return action.run(client);
} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
// if reconnection throws any exception, rethrow the original failure
throw reconnectExc.cast(exc);
}

while (true) {
LinkedBlockingDeque<C> clients = this.clients;
if (clients == null) {
throw new IllegalStateException("Cannot get a client from a closed pool");
}
C client = clients.pollFirst(10, TimeUnit.SECONDS);
if (client == null) {
continue;
}
try {
return action.run(client);
} finally {
clients.addFirst(client);
}

throw exc;

} finally {
release(client);
}
}

@Override
public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
execute(action, retryByDefault);
}

@Override
public void execute(ExecuteAction<C, E> action, boolean retry)
throws E, InterruptedException {
C client = get();
try {
action.run(client);
} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
// if reconnection throws any exception, rethrow the original failure
throw reconnectExc.cast(exc);
}

action.run(client);
}

throw exc;

} finally {
release(client);
}
}

protected abstract C newClient();

protected abstract C reconnect(C client);

protected boolean isConnectionException(Exception exc) {
return reconnectExc.isInstance(exc);
run(
(Action<Void, C, E>)
client -> {
action.run(client);
return null;
});
}

protected abstract void close(C client);

@Override
public void close() {
this.closed = true;
try {
while (currentSize > 0) {
if (!clients.isEmpty()) {
synchronized (this) {
if (!clients.isEmpty()) {
C client = clients.removeFirst();
close(client);
currentSize -= 1;
}
}
}
if (clients.isEmpty() && currentSize > 0) {
synchronized (signal) {
// wake every second in case this missed the signal
signal.wait(1000);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn(
"Interrupted while shutting down pool. Some clients may not be closed.", e);
LinkedBlockingDeque<C> clients = this.clients;
this.clients = null;
if (clients != null) {
List<C> drain = new ArrayList<>();
clients.drainTo(drain);
drain.forEach(this::close);
}
}

private C get() throws InterruptedException {
checkState(!closed, "Cannot get a client from a closed pool");
while (true) {
if (!clients.isEmpty() || currentSize < poolSize) {
synchronized (this) {
if (!clients.isEmpty()) {
return clients.removeFirst();
} else if (currentSize < poolSize) {
C client = newClient();
currentSize += 1;
return client;
}
}
}
synchronized (signal) {
// wake every second in case this missed the signal
signal.wait(1000);
}
}
}

private void release(C client) {
synchronized (this) {
clients.addFirst(client);
}
synchronized (signal) {
signal.notify();
}
}

public int poolSize() {
return poolSize;
}

public boolean isClosed() {
return closed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,7 @@ private Lock lock(Identifier identifier) {

@Override
public void close() throws Exception {
if (!connections.isClosed()) {
connections.close();
}
connections.close();
}

private SchemaManager getSchemaManager(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,21 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Client pool for jdbc. */
public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection, SQLException> {

private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)");
private final String dbUrl;
private final Map<String, String> properties;

private final String protocol;

public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
super(poolSize, SQLNonTransientConnectionException.class, true);
properties = props;
this.dbUrl = dbUrl;
super(poolSize, clientSupplier(dbUrl, props));
Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl);
if (matcher.matches()) {
this.protocol = matcher.group(1);
Expand All @@ -49,21 +46,16 @@ public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
}
}

@Override
protected Connection newClient() {
try {
Properties dbProps =
JdbcUtils.extractJdbcConfiguration(properties, JdbcCatalog.PROPERTY_PREFIX);
return DriverManager.getConnection(dbUrl, dbProps);
} catch (SQLException e) {
throw new RuntimeException(String.format("Failed to connect: %s", dbUrl), e);
}
}

@Override
protected Connection reconnect(Connection client) {
close(client);
return newClient();
private static Supplier<Connection> clientSupplier(String dbUrl, Map<String, String> props) {
return () -> {
try {
Properties dbProps =
JdbcUtils.extractJdbcConfiguration(props, JdbcCatalog.PROPERTY_PREFIX);
return DriverManager.getConnection(dbUrl, dbProps);
} catch (SQLException e) {
throw new RuntimeException(String.format("Failed to connect: %s", dbUrl), e);
}
};
}

public String getProtocol() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,12 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action)
return clientPool().run(action);
}

@Override
public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
throws TException, InterruptedException {
return clientPool().run(action, retry);
}

@Override
public void execute(ExecuteAction<IMetaStoreClient, TException> action)
throws TException, InterruptedException {
clientPool().execute(action);
}

@Override
public void execute(ExecuteAction<IMetaStoreClient, TException> action, boolean retry)
throws TException, InterruptedException {
clientPool().execute(action, retry);
}

@VisibleForTesting
static Key extractKey(String clientClassName, String cacheKeys, Configuration conf) {
// generate key elements in a certain order, so that the Key instances are comparable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@

package org.apache.paimon.hive.pool;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.hive.RetryingMetaStoreClientFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import java.util.function.Supplier;

/**
* Pool of Hive Metastore clients.
Expand All @@ -36,49 +35,19 @@
*/
public class HiveClientPool extends ClientPool.ClientPoolImpl<IMetaStoreClient, TException> {

private final HiveConf hiveConf;
private final String clientClassName;

public HiveClientPool(int poolSize, Configuration conf, String clientClassName) {
// Do not allow retry by default as we rely on RetryingHiveClient
super(poolSize, TTransportException.class, false);
this.hiveConf = new HiveConf(conf, HiveClientPool.class);
this.hiveConf.addResource(conf);
this.clientClassName = clientClassName;
}

@Override
protected IMetaStoreClient newClient() {
return new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
super(poolSize, clientSupplier(conf, clientClassName));
}

@Override
protected IMetaStoreClient reconnect(IMetaStoreClient client) {
try {
client.close();
client.reconnect();
} catch (MetaException e) {
throw new RuntimeException("Failed to reconnect to Hive Metastore", e);
}
return client;
}

@Override
protected boolean isConnectionException(Exception e) {
return super.isConnectionException(e)
|| (e instanceof MetaException
&& e.getMessage()
.contains(
"Got exception: org.apache.thrift.transport.TTransportException"));
private static Supplier<IMetaStoreClient> clientSupplier(
Configuration conf, String clientClassName) {
HiveConf hiveConf = new HiveConf(conf, HiveClientPool.class);
hiveConf.addResource(conf);
return () -> new RetryingMetaStoreClientFactory().createClient(hiveConf, clientClassName);
}

@Override
protected void close(IMetaStoreClient client) {
client.close();
}

@VisibleForTesting
HiveConf hiveConf() {
return hiveConf;
}
}
Loading

0 comments on commit c293fc7

Please sign in to comment.