Skip to content

Commit

Permalink
added javadocs
Browse files Browse the repository at this point in the history
  • Loading branch information
saxenapranav committed Jun 5, 2024
1 parent 45815ab commit 26f2edc
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public AbfsAHCHttpOperation(final URL url,
|| HTTP_METHOD_PATCH.equals(method)
|| HTTP_METHOD_POST.equals(method);
this.abfsApacheHttpClient = abfsApacheHttpClient;
LOG.debug("Creating AbfsAHCHttpOperation for URL: {}, method: {}",
url, method);

final URI requestUri;
try {
Expand Down Expand Up @@ -201,7 +203,10 @@ public void processResponse(final byte[] buffer,
try {
if (!isPayloadRequest) {
prepareRequest();
LOG.debug("Sending request: {}", httpRequestBase);
httpResponse = executeRequest();
LOG.debug("Request sent: {}; response {}", httpRequestBase,
httpResponse);
}
parseResponseHeaderAndBody(buffer, offset, length);
} finally {
Expand Down Expand Up @@ -349,6 +354,7 @@ public void sendPayload(final byte[] buffer,

prepareRequest();
try {
LOG.debug("Sending request: {}", httpRequestBase);
httpResponse = executeRequest();
} catch (AbfsApacheHttpExpect100Exception ex) {
LOG.debug(
Expand All @@ -358,7 +364,16 @@ public void sendPayload(final byte[] buffer,
ex);
connectionDisconnectedOnError = true;
httpResponse = ex.getHttpResponse();
} catch (IOException ex) {
LOG.debug("Getting output stream failed for uri {}, exception: {}",
getMaskedUrl(), ex);
connectionDisconnectedOnError = true;
throw ex;
} finally {
if(httpResponse != null) {
LOG.debug("Request sent: {}; response {}", httpRequestBase,
httpResponse);
}
if (!connectionDisconnectedOnError
&& httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
setBytesSent(length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
Expand All @@ -38,18 +39,33 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTP_SCHEME;
import static org.apache.http.conn.ssl.SSLConnectionSocketFactory.getDefaultHostnameVerifier;

final class AbfsApacheHttpClient {
/**
* Client for AzureBlobFileSystem to execute HTTP requests over ApacheHttpClient.
*/
final class AbfsApacheHttpClient implements Closeable {

/**
* ApacheHttpClient instance that executes HTTP request.
*/
private final CloseableHttpClient httpClient;

private static AbfsApacheHttpClient abfsApacheHttpClient = null;

/**
* Flag to indicate if the client is usable. This is a JVM level flag, state of
* this flag is shared across all instances of fileSystems. Once switched off,
* the ApacheHttpClient would not be used for whole JVM lifecycle.
*/
private static boolean usable = true;

/**
* Registers the switch off of ApacheHttpClient for all future use in the JVM.
*/
static void registerFallback() {
usable = false;
}

/**
* @return if ApacheHttpClient is usable.
*/
static boolean usable() {
return usable;
}
Expand All @@ -76,12 +92,24 @@ public AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactor
httpClient = builder.build();
}

@Override
public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
}
}

/**
* Executes the HTTP request.
*
* @param httpRequest HTTP request to execute.
* @param abfsHttpClientContext HttpClient context.
* @param connectTimeout Connection timeout.
* @param readTimeout Read timeout.
*
* @return HTTP response.
* @throws IOException network error.
*/
public HttpResponse execute(HttpRequestBase httpRequest,
final AbfsManagedHttpClientContext abfsHttpClientContext,
final int connectTimeout,
Expand All @@ -94,8 +122,13 @@ public HttpResponse execute(HttpRequestBase httpRequest,
return httpClient.execute(httpRequest, abfsHttpClientContext);
}


private static Registry<ConnectionSocketFactory> createSocketFactoryRegistry(
/**
* Creates the socket factory registry for HTTP and HTTPS.
*
* @param sslSocketFactory SSL socket factory.
* @return Socket factory registry.
*/
private Registry<ConnectionSocketFactory> createSocketFactoryRegistry(
ConnectionSocketFactory sslSocketFactory) {
if (sslSocketFactory == null) {
return RegistryBuilder.<ConnectionSocketFactory>create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
class AbfsConnectionManager implements HttpClientConnectionManager {

private static final Logger LOG = LoggerFactory.getLogger(
AbfsAHCHttpOperation.class);
AbfsConnectionManager.class);
private final KeepAliveCache kac;

private final AbfsHttpClientConnectionFactory httpConnectionFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@
import org.apache.http.HttpClientConnection;
import org.apache.http.client.protocol.HttpClientContext;

/**
* Registers the latency of different phases of a network call.
*/
public class AbfsManagedHttpClientContext extends HttpClientContext {

/**Connection establishment time*/
private long connectTime = 0L;

/**Time taken to receive and read response*/
private long readTime = 0L;

/***Time taken to send request*/
private long sendTime = 0L;

public AbfsManagedHttpClientContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,81 @@
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.http.HttpClientConnection;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;

/**
* Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each
* instance of FileSystem has its own KeepAliveCache.
* <p>
* Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
* connection-pooling:
* <ol>
* <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
* JDK's implementation only caches limited number of connections. The limit is given by JVM system
* property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
* <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
* which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
* feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
* number of connections it can create.</li>
* </ol>
*/
public class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
implements
Closeable {

/**
* Scheduled timer that evicts idle connections.
*/
private final Timer timer;

/**
* Task provided to the timer that owns eviction logic.
*/
private final TimerTask timerTask;

/**
* Flag to indicate if the cache is closed.
*/
private boolean isClosed;

/**
* Counter to keep track of the number of KeepAliveCache instances created.
*/
private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);

/**
* Maximum number of connections that can be cached.
*/
private final int maxConn;

/**
* Time-to-live for an idle connection.
*/
private final long connectionIdleTTL;

/**
* Flag to indicate if the eviction thread is paused.
*/
private boolean isPaused = false;

@VisibleForTesting
synchronized void pauseThread() {
isPaused = true;
}

@VisibleForTesting
synchronized void resumeThread() {
isPaused = false;
}

/**
* @return connectionIdleTTL
*/
@VisibleForTesting
public long getConnectionIdleTTL() {
return connectionIdleTTL;
}
Expand Down Expand Up @@ -66,6 +110,10 @@ public void run() {
timer.schedule(timerTask, 0, connectionIdleTTL);
}

/**
* Iterate over the cache and evict the idle connections. An idle connection is
* one that has been in the cache for more than connectionIdleTTL milliseconds.
*/
synchronized void evictIdleConnection() {
long currentTime = System.currentTimeMillis();
int i;
Expand All @@ -82,6 +130,11 @@ synchronized void evictIdleConnection() {
subList(0, i).clear();
}

/**
* Safe close of the HttpClientConnection.
*
* @param hc HttpClientConnection to be closed
*/
private void closeHtpClientConnection(final HttpClientConnection hc) {
try {
hc.close();
Expand All @@ -90,6 +143,9 @@ private void closeHtpClientConnection(final HttpClientConnection hc) {
}
}

/**
* Close all connections in cache and cancel the eviction timer.
*/
@Override
public synchronized void close() {
isClosed = true;
Expand All @@ -101,6 +157,11 @@ public synchronized void close() {
}
}

/**
* Gets the latest added HttpClientConnection from the cache.
*
* @return HttpClientConnection
*/
public synchronized HttpClientConnection get()
throws IOException {
if (isClosed) {
Expand All @@ -115,14 +176,20 @@ public synchronized HttpClientConnection get()
KeepAliveEntry e = pop();
if ((currentTime - e.idleStartTime) > connectionIdleTTL
|| e.httpClientConnection.isStale()) {
e.httpClientConnection.close();
closeHtpClientConnection(e.httpClientConnection);
} else {
hc = e.httpClientConnection;
}
} while ((hc == null) && (!empty()));
return hc;
}

/**
* Puts the HttpClientConnection in the cache. If the size of cache is equal to
* maxConn, the give HttpClientConnection is closed and not added in cache.
*
* @param httpClientConnection HttpClientConnection to be cached
*/
public synchronized void put(HttpClientConnection httpClientConnection) {
if (isClosed) {
return;
Expand All @@ -136,10 +203,15 @@ public synchronized void put(HttpClientConnection httpClientConnection) {
push(entry);
}

/**
* Entry data-structure in the cache.
*/
static class KeepAliveEntry {

/**HttpClientConnection in the cache entry.*/
private final HttpClientConnection httpClientConnection;

/**Time at which the HttpClientConnection was added to the cache.*/
private final long idleStartTime;

KeepAliveEntry(HttpClientConnection hc, long idleStartTime) {
Expand Down

0 comments on commit 26f2edc

Please sign in to comment.