Skip to content

Commit

Permalink
add hive client pool, co-author: @XuQianJin-Stars
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong committed Mar 20, 2024
1 parent 907525d commit 355641a
Show file tree
Hide file tree
Showing 14 changed files with 828 additions and 105 deletions.
13 changes: 13 additions & 0 deletions docs/layouts/shortcodes/generated/hive_catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>client-pool-cache.eviction-interval-ms</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>Long</td>
<td>Setting the client's pool cache eviction interval(ms).
</td>
</tr>
<tr>
<td><h5>client-pool-cache.keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify client cache key, multiple elements separated by commas.<br /><ul><li>"ugi": the Hadoop UserGroupInformation instance that represents the current user using the cache.</li></ul><ul><li>"user_name" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName.</li></ul><ul><li>"conf": name of an arbitrary configuration. The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a "conf:" prefix which is followed by the configuration name. E.g. specifying "conf:a.b.c" will add "a.b.c" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.</li></ul></td>
</tr>
<tr>
<td><h5>hadoop-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>catalog.create-underlying-session-catalog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, create and use an underlying session catalog instead of default session catalog when use SparkGenericCatalog.</td>
</tr>
<tr>
<td><h5>read.changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -74,11 +80,5 @@
<td>Boolean</td>
<td>If true, allow to merge data types if the two types meet the rules for explicit casting.</td>
</tr>
<tr>
<td><h5>catalog.create-underlying-session-catalog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, create and use an underlying session catalog instead of default session catalog when use SparkGenericCatalog.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,24 @@

/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
/** Action interface for client. */
/** Action interface with return object for client. */
interface Action<R, C, E extends Exception> {
R run(C client) throws E;
}

/** Action interface with return void for client. */
interface ExecuteAction<C, E extends Exception> {
void run(C client) throws E;
}

<R> R run(Action<R, C, E> action) throws E, InterruptedException;

<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
Expand Down Expand Up @@ -93,6 +102,36 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
}
}

@Override
public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
execute(action, retryByDefault);
}

@Override
public void execute(ExecuteAction<C, E> action, boolean retry)
throws E, InterruptedException {
C client = get();
try {
action.run(client);
} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
// if reconnection throws any exception, rethrow the original failure
throw reconnectExc.cast(exc);
}

action.run(client);
}

throw exc;

} finally {
release(client);
}
}

protected abstract C newClient();

protected abstract C reconnect(C client);
Expand Down
7 changes: 7 additions & 0 deletions paimon-docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-hive-catalog</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-test-utils</artifactId>
Expand Down
Loading

0 comments on commit 355641a

Please sign in to comment.