Skip to content

Commit

Permalink
some refactors to keepAliveCache; each fs would have its own kac; tes…
Browse files Browse the repository at this point in the history
…t refactors
  • Loading branch information
saxenapranav committed Jun 4, 2024
1 parent 53f2165 commit b356ae1
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 643 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ public class AbfsAHCHttpOperation extends AbfsHttpOperation {

private final boolean isPayloadRequest;

private final AbfsClient abfsClient;

public AbfsAHCHttpOperation(final URL url,
final String method,
final List<AbfsHttpHeader> requestHeaders,
final int connectionTimeout,
final int readTimeout) {
final int readTimeout, final AbfsClient abfsClient) {
super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
this.isPayloadRequest = isPayloadRequest(method);
this.abfsClient = abfsClient;
}

@VisibleForTesting
Expand Down Expand Up @@ -191,7 +194,7 @@ HttpResponse executeRequest() throws IOException {
AbfsManagedHttpClientContext abfsHttpClientContext
= setFinalAbfsClientContext();
HttpResponse response
= AbfsApacheHttpClient.getClient().execute(httpRequestBase,
= abfsClient.abfsApacheHttpClient.execute(httpRequestBase,
abfsHttpClientContext, getConnectionTimeout(), getReadTimeout());
setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,13 @@ static boolean usable() {
return usable;
}

static synchronized void setClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
final int readTimeout) {
if (abfsApacheHttpClient == null) {
abfsApacheHttpClient = new AbfsApacheHttpClient(
delegatingSSLSocketFactory, readTimeout);
}
}

static AbfsApacheHttpClient getClient() {
return abfsApacheHttpClient;
}

private AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
final int readTimeout) {
public AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
final int readTimeout, final KeepAliveCache keepAliveCache) {
final AbfsConnectionManager connMgr = new AbfsConnectionManager(
createSocketFactoryRegistry(
new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
getDefaultHostnameVerifier())),
new AbfsConnFactory());
new AbfsConnFactory(), keepAliveCache);
final HttpClientBuilder builder = HttpClients.custom();
builder.setConnectionManager(connMgr)
.setRequestExecutor(new AbfsManagedHttpRequestExecutor(readTimeout))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public class AbfsClient implements Closeable {
private boolean isSendMetricCall;
private SharedKeyCredentials metricSharedkeyCredentials = null;

private KeepAliveCache keepAliveCache;

AbfsApacheHttpClient abfsApacheHttpClient;

/**
* logging the rename failure if metadata is in an incomplete state.
*/
Expand Down Expand Up @@ -193,10 +197,13 @@ private AbfsClient(final URL baseUrl,
}
if (abfsConfiguration.getPreferredHttpOperationType()
== HttpOperationType.APACHE_HTTP_CLIENT) {
KeepAliveCache.getInstance().setAbfsConfig(abfsConfiguration);
AbfsApacheHttpClient.setClient(
keepAliveCache = new KeepAliveCache();
keepAliveCache.setAbfsConfig(abfsConfiguration);

abfsApacheHttpClient = new AbfsApacheHttpClient(
DelegatingSSLSocketFactory.getDefaultFactory(),
abfsConfiguration.getHttpReadTimeout());
abfsConfiguration.getHttpReadTimeout(),
keepAliveCache);
}

this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
Expand Down Expand Up @@ -266,6 +273,12 @@ public void close() throws IOException {
runningTimerTask.cancel();
timer.purge();
}
if(keepAliveCache != null) {
keepAliveCache.close();
}
if(abfsApacheHttpClient != null) {
abfsApacheHttpClient.close();
}
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG,
(Closeable) tokenProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@
*/
class AbfsConnectionManager implements HttpClientConnectionManager {

private final KeepAliveCache kac = KeepAliveCache.getInstance();
private final KeepAliveCache kac;

private final AbfsConnFactory httpConnectionFactory;

private final HttpClientConnectionOperator connectionOperator;

AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
AbfsConnFactory connectionFactory) {
AbfsConnFactory connectionFactory, KeepAliveCache kac) {
this.httpConnectionFactory = connectionFactory;
this.kac = kac;
connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
}
Expand All @@ -65,7 +66,7 @@ public HttpClientConnection get(final long timeout,
throws InterruptedException, ExecutionException,
ConnectionPoolTimeoutException {
try {
HttpClientConnection clientConn = kac.get(route);
HttpClientConnection clientConn = kac.get();
if (clientConn != null) {
return clientConn;
}
Expand Down Expand Up @@ -100,10 +101,7 @@ public void releaseConnection(final HttpClientConnection conn,
return;
}
if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();
if (route != null) {
kac.put(route, conn);
}
kac.put(conn);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ AbfsJdkHttpOperation createAbfsHttpOperation() throws IOException {
AbfsAHCHttpOperation createAbfsAHCHttpOperation() {
return new AbfsAHCHttpOperation(url, method, requestHeaders,
client.getAbfsConfiguration().getHttpConnectionTimeout(),
client.getAbfsConfiguration().getHttpReadTimeout()
client.getAbfsConfiguration().getHttpReadTimeout(), client
);
}

Expand Down
Loading

0 comments on commit b356ae1

Please sign in to comment.