Skip to content

Commit

Permalink
[core] Fix license and minor for JdbcCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Mar 6, 2024
1 parent 9d29737 commit c1abbac
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 180 deletions.
2 changes: 2 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/net
paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkServer.java
from http://flink.apache.org/ version 1.17.0

paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
Expand Down
143 changes: 142 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,20 @@

package org.apache.paimon.client;

/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;

import static org.apache.paimon.utils.Preconditions.checkState;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
/** Action interface for client. */
interface Action<R, C, E extends Exception> {
Expand All @@ -28,4 +41,132 @@ interface Action<R, C, E extends Exception> {
<R> R run(Action<R, C, E> action) throws E, InterruptedException;

<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);

private final int poolSize;
private final Deque<C> clients;
private final Class<? extends E> reconnectExc;
private final Object signal = new Object();
private final boolean retryByDefault;
private volatile int currentSize;
private boolean closed;

public ClientPoolImpl(
int poolSize, Class<? extends E> reconnectExc, boolean retryByDefault) {
this.poolSize = poolSize;
this.reconnectExc = reconnectExc;
this.clients = new ArrayDeque<>(poolSize);
this.currentSize = 0;
this.closed = false;
this.retryByDefault = retryByDefault;
}

@Override
public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
return run(action, retryByDefault);
}

@Override
public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
C client = get();
try {
return action.run(client);
} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
// if reconnection throws any exception, rethrow the original failure
throw reconnectExc.cast(exc);
}

return action.run(client);
}

throw exc;

} finally {
release(client);
}
}

protected abstract C newClient();

protected abstract C reconnect(C client);

protected boolean isConnectionException(Exception exc) {
return reconnectExc.isInstance(exc);
}

protected abstract void close(C client);

@Override
public void close() {
this.closed = true;
try {
while (currentSize > 0) {
if (!clients.isEmpty()) {
synchronized (this) {
if (!clients.isEmpty()) {
C client = clients.removeFirst();
close(client);
currentSize -= 1;
}
}
}
if (clients.isEmpty() && currentSize > 0) {
synchronized (signal) {
// wake every second in case this missed the signal
signal.wait(1000);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn(
"Interrupted while shutting down pool. Some clients may not be closed.", e);
}
}

private C get() throws InterruptedException {
checkState(!closed, "Cannot get a client from a closed pool");
while (true) {
if (!clients.isEmpty() || currentSize < poolSize) {
synchronized (this) {
if (!clients.isEmpty()) {
return clients.removeFirst();
} else if (currentSize < poolSize) {
C client = newClient();
currentSize += 1;
return client;
}
}
}
synchronized (signal) {
// wake every second in case this missed the signal
signal.wait(1000);
}
}
}

private void release(C client) {
synchronized (this) {
clients.addFirst(client);
}
synchronized (signal) {
signal.notify();
}
}

public int poolSize() {
return poolSize;
}

public boolean isClosed() {
return closed;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ static JdbcDistributedLockDialect create(String protocol) {
/** Supported jdbc protocol. */
enum JdbcProtocol {
SQLITE,
// for mysql.
MARIADB,
MYSQL;
MYSQL
}
}
Loading

0 comments on commit c1abbac

Please sign in to comment.