Skip to content

Commit

Permalink
[hive] add hive client pool (#3004)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong authored Jul 8, 2024
1 parent 131f250 commit c5c73b1
Show file tree
Hide file tree
Showing 12 changed files with 840 additions and 110 deletions.
13 changes: 13 additions & 0 deletions docs/layouts/shortcodes/generated/hive_catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>client-pool-cache.eviction-interval-ms</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>Long</td>
<td>Setting the client's pool cache eviction interval(ms).
</td>
</tr>
<tr>
<td><h5>client-pool-cache.keys</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify client cache key, multiple elements separated by commas.<br /><ul><li>"ugi": the Hadoop UserGroupInformation instance that represents the current user using the cache.</li></ul><ul><li>"user_name" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName.</li></ul><ul><li>"conf": name of an arbitrary configuration. The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a "conf:" prefix which is followed by the configuration name. E.g. specifying "conf:a.b.c" will add "a.b.c" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.</li></ul></td>
</tr>
<tr>
<td><h5>hadoop-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,24 @@

/** Client pool for using multiple clients to execute actions. */
public interface ClientPool<C, E extends Exception> {
/** Action interface for client. */
/** Action interface with return object for client. */
interface Action<R, C, E extends Exception> {
R run(C client) throws E;
}

/** Action interface with return void for client. */
interface ExecuteAction<C, E extends Exception> {
void run(C client) throws E;
}

<R> R run(Action<R, C, E> action) throws E, InterruptedException;

<R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action) throws E, InterruptedException;

void execute(ExecuteAction<C, E> action, boolean retry) throws E, InterruptedException;

/** Default implementation for {@link ClientPool}. */
abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
Expand Down Expand Up @@ -93,6 +102,36 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
}
}

@Override
public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
execute(action, retryByDefault);
}

@Override
public void execute(ExecuteAction<C, E> action, boolean retry)
throws E, InterruptedException {
C client = get();
try {
action.run(client);
} catch (Exception exc) {
if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
// if reconnection throws any exception, rethrow the original failure
throw reconnectExc.cast(exc);
}

action.run(client);
}

throw exc;

} finally {
release(client);
}
}

protected abstract C newClient();

protected abstract C reconnect(C client);
Expand Down
Loading

0 comments on commit c5c73b1

Please sign in to comment.