diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index c4e7e0ec33457..2e32c6ea9940b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -395,7 +395,7 @@ public class AbfsConfiguration{
   private int maxApacheHttpClientIoExceptionsRetries;

   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE,
-      DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS)
+      DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS)
   private int maxApacheHttpClientCacheConnections;

   @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index ef975dc192d6e..ddf5eb5e6f262 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -165,9 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

+  /**
+   * System property that defines the maximum number of cached connections per
+   * filesystem for ApacheHttpClient. The JDK network library uses the same
+   * property to define the maximum number of cached connections at the JVM level.
+   */
   public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
-  public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;
-  public static final int KAC_DEFAULT_CONN_TTL = 5_000;
   public static final String JDK_IMPL = "JDK";
   public static final String APACHE_IMPL = "Apache";
   public static final String JDK_FALLBACK = "JDK_fallback";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index f814c36450ea2..8218f55bbed39 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -319,10 +319,17 @@ public static String accountProperty(String property, String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
-  /**Defines what network library to use for server IO calls {@value }*/
+  /**Defines what network library to use for server IO calls: {@value}*/
   public static final String FS_AZURE_NETWORKING_LIBRARY = "fs.azure.networking.library";
+  /**
+   * Maximum number of IOException retries for a single server call on ApacheHttpClient.
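+   * An illustrative, hypothetical usage sketch (value not part of this patch):
+   * {@code conf.setInt("fs.azure.apache.http.client.max.io.exception.retries", 2)}
+   * would permit two such retries per server call.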
+   * Breaching this count turns off all future use of the ApacheHttpClient library
+   * for the remainder of the JVM lifecycle: {@value}
+   */
  public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = "fs.azure.apache.http.client.max.io.exception.retries";
+  /**Maximum ApacheHttpClient connection cache size at the filesystem level: {@value}*/
  public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE = "fs.azure.apache.http.client.max.cache.connection.size";
+  /**Maximum idle time for an ApacheHttpClient connection: {@value}*/
  public static final String FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL = "fs.azure.apache.http.client.idle.connection.ttl";

   private ConfigurationKeys() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index ef12931fae736..bd2d6e4b57334 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -174,7 +174,7 @@ public final class FileSystemConfigurations {

   public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;

-  public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS = 5;
+  public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 5;

   private FileSystemConfigurations() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java
index dadf0cf0b0392..650ef241c6cad 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java
@@ -18,19 +18,17 @@

 package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

-import java.io.IOException;
-
 import org.apache.http.HttpResponse;

-public class AbfsApacheHttpExpect100Exception extends IOException {
-  private final HttpResponse httpResponse;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;

-  public AbfsApacheHttpExpect100Exception(final String s, final HttpResponse httpResponse) {
-    super(s);
-    this.httpResponse = httpResponse;
-  }
+/**
+ * Exception that marks a failed expect100 handshake. It is thrown when the
+ * expect100 handshake fails with the ADLS server sending a 4xx or 5xx status code.
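+ * <p>Callers can inspect the failed handshake response, for example (a sketch):
+ * {@code int status = ex.getHttpResponse().getStatusLine().getStatusCode();}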
+ */ +public class AbfsApacheHttpExpect100Exception extends HttpResponseException { - public HttpResponse getHttpResponse() { - return httpResponse; + public AbfsApacheHttpExpect100Exception(final HttpResponse httpResponse) { + super(EXPECT_100_JDK_ERROR, httpResponse); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java new file mode 100644 index 0000000000000..ff3a5a17df799 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.http.HttpResponse; + +/** + * Encapsulates an exception thrown from ApacheHttpClient response parsing. + */ +public class HttpResponseException extends IOException { + protected final HttpResponse httpResponse; + public HttpResponseException(final String s, final HttpResponse httpResponse) { + super(s); + Objects.requireNonNull(httpResponse, "httpResponse should be non-null"); + this.httpResponse = httpResponse; + } + + public HttpResponse getHttpResponse() { + return httpResponse; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java index 6826b5199905b..71da55c9cce6a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java @@ -23,8 +23,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,7 +60,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; -import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID; import static org.apache.http.entity.ContentType.TEXT_PLAIN; /** @@ -72,33 +71,86 @@ public class AbfsAHCHttpOperation extends AbfsHttpOperation { private static final Logger LOG = LoggerFactory.getLogger( AbfsAHCHttpOperation.class); - private HttpRequestBase httpRequestBase; + /** + * Request object for network call over ApacheHttpClient. + */ + private final HttpRequestBase httpRequestBase; + /** + * Response object received from a server call over ApacheHttpClient. + */ private HttpResponse httpResponse; - private boolean connectionDisconnectedOnError = false; - + /** + * Flag to indicate if the request is a payload request. HTTP methods PUT, POST, + * PATCH qualify for payload requests. + */ private final boolean isPayloadRequest; + /** + * ApacheHttpClient to make network calls. 
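+   * A single instance is created by the owning {@link AbfsClient} and shared by
+   * all operations of that client.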
+ */ + private final AbfsApacheHttpClient abfsApacheHttpClient; + public AbfsAHCHttpOperation(final URL url, final String method, final List requestHeaders, - final int connectionTimeout, - final int readTimeout) { + final Duration connectionTimeout, + final Duration readTimeout, + final AbfsApacheHttpClient abfsApacheHttpClient) throws IOException { super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout); - this.isPayloadRequest = isPayloadRequest(method); + this.isPayloadRequest = HTTP_METHOD_PUT.equals(method) + || HTTP_METHOD_PATCH.equals(method) + || HTTP_METHOD_POST.equals(method); + this.abfsApacheHttpClient = abfsApacheHttpClient; + LOG.debug("Creating AbfsAHCHttpOperation for URL: {}, method: {}", + url, method); + + final URI requestUri; + try { + requestUri = url.toURI(); + } catch (URISyntaxException e) { + throw new IOException(e); + } + switch (getMethod()) { + case HTTP_METHOD_PUT: + httpRequestBase = new HttpPut(requestUri); + break; + case HTTP_METHOD_PATCH: + httpRequestBase = new HttpPatch(requestUri); + break; + case HTTP_METHOD_POST: + httpRequestBase = new HttpPost(requestUri); + break; + case HTTP_METHOD_GET: + httpRequestBase = new HttpGet(requestUri); + break; + case HTTP_METHOD_DELETE: + httpRequestBase = new HttpDelete(requestUri); + break; + case HTTP_METHOD_HEAD: + httpRequestBase = new HttpHead(requestUri); + break; + default: + /* + * This would not happen as the AbfsClient would always be sending valid + * method. + */ + throw new PathIOException(getUrl().toString(), + "Unsupported HTTP method: " + getMethod()); + } } + /** + * @return AbfsManagedHttpClientContext instance that captures latencies at + * different phases of network call. + */ @VisibleForTesting - AbfsManagedHttpClientContext setFinalAbfsClientContext() { + AbfsManagedHttpClientContext getHttpClientContext() { return new AbfsManagedHttpClientContext(); } - private boolean isPayloadRequest(final String method) { - return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method) - || HTTP_METHOD_POST.equals(method); - } - + /**{@inheritDoc}*/ @Override protected InputStream getErrorStream() throws IOException { HttpEntity entity = httpResponse.getEntity(); @@ -108,6 +160,7 @@ protected InputStream getErrorStream() throws IOException { return entity.getContent(); } + /**{@inheritDoc}*/ @Override String getConnProperty(final String key) { for (AbfsHttpHeader header : getRequestHeaders()) { @@ -118,33 +171,36 @@ String getConnProperty(final String key) { return null; } + /**{@inheritDoc}*/ @Override URL getConnUrl() { return getUrl(); } - @Override - String getConnRequestMethod() { - return getMethod(); - } - + /**{@inheritDoc}*/ @Override Integer getConnResponseCode() throws IOException { return getStatusCode(); } + /**{@inheritDoc}*/ @Override String getConnResponseMessage() throws IOException { return getStatusDescription(); } + /**{@inheritDoc}*/ + @Override public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { try { if (!isPayloadRequest) { prepareRequest(); + LOG.debug("Sending request: {}", httpRequestBase); httpResponse = executeRequest(); + LOG.debug("Request sent: {}; response {}", httpRequestBase, + httpResponse); } parseResponseHeaderAndBody(buffer, offset, length); } finally { @@ -160,62 +216,82 @@ public void processResponse(final byte[] buffer, } } + /** + * Parse response stream for headers and body. + * + * @param buffer byte array to store response body. 
+ * @param offset offset in the buffer to start storing the response body. + * @param length length of the response body. + * + * @throws IOException network error while read response stream + */ @VisibleForTesting void parseResponseHeaderAndBody(final byte[] buffer, final int offset, final int length) throws IOException { - setStatusCode(parseStatusCode(httpResponse)); + statusCode = parseStatusCode(httpResponse); - setStatusDescription(httpResponse.getStatusLine().getReasonPhrase()); + statusDescription = httpResponse.getStatusLine().getReasonPhrase(); - String requestId = getResponseHeader( + requestId = getResponseHeader( HttpHeaderConfigurations.X_MS_REQUEST_ID); if (requestId == null) { requestId = AbfsHttpConstants.EMPTY_STRING; } - setRequestId(requestId); // dump the headers - AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", - getResponseHeaders(httpResponse)); + if (LOG.isDebugEnabled()) { + AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", + getRequestProperties()); + } parseResponse(buffer, offset, length); } + /** + * Parse status code from response + * + * @param httpResponse response object + * @return status code + */ @VisibleForTesting int parseStatusCode(HttpResponse httpResponse) { return httpResponse.getStatusLine().getStatusCode(); } + /** + * Execute network call for the request + * + * @return response object + * @throws IOException network error while executing the request + */ @VisibleForTesting HttpResponse executeRequest() throws IOException { AbfsManagedHttpClientContext abfsHttpClientContext - = setFinalAbfsClientContext(); - HttpResponse response - = AbfsApacheHttpClient.getClient().execute(httpRequestBase, - abfsHttpClientContext, getConnectionTimeout(), getReadTimeout()); - setConnectionTimeMs(abfsHttpClientContext.getConnectTime()); - setSendRequestTimeMs(abfsHttpClientContext.getSendTime()); - setRecvResponseTimeMs(abfsHttpClientContext.getReadTime()); - return response; - } - - private Map> getResponseHeaders(final HttpResponse httpResponse) { - if (httpResponse == null || httpResponse.getAllHeaders() == null) { - return new HashMap<>(); - } - Map> map = new HashMap<>(); - for (Header header : httpResponse.getAllHeaders()) { - map.put(header.getName(), new ArrayList( - Collections.singleton(header.getValue()))); + = getHttpClientContext(); + try { + LOG.debug("Executing request: {}", httpRequestBase); + HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase, + abfsHttpClientContext, getConnectionTimeout(), getReadTimeout()); + connectionTimeMs = abfsHttpClientContext.getConnectTime(); + sendRequestTimeMs = abfsHttpClientContext.getSendTime(); + recvResponseTimeMs = abfsHttpClientContext.getReadTime(); + return response; + } catch (IOException e) { + LOG.debug("Failed to execute request: {}", httpRequestBase, e); + throw e; } - return map; } + /**{@inheritDoc}*/ @Override public void setRequestProperty(final String key, final String value) { - setHeader(key, value); + List headers = getRequestHeaders(); + if (headers != null) { + headers.add(new AbfsHttpHeader(key, value)); + } } + /**{@inheritDoc}*/ @Override Map> getRequestProperties() { Map> map = new HashMap<>(); @@ -228,6 +304,7 @@ Map> getRequestProperties() { return map; } + /**{@inheritDoc}*/ @Override public String getResponseHeader(final String headerName) { if (httpResponse == null) { @@ -240,8 +317,9 @@ public String getResponseHeader(final String headerName) { return null; } + /**{@inheritDoc}*/ @Override - InputStream getContentInputStream() + protected InputStream 
getContentInputStream() throws IOException { if (httpResponse == null || httpResponse.getEntity() == null) { return null; @@ -249,6 +327,8 @@ InputStream getContentInputStream() return httpResponse.getEntity().getContent(); } + /**{@inheritDoc}*/ + @Override public void sendPayload(final byte[] buffer, final int offset, final int length) @@ -257,25 +337,7 @@ public void sendPayload(final byte[] buffer, return; } - switch (getMethod()) { - case HTTP_METHOD_PUT: - httpRequestBase = new HttpPut(getUri()); - break; - case HTTP_METHOD_PATCH: - httpRequestBase = new HttpPatch(getUri()); - break; - case HTTP_METHOD_POST: - httpRequestBase = new HttpPost(getUri()); - break; - default: - /* - * This should never happen as the method is already validated in - * isPayloadRequest. - */ - return; - } - - setExpectedBytesToBeSent(length); + expectedBytesToBeSent = length; if (buffer != null) { HttpEntity httpEntity = new ByteArrayEntity(buffer, offset, length, TEXT_PLAIN); @@ -283,8 +345,9 @@ public void sendPayload(final byte[] buffer, httpEntity); } - translateHeaders(httpRequestBase, getRequestHeaders()); + prepareRequest(); try { + LOG.debug("Sending request: {}", httpRequestBase); httpResponse = executeRequest(); } catch (AbfsApacheHttpExpect100Exception ex) { LOG.debug( @@ -294,55 +357,32 @@ public void sendPayload(final byte[] buffer, ex); connectionDisconnectedOnError = true; httpResponse = ex.getHttpResponse(); + } catch (IOException ex) { + LOG.debug("Getting output stream failed for uri {}, exception: {}", + getMaskedUrl(), ex); + throw ex; } finally { + if (httpResponse != null) { + LOG.debug("Request sent: {}; response {}", httpRequestBase, + httpResponse); + } if (!connectionDisconnectedOnError && httpRequestBase instanceof HttpEntityEnclosingRequestBase) { - setBytesSent(length); + bytesSent = length; } } } - private void prepareRequest() throws IOException { - switch (getMethod()) { - case HTTP_METHOD_GET: - httpRequestBase = new HttpGet(getUri()); - break; - case HTTP_METHOD_DELETE: - httpRequestBase = new HttpDelete(getUri()); - break; - case HTTP_METHOD_HEAD: - httpRequestBase = new HttpHead(getUri()); - break; - default: - /* - * This would not happen as the AbfsClient would always be sending valid - * method. - */ - throw new PathIOException(getUrl().toString(), - "Unsupported HTTP method: " + getMethod()); - } - translateHeaders(httpRequestBase, getRequestHeaders()); - } - - private URI getUri() throws IOException { - try { - return getUrl().toURI(); - } catch (URISyntaxException e) { - throw new IOException(e); - } - } - - private void translateHeaders(final HttpRequestBase httpRequestBase, - final List requestHeaders) { - for (AbfsHttpHeader header : requestHeaders) { + /** + * Sets the header on the request. 
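+   * In practice this copies every header from the request-header list onto the
+   * underlying ApacheHttpClient request object.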
+ */ + private void prepareRequest() { + for (AbfsHttpHeader header : getRequestHeaders()) { httpRequestBase.setHeader(header.getName(), header.getValue()); } } - private void setHeader(String name, String val) { - addHeaderToRequestHeaderList(new AbfsHttpHeader(name, val)); - } - + /**{@inheritDoc}*/ @Override public String getRequestProperty(String name) { for (AbfsHttpHeader header : getRequestHeaders()) { @@ -353,22 +393,9 @@ public String getRequestProperty(String name) { return EMPTY_STRING; } - @Override - boolean getConnectionDisconnectedOnError() { - return connectionDisconnectedOnError; - } - + /**{@inheritDoc}*/ @Override public String getTracingContextSuffix() { return APACHE_IMPL; } - - public String getClientRequestId() { - for (AbfsHttpHeader header : getRequestHeaders()) { - if (X_MS_CLIENT_REQUEST_ID.equals(header.getName())) { - return header.getValue(); - } - } - return EMPTY_STRING; - } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java index 9257947d7e30c..1484df299765c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; @@ -38,41 +39,44 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTP_SCHEME; import static org.apache.http.conn.ssl.SSLConnectionSocketFactory.getDefaultHostnameVerifier; -final class AbfsApacheHttpClient { +/** + * Client for AzureBlobFileSystem to execute HTTP requests over ApacheHttpClient. + */ +final class AbfsApacheHttpClient implements Closeable { + /** + * ApacheHttpClient instance that executes HTTP request. + */ private final CloseableHttpClient httpClient; - private static AbfsApacheHttpClient abfsApacheHttpClient = null; - + /** + * Flag to indicate if the client is usable. This is a JVM level flag, state of + * this flag is shared across all instances of fileSystems. Once switched off, + * the ApacheHttpClient would not be used for whole JVM lifecycle. + */ private static boolean usable = true; + /** + * Registers the switch off of ApacheHttpClient for all future use in the JVM. + */ static void registerFallback() { usable = false; } + /** + * @return if ApacheHttpClient is usable. 
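+   *         (false for the remainder of the JVM lifetime once
+   *         {@link #registerFallback()} has been called).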
+ */ static boolean usable() { return usable; } - static synchronized void setClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory, - final int readTimeout) { - if (abfsApacheHttpClient == null) { - abfsApacheHttpClient = new AbfsApacheHttpClient( - delegatingSSLSocketFactory, readTimeout); - } - } - - static AbfsApacheHttpClient getClient() { - return abfsApacheHttpClient; - } - - private AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory, - final int readTimeout) { + public AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory, + final int readTimeout, final KeepAliveCache keepAliveCache) { final AbfsConnectionManager connMgr = new AbfsConnectionManager( createSocketFactoryRegistry( new SSLConnectionSocketFactory(delegatingSSLSocketFactory, getDefaultHostnameVerifier())), - new AbfsConnFactory()); + new AbfsHttpClientConnectionFactory(), keepAliveCache); final HttpClientBuilder builder = HttpClients.custom(); builder.setConnectionManager(connMgr) .setRequestExecutor(new AbfsManagedHttpRequestExecutor(readTimeout)) @@ -88,12 +92,24 @@ private AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFacto httpClient = builder.build(); } + @Override public void close() throws IOException { if (httpClient != null) { httpClient.close(); } } + /** + * Executes the HTTP request. + * + * @param httpRequest HTTP request to execute. + * @param abfsHttpClientContext HttpClient context. + * @param connectTimeout Connection timeout. + * @param readTimeout Read timeout. + * + * @return HTTP response. + * @throws IOException network error. + */ public HttpResponse execute(HttpRequestBase httpRequest, final AbfsManagedHttpClientContext abfsHttpClientContext, final int connectTimeout, @@ -106,8 +122,13 @@ public HttpResponse execute(HttpRequestBase httpRequest, return httpClient.execute(httpRequest, abfsHttpClientContext); } - - private static Registry createSocketFactoryRegistry( + /** + * Creates the socket factory registry for HTTP and HTTPS. + * + * @param sslSocketFactory SSL socket factory. + * @return Socket factory registry. + */ + private Registry createSocketFactoryRegistry( ConnectionSocketFactory sslSocketFactory) { if (sslSocketFactory == null) { return RegistryBuilder.create() diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 74894a4335583..dfa3abdc4418e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -143,6 +143,10 @@ public class AbfsClient implements Closeable { private boolean isSendMetricCall; private SharedKeyCredentials metricSharedkeyCredentials = null; + private KeepAliveCache keepAliveCache; + + private AbfsApacheHttpClient abfsApacheHttpClient; + /** * logging the rename failure if metadata is in an incomplete state. 
*/ @@ -193,10 +197,12 @@ private AbfsClient(final URL baseUrl, } if (abfsConfiguration.getPreferredHttpOperationType() == HttpOperationType.APACHE_HTTP_CLIENT) { - KeepAliveCache.getInstance().setAbfsConfig(abfsConfiguration); - AbfsApacheHttpClient.setClient( + keepAliveCache = new KeepAliveCache(abfsConfiguration); + + abfsApacheHttpClient = new AbfsApacheHttpClient( DelegatingSSLSocketFactory.getDefaultFactory(), - abfsConfiguration.getHttpReadTimeout()); + abfsConfiguration.getHttpReadTimeout(), + keepAliveCache); } this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); @@ -266,6 +272,12 @@ public void close() throws IOException { runningTimerTask.cancel(); timer.purge(); } + if (keepAliveCache != null) { + keepAliveCache.close(); + } + if (abfsApacheHttpClient != null) { + abfsApacheHttpClient.close(); + } if (tokenProvider instanceof Closeable) { IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); @@ -1228,7 +1240,8 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive, this, HTTP_METHOD_DELETE, url, - requestHeaders, abfsConfiguration); + requestHeaders, + abfsConfiguration); try { op.execute(tracingContext); } catch (AzureBlobFileSystemException e) { @@ -2008,4 +2021,9 @@ AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType url, requestHeaders, sasTokenForReuse, abfsConfiguration); } + + @VisibleForTesting + AbfsApacheHttpClient getAbfsApacheHttpClient() { + return abfsApacheHttpClient; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java index 3e6a1b20f77ac..6baa2121203bd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java @@ -22,6 +22,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.http.HttpClientConnection; import org.apache.http.config.Registry; import org.apache.http.config.SocketConfig; @@ -36,39 +39,65 @@ import org.apache.http.protocol.HttpContext; /** - * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}. + * AbfsConnectionManager is a custom implementation of {@code HttpClientConnectionManager}. * This implementation manages connection-pooling heuristics and custom implementation * of {@link ManagedHttpClientConnectionFactory}. */ class AbfsConnectionManager implements HttpClientConnectionManager { - private final KeepAliveCache kac = KeepAliveCache.getInstance(); + private static final Logger LOG = LoggerFactory.getLogger( + AbfsConnectionManager.class); + + /** + * Connection pool for the ABFS managed connections. + */ + private final KeepAliveCache kac; - private final AbfsConnFactory httpConnectionFactory; + /** + * Factory to create new connections. + */ + private final AbfsHttpClientConnectionFactory httpConnectionFactory; + /** + * Operator to manage the network connection state of ABFS managed connections. 
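+   * Used by {@code connect} below to establish the socket connection for a route.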
+ */ private final HttpClientConnectionOperator connectionOperator; AbfsConnectionManager(Registry socketFactoryRegistry, - AbfsConnFactory connectionFactory) { + AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) { this.httpConnectionFactory = connectionFactory; - connectionOperator = new DefaultHttpClientConnectionOperator( + this.kac = kac; + this.connectionOperator = new DefaultHttpClientConnectionOperator( socketFactoryRegistry, null, null); } + /** + * Returns a custom implementation of connection request for the given route. + * The implementation would return a connection from the {@link KeepAliveCache} if available, + * else it would create a new non-connected {@link AbfsManagedApacheHttpConnection}. + */ @Override public ConnectionRequest requestConnection(final HttpRoute route, final Object state) { return new ConnectionRequest() { + + /** + * Synchronously gets a connection from the {@link KeepAliveCache} or + * creates a new un-connected instance of {@link AbfsManagedApacheHttpConnection}. + */ @Override public HttpClientConnection get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException { + LOG.debug("Connection requested"); try { - HttpClientConnection clientConn = kac.get(route); + HttpClientConnection clientConn = kac.get(); if (clientConn != null) { + LOG.debug("Connection retrieved from KAC: {}", clientConn); return clientConn; } + LOG.debug("Creating new connection"); return httpConnectionFactory.create(route, null); } catch (IOException ex) { throw new ExecutionException(ex); @@ -100,28 +129,30 @@ public void releaseConnection(final HttpClientConnection conn, return; } if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) { - HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute(); - if (route != null) { - kac.put(route, conn); - } + kac.put(conn); + LOG.debug("Connection released: {}", conn); } } + /**{@inheritDoc}*/ @Override public void connect(final HttpClientConnection conn, final HttpRoute route, final int connectTimeout, final HttpContext context) throws IOException { long start = System.currentTimeMillis(); + LOG.debug("Connecting {} to {}", conn, route.getTargetHost()); connectionOperator.connect((AbfsManagedApacheHttpConnection) conn, route.getTargetHost(), route.getLocalSocketAddress(), connectTimeout, SocketConfig.DEFAULT, context); + LOG.debug("Connection established: {}", conn); if (context instanceof AbfsManagedHttpClientContext) { ((AbfsManagedHttpClientContext) context).setConnectTime( System.currentTimeMillis() - start); } } + /**{@inheritDoc}*/ @Override public void upgrade(final HttpClientConnection conn, final HttpRoute route, @@ -130,6 +161,7 @@ public void upgrade(final HttpClientConnection conn, route.getTargetHost(), context); } + /**{@inheritDoc}*/ @Override public void routeComplete(final HttpClientConnection conn, final HttpRoute route, @@ -137,19 +169,22 @@ public void routeComplete(final HttpClientConnection conn, } + /**{@inheritDoc}*/ @Override public void closeIdleConnections(final long idletime, final TimeUnit timeUnit) { - + kac.evictIdleConnection(); } + /**{@inheritDoc}*/ @Override public void closeExpiredConnections() { - + kac.evictIdleConnection(); } + /**{@inheritDoc}*/ @Override public void shutdown() { - + kac.close(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnFactory.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientConnectionFactory.java similarity index 80% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnFactory.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientConnectionFactory.java index 6f440eef005bd..82a2440bca13d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnFactory.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientConnectionFactory.java @@ -28,8 +28,15 @@ * {@link ManagedHttpClientConnectionFactory#create(HttpRoute, ConnectionConfig)} to return * {@link AbfsManagedApacheHttpConnection}. */ -public class AbfsConnFactory extends ManagedHttpClientConnectionFactory { +public class AbfsHttpClientConnectionFactory extends ManagedHttpClientConnectionFactory { + /** + * Creates a new {@link AbfsManagedApacheHttpConnection} instance which has to + * be connected. + * @param route route for which connection is required. + * @param config connection configuration. + * @return new {@link AbfsManagedApacheHttpConnection} instance. + */ @Override public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index f968010cc2a7d..ccde9b03e9595 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,7 +32,9 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; @@ -60,27 +63,41 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable { private final URL url; private String maskedUrl; private String maskedEncodedUrl; - private int statusCode; - private String statusDescription; + protected int statusCode; + protected String statusDescription; private String storageErrorCode = ""; private String storageErrorMessage = ""; - private String requestId = ""; + protected String requestId = ""; private String expectedAppendPos = ""; private ListResultSchema listResultSchema = null; // metrics - private int bytesSent; - private int expectedBytesToBeSent; - private long bytesReceived; + protected int bytesSent; + protected int expectedBytesToBeSent; + protected long bytesReceived; - private long connectionTimeMs; - private long sendRequestTimeMs; - private long recvResponseTimeMs; + protected long connectionTimeMs; + protected long sendRequestTimeMs; + protected long recvResponseTimeMs; private boolean shouldMask = false; + protected boolean connectionDisconnectedOnError = false; + /**Request headers to be sent 
in the request.*/
  private final List<AbfsHttpHeader> requestHeaders;

-  private final int connectionTimeout, readTimeout;
+  /**
+   * Timeout that defines the maximum allowed connection establishment time for a
+   * request, in milliseconds. Not every request needs to establish a new connection;
+   * that depends on the connection-pooling heuristics of the networking library.
+   */
+  private final int connectionTimeout;
+
+  /**
+   * Timeout in milliseconds that defines the maximum allowed time to read the
+   * response. This timeout starts once the request is sent. It includes server
+   * response time, network latency, and time to read the response.
+   */
+  private final int readTimeout;

   public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
       final URL url,
@@ -91,15 +108,18 @@ public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
     return httpOp;
   }

-  public AbfsHttpOperation(final Logger log, final URL url, final String method,
-      final List<AbfsHttpHeader> requestHeaders, final int connectionTimeout,
-      final int readTimeout) {
+  public AbfsHttpOperation(
+      final Logger log,
+      final URL url, final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final Duration connectionTimeout,
+      final Duration readTimeout) {
     this.log = log;
     this.url = url;
     this.method = method;
     this.requestHeaders = requestHeaders;
-    this.connectionTimeout = connectionTimeout;
-    this.readTimeout = readTimeout;
+    this.connectionTimeout = (int) connectionTimeout.toMillis();
+    this.readTimeout = (int) readTimeout.toMillis();
   }

   /**
@@ -111,7 +131,7 @@ public AbfsHttpOperation(final Logger log, final URL url, final String method,
   protected AbfsHttpOperation(final URL url,
       final String method,
       final int httpStatus) {
-    this.log = null;
+    this.log = LoggerFactory.getLogger(AbfsHttpOperation.class);
     this.url = url;
     this.method = method;
     this.statusCode = httpStatus;
@@ -132,12 +152,6 @@ List<AbfsHttpHeader> getRequestHeaders() {
     return requestHeaders;
   }

-  void addHeaderToRequestHeaderList(AbfsHttpHeader abfsHttpHeader) {
-    if (requestHeaders != null) {
-      requestHeaders.add(abfsHttpHeader);
-    }
-  }
-
   public String getMethod() {
     return method;
   }
@@ -162,7 +176,9 @@ public String getStorageErrorMessage() {
     return storageErrorMessage;
   }

-  public abstract String getClientRequestId();
+  public String getClientRequestId() {
+    return getRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID);
+  }

   public String getExpectedAppendPos() {
     return expectedAppendPos;
   }
@@ -196,40 +212,14 @@ public ListResultSchema getListResultSchema() {
     return listResultSchema;
   }

+  /**
+   * Get the response header value for the given header key.
+   *
+   * @param httpHeader header key.
+   * @return header value.
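+   *         (implementations in this patch return null when the header is absent).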
+   */
  public abstract String getResponseHeader(String httpHeader);

-  void setExpectedBytesToBeSent(int expectedBytesToBeSent) {
-    this.expectedBytesToBeSent = expectedBytesToBeSent;
-  }
-
-  void setStatusCode(int statusCode) {
-    this.statusCode = statusCode;
-  }
-
-  void setStatusDescription(String statusDescription) {
-    this.statusDescription = statusDescription;
-  }
-
-  void setBytesSent(int bytesSent) {
-    this.bytesSent = bytesSent;
-  }
-
-  void setSendRequestTimeMs(long sendRequestTimeMs) {
-    this.sendRequestTimeMs = sendRequestTimeMs;
-  }
-
-  void setRecvResponseTimeMs(long recvResponseTimeMs) {
-    this.recvResponseTimeMs = recvResponseTimeMs;
-  }
-
-  void setRequestId(String requestId) {
-    this.requestId = requestId;
-  }
-
-  void setConnectionTimeMs(long connectionTimeMs) {
-    this.connectionTimeMs = connectionTimeMs;
-  }
-
   // Returns a trace message for the request
   @Override
   public String toString() {
@@ -292,6 +282,7 @@ public String getLogString() {
     return sb.toString();
   }

+  @VisibleForTesting
   public String getMaskedUrl() {
     if (!shouldMask) {
       return url.toString();
@@ -303,7 +294,7 @@ public String getMaskedUrl() {
     return maskedUrl;
   }

-  public String getMaskedEncodedUrl() {
+  public final String getMaskedEncodedUrl() {
     if (maskedEncodedUrl != null) {
       return maskedEncodedUrl;
     }
@@ -311,16 +302,52 @@ public String getMaskedEncodedUrl() {
     return maskedEncodedUrl;
   }

+  /**
+   * Sends the HTTP request. Note that HttpUrlConnection requires that an
+   * empty buffer be sent in order to set the "Content-Length: 0" header, which
+   * is required by our endpoint.
+   *
+   * @param buffer the request entity body.
+   * @param offset an offset into the buffer where the data begins.
+   * @param length the length of the data in the buffer.
+   *
+   * @throws IOException if an error occurs.
+   */
+  public abstract void sendPayload(byte[] buffer, int offset, int length)
+      throws IOException;
+
+  /**
+   * Gets and processes the HTTP response.
+   *
+   * @param buffer a buffer to hold the response entity body
+   * @param offset an offset in the buffer where the data will begin.
+   * @param length the number of bytes to be written to the buffer.
+   *
+   * @throws IOException if an error occurs.
+   */
   public abstract void processResponse(byte[] buffer,
       int offset,
       int length) throws IOException;

+  /**
+   * Set a request header.
+   *
+   * @param key header key.
+   * @param value header value.
+   */
   public abstract void setRequestProperty(String key, String value);

-  void parseResponse(final byte[] buffer,
+  /**
+   * Parse the response body from the connection.
+   *
+   * @param buffer byte array to store the response body.
+   * @param offset offset in the buffer.
+   * @param length length of the response body.
+   *
+   * @throws IOException if a network error occurs while reading the response.
+   */
+  final void parseResponse(final byte[] buffer,
       final int offset,
       final int length) throws IOException {
     long startTime;
@@ -391,7 +418,12 @@ void parseResponse(final byte[] buffer,
     }
   }

-  abstract InputStream getContentInputStream() throws IOException;
+  /**
+   * Get the response stream from the connection.
+   * @return InputStream: response stream from the connection after the network call.
+   * @throws IOException if the response stream could not be created from the connection.
+   */
+  protected abstract InputStream getContentInputStream() throws IOException;

   /**
    * When the request fails, this function is used to parse the response
@@ -410,7 +442,7 @@ void parseResponse(final byte[] buffer,
    * }
    *
    */
-  protected void processStorageErrorResponse() {
+  private void processStorageErrorResponse() {
     try (InputStream stream = getErrorStream()) {
       if (stream == null) {
         return;
@@ -452,6 +484,11 @@ protected void processStorageErrorResponse() {
     }
   }

+  /**
+   * Get the error stream from the connection.
+   * @return InputStream
+   * @throws IOException if the error stream could not be created from the response stream.
+   */
   protected abstract InputStream getErrorStream() throws IOException;

   /**
@@ -460,7 +497,7 @@ protected void processStorageErrorResponse() {
    * @param stream InputStream contains the list results.
    * @throws IOException if the response cannot be deserialized.
    */
-  protected void parseListFilesResponse(final InputStream stream)
+  private void parseListFilesResponse(final InputStream stream)
       throws IOException {
     if (stream == null) {
       return;
@@ -484,7 +521,7 @@ protected void parseListFilesResponse(final InputStream stream)
   /**
    * Returns the elapsed time in milliseconds.
    */
-  long elapsedTimeMs(final long startTime) {
+  final long elapsedTimeMs(final long startTime) {
     return (System.nanoTime() - startTime) / ONE_MILLION;
   }

@@ -492,7 +529,7 @@ long elapsedTimeMs(final long startTime) {
    * Check null stream, this is to pass findbugs's redundant check for NULL
    * @param stream InputStream
    */
-  boolean isNullInputStream(InputStream stream) {
+  private boolean isNullInputStream(InputStream stream) {
     return stream == null ? true : false;
   }

@@ -509,12 +546,6 @@ boolean isNullInputStream(InputStream stream) {
    */
   abstract URL getConnUrl();

-  /**
-   * Gets the connection request method.
-   * @return request method.
-   */
-  abstract String getConnRequestMethod();
-
   /**
    * Gets the connection response code.
    * @return response code.
@@ -530,19 +561,37 @@ boolean isNullInputStream(InputStream stream) {
    */
   abstract String getConnResponseMessage() throws IOException;

+  /**
+   * Get request headers.
+   *
+   * @return request headers.
+   */
   abstract Map<String, List<String>> getRequestProperties();

+  /**
+   * Get the request header value for a header name.
+   *
+   * @param headerName header name.
+   * @return header value.
+ */ abstract String getRequestProperty(String headerName); - abstract boolean getConnectionDisconnectedOnError(); + boolean getConnectionDisconnectedOnError() { + return connectionDisconnectedOnError; + } + /** + * Get the suffix to add to the tracing context that defines what http-client is + * used to make the network call + * @return the suffix to distinguish http client + */ public abstract String getTracingContextSuffix(); - public long getSendLatency() { + public final long getSendLatency() { return sendRequestTimeMs; } - public long getRecvLatency() { + public final long getRecvLatency() { return recvResponseTimeMs; } @@ -561,11 +610,6 @@ public AbfsHttpOperationWithFixedResult(final URL url, super(url, method, httpStatus); } - @Override - public String getClientRequestId() { - return null; - } - @Override public void processResponse(final byte[] buffer, final int offset, @@ -580,7 +624,7 @@ public void setRequestProperty(final String key, final String value) { } @Override - InputStream getContentInputStream() throws IOException { + protected InputStream getContentInputStream() throws IOException { return null; } @@ -599,11 +643,6 @@ URL getConnUrl() { return null; } - @Override - String getConnRequestMethod() { - return null; - } - @Override Integer getConnResponseCode() throws IOException { return null; @@ -624,11 +663,6 @@ String getRequestProperty(final String headerName) { return null; } - @Override - boolean getConnectionDisconnectedOnError() { - return false; - } - @Override public String getTracingContextSuffix() { return null; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java index eb09153ec6f12..872fd268567d0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java @@ -24,6 +24,7 @@ import java.net.HttpURLConnection; import java.net.ProtocolException; import java.net.URL; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -53,9 +54,7 @@ public class AbfsJdkHttpOperation extends AbfsHttpOperation { private static final Logger LOG = LoggerFactory.getLogger( AbfsJdkHttpOperation.class); - private HttpURLConnection connection; - - private boolean connectionDisconnectedOnError = false; + private final HttpURLConnection connection; /** * Initializes a new HTTP request and opens the connection. 
@@ -70,8 +69,8 @@ public class AbfsJdkHttpOperation extends AbfsHttpOperation { public AbfsJdkHttpOperation(final URL url, final String method, final List requestHeaders, - final int connectionTimeout, - final int readTimeout) + final Duration connectionTimeout, + final Duration readTimeout) throws IOException { super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout); @@ -85,8 +84,8 @@ public AbfsJdkHttpOperation(final URL url, } } - this.connection.setConnectTimeout(connectionTimeout); - this.connection.setReadTimeout(readTimeout); + this.connection.setConnectTimeout(getConnectionTimeout()); + this.connection.setReadTimeout(getReadTimeout()); this.connection.setRequestMethod(method); for (AbfsHttpHeader header : requestHeaders) { @@ -94,30 +93,12 @@ public AbfsJdkHttpOperation(final URL url, } } - protected HttpURLConnection getConnection() { - return connection; - } - - public String getClientRequestId() { - return this.connection - .getRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID); - } - + /**{@inheritDoc}*/ public String getResponseHeader(String httpHeader) { return connection.getHeaderField(httpHeader); } - /** - * Sends the HTTP request. Note that HttpUrlConnection requires that an - * empty buffer be sent in order to set the "Content-Length: 0" header, which - * is required by our endpoint. - * - * @param buffer the request entity body. - * @param offset an offset into the buffer where the data beings. - * @param length the length of the data in the buffer. - * - * @throws IOException if an error occurs. - */ + /**{@inheritDoc}*/ public void sendPayload(byte[] buffer, int offset, int length) throws IOException { this.connection.setDoOutput(true); @@ -136,7 +117,7 @@ public void sendPayload(byte[] buffer, int offset, int length) startTime = System.nanoTime(); OutputStream outputStream = null; // Updates the expected bytes to be sent based on length. - setExpectedBytesToBeSent(length); + expectedBytesToBeSent = length; try { try { /* Without expect header enabled, if getOutputStream() throws @@ -166,8 +147,8 @@ public void sendPayload(byte[] buffer, int offset, int length) * case of Expect-100 error. Reason being, in JDK, it stores the responseCode * in the HttpUrlConnection object before throwing exception to the caller. */ - setStatusCode(getConnResponseCode()); - setStatusDescription(getConnResponseMessage()); + statusCode = getConnResponseCode(); + statusDescription = getConnResponseMessage(); return; } else { LOG.debug( @@ -178,7 +159,7 @@ public void sendPayload(byte[] buffer, int offset, int length) } // update bytes sent for successful as well as failed attempts via the // accompanying statusCode. - setBytesSent(length); + bytesSent = length; // If this fails with or without expect header enabled, // it throws an IOException. @@ -188,34 +169,29 @@ public void sendPayload(byte[] buffer, int offset, int length) if (outputStream != null) { outputStream.close(); } - setSendRequestTimeMs(elapsedTimeMs(startTime)); + sendRequestTimeMs = elapsedTimeMs(startTime); } } + /**{@inheritDoc}*/ @Override String getRequestProperty(final String headerName) { return connection.getRequestProperty(headerName); } + /**{@inheritDoc}*/ @Override Map> getRequestProperties() { return connection.getRequestProperties(); } + /**{@inheritDoc}*/ @Override - InputStream getContentInputStream() throws IOException { + protected InputStream getContentInputStream() throws IOException { return connection.getInputStream(); } - /** - * Gets and processes the HTTP response. 
- * - * @param buffer a buffer to hold the response entity body - * @param offset an offset in the buffer where the data will being. - * @param length the number of bytes to be written to the buffer. - * - * @throws IOException if an error occurs. - */ + /**{@inheritDoc}*/ public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { @@ -227,6 +203,16 @@ public void processResponse(final byte[] buffer, processConnHeadersAndInputStreams(buffer, offset, length); } + /** + * Parses headers and body of the response. Execute server call if {@link #sendPayload(byte[], int, int)} + * is not called. + * + * @param buffer buffer to store the response body. + * @param offset offset in the buffer. + * @param length length of the response body. + * + * @throws IOException network error or parsing error. + */ void processConnHeadersAndInputStreams(final byte[] buffer, final int offset, final int length) throws IOException { @@ -234,17 +220,16 @@ void processConnHeadersAndInputStreams(final byte[] buffer, long startTime = 0; startTime = System.nanoTime(); - setStatusCode(getConnResponseCode()); - setRecvResponseTimeMs(elapsedTimeMs(startTime)); + statusCode = getConnResponseCode(); + recvResponseTimeMs = elapsedTimeMs(startTime); - setStatusDescription(getConnResponseMessage()); + statusDescription = getConnResponseMessage(); - String requestId = this.connection.getHeaderField( + requestId = this.connection.getHeaderField( HttpHeaderConfigurations.X_MS_REQUEST_ID); if (requestId == null) { requestId = AbfsHttpConstants.EMPTY_STRING; } - setRequestId(requestId); // dump the headers AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", @@ -258,12 +243,15 @@ void processConnHeadersAndInputStreams(final byte[] buffer, parseResponse(buffer, offset, length); } + /**{@inheritDoc}*/ public void setRequestProperty(String key, String value) { this.connection.setRequestProperty(key, value); } /** - * Open the HTTP connection. + * Creates a new {@link HttpURLConnection} instance. This instance is not connected. + * Any API call on the instance would make it reuse an existing connection or + * establish a new connection. * * @throws IOException if an error occurs. */ @@ -272,45 +260,27 @@ private HttpURLConnection openConnection() throws IOException { try { return (HttpURLConnection) getUrl().openConnection(); } finally { - setConnectionTimeMs(elapsedTimeMs(start)); + connectionTimeMs = (elapsedTimeMs(start)); } } + /**{@inheritDoc}*/ @Override protected InputStream getErrorStream() { return connection.getErrorStream(); } - /** - * Gets the connection request property for a key. - * @param key The request property key. - * @return request peoperty value. - */ + /**{@inheritDoc}*/ String getConnProperty(String key) { return connection.getRequestProperty(key); } - /** - * Gets the connection url. - * @return url. - */ + /**{@inheritDoc}*/ URL getConnUrl() { return connection.getURL(); } - /** - * Gets the connection request method. - * @return request method. - */ - String getConnRequestMethod() { - return connection.getRequestMethod(); - } - - /** - * Gets the connection response code. - * @return response code. - * @throws IOException - */ + /**{@inheritDoc}*/ Integer getConnResponseCode() throws IOException { return connection.getResponseCode(); } @@ -318,26 +288,18 @@ Integer getConnResponseCode() throws IOException { /** * Gets the connection output stream. * @return output stream. 
- * @throws IOException + * @throws IOException if creating outputStream on connection failed */ OutputStream getConnOutputStream() throws IOException { return connection.getOutputStream(); } - /** - * Gets the connection response message. - * @return response message. - * @throws IOException - */ + /**{@inheritDoc}*/ String getConnResponseMessage() throws IOException { return connection.getResponseMessage(); } - @VisibleForTesting - boolean getConnectionDisconnectedOnError() { - return connectionDisconnectedOnError; - } - + /**{@inheritDoc}*/ @Override public String getTracingContextSuffix() { return AbfsApacheHttpClient.usable() ? JDK_IMPL : JDK_FALLBACK; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedApacheHttpConnection.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedApacheHttpConnection.java index 1c59b5cc99205..9f79ccd4dbe17 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedApacheHttpConnection.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedApacheHttpConnection.java @@ -39,10 +39,14 @@ class AbfsManagedApacheHttpConnection implements ManagedHttpClientConnection { + /** + * Underlying ApacheHttpClient connection that actually does the work over network. + */ private final ManagedHttpClientConnection httpClientConnection; - private final HttpRoute httpRoute; - + /** + * Managed HTTP context to track the connection level activity. + */ private AbfsManagedHttpClientContext managedHttpContext; private final int hashCode; @@ -50,54 +54,61 @@ class AbfsManagedApacheHttpConnection AbfsManagedApacheHttpConnection(ManagedHttpClientConnection conn, final HttpRoute route) { this.httpClientConnection = conn; - this.httpRoute = route; this.hashCode = (UUID.randomUUID().toString() + httpClientConnection.getId()).hashCode(); } - HttpRoute getHttpRoute() { - return httpRoute; - } - + /** + * Sets the managed HTTP context to track the connection level activity. 
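+   * A fresh context is supplied per request execution so that send and read
+   * latencies are attributed to the correct call.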
+ */ void setManagedHttpContext(AbfsManagedHttpClientContext managedHttpContext) { this.managedHttpContext = managedHttpContext; } + /**{@inheritDoc}*/ @Override public void close() throws IOException { httpClientConnection.close(); } + /**{@inheritDoc}*/ + @Override public boolean isOpen() { return httpClientConnection.isOpen(); } + /**{@inheritDoc}*/ @Override public boolean isStale() { return httpClientConnection.isStale(); } + /**{@inheritDoc}*/ @Override public void setSocketTimeout(final int timeout) { httpClientConnection.setSocketTimeout(timeout); } + /**{@inheritDoc}*/ @Override public int getSocketTimeout() { return httpClientConnection.getSocketTimeout(); } + /**{@inheritDoc}*/ @Override public void shutdown() throws IOException { httpClientConnection.shutdown(); } + /**{@inheritDoc}*/ @Override public HttpConnectionMetrics getMetrics() { return httpClientConnection.getMetrics(); } + /**{@inheritDoc}*/ @Override public boolean isResponseAvailable(final int timeout) throws IOException { long start = System.currentTimeMillis(); @@ -106,6 +117,7 @@ public boolean isResponseAvailable(final int timeout) throws IOException { return val; } + /**{@inheritDoc}*/ @Override public void sendRequestHeader(final HttpRequest request) throws HttpException, IOException { @@ -114,6 +126,7 @@ public void sendRequestHeader(final HttpRequest request) managedHttpContext.addSendTime(System.currentTimeMillis() - start); } + /**{@inheritDoc}*/ @Override public void sendRequestEntity(final HttpEntityEnclosingRequest request) throws HttpException, IOException { @@ -122,6 +135,7 @@ public void sendRequestEntity(final HttpEntityEnclosingRequest request) managedHttpContext.addSendTime(System.currentTimeMillis() - start); } + /**{@inheritDoc}*/ @Override public HttpResponse receiveResponseHeader() throws HttpException, IOException { @@ -131,6 +145,7 @@ public HttpResponse receiveResponseHeader() return response; } + /**{@inheritDoc}*/ @Override public void receiveResponseEntity(final HttpResponse response) throws HttpException, IOException { @@ -139,6 +154,7 @@ public void receiveResponseEntity(final HttpResponse response) managedHttpContext.addReadTime(System.currentTimeMillis() - start); } + /**{@inheritDoc}*/ @Override public void flush() throws IOException { long start = System.currentTimeMillis(); @@ -146,41 +162,49 @@ public void flush() throws IOException { managedHttpContext.addSendTime(System.currentTimeMillis() - start); } + /**{@inheritDoc}*/ @Override public String getId() { return httpClientConnection.getId(); } + /**{@inheritDoc}*/ @Override public void bind(final Socket socket) throws IOException { httpClientConnection.bind(socket); } + /**{@inheritDoc}*/ @Override public Socket getSocket() { return httpClientConnection.getSocket(); } + /**{@inheritDoc}*/ @Override public SSLSession getSSLSession() { return httpClientConnection.getSSLSession(); } + /**Gets the local address to which the socket is bound.*/ @Override public InetAddress getLocalAddress() { return httpClientConnection.getLocalAddress(); } + /**Gets the local port to which the socket is bound.*/ @Override public int getLocalPort() { return httpClientConnection.getLocalPort(); } + /**Returns the address to which the socket is connected.*/ @Override public InetAddress getRemoteAddress() { return httpClientConnection.getRemoteAddress(); } + /**Returns the remote port number to which this socket is connected.*/ @Override public int getRemotePort() { return httpClientConnection.getRemotePort(); diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java
index b827b285ea8ea..ee3fa92159c66 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java
@@ -21,12 +21,18 @@
 import org.apache.http.HttpClientConnection;
 import org.apache.http.client.protocol.HttpClientContext;
 
+/**
+ * Registers the latency of different phases of a network call.
+ */
 public class AbfsManagedHttpClientContext extends HttpClientContext {
 
+  /**Connection establishment time*/
   private long connectTime = 0L;
 
+  /**Time taken to receive and read response*/
   private long readTime = 0L;
 
+  /**Time taken to send request*/
   private long sendTime = 0L;
 
   public AbfsManagedHttpClientContext() {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpRequestExecutor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpRequestExecutor.java
index ecb591317fded..e326effafec07 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpRequestExecutor.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpRequestExecutor.java
@@ -31,12 +31,20 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 
+/**
+ * This class extends {@link HttpRequestExecutor} to intercept the connection
+ * activity and register the latency of the different phases of a network call.
+ * It also overrides the HttpRequestExecutor's expect100 failure handling, as
+ * ADLS can send any failure statusCode on an expect100 handshake failure, not
+ * necessarily a 1xx code.
+ */
 public class AbfsManagedHttpRequestExecutor extends HttpRequestExecutor {
 
   public AbfsManagedHttpRequestExecutor(final int expect100WaitTimeout) {
     super(expect100WaitTimeout);
   }
 
+  /**{@inheritDoc}*/
   @Override
   public HttpResponse execute(final HttpRequest request,
       final HttpClientConnection conn,
@@ -49,6 +57,7 @@ public HttpResponse execute(final HttpRequest request,
     return super.execute(request, conn, context);
   }
 
+  /**{@inheritDoc}*/
   @Override
   protected HttpResponse doSendRequest(final HttpRequest request,
       final HttpClientConnection conn,
@@ -65,23 +74,24 @@ protected HttpResponse doSendRequest(final HttpRequest request,
         context);
 
     /*
-     * ApacheHttpClient implementation does not raise an exception if the status
-     * of expect100 hand-shake is not less than 200. Although it sends payload only
-     * if the statusCode of the expect100 hand-shake is 100.
-     *
-     * ADLS can send any failure statusCode in exect100 handshake. So, an exception
-     * needs to be explicitly raised if expect100 assertion is failure but the
-     * ApacheHttpClient has not raised an exception.
-     *
-     * Response is only returned by this method if there is no expect100 request header
-     * or the expect100 assertion is failed.
-     */
+     * The ApacheHttpClient implementation raises no exception when the status of
+     * the expect100 handshake is 200 or above, although it sends the payload
+     * only if the handshake returned statusCode 100.
+     *
+     * ADLS can send any failure statusCode in the expect100 handshake. So, an
+     * exception needs to be explicitly raised if the expect100 assertion fails
+     * but ApacheHttpClient has not raised an exception.
+     *
+     * Response is only returned by this method if there is no expect100 request
+     * header or the expect100 assertion has failed.
+     */
     if (request != null && request.containsHeader(EXPECT) && res != null) {
-      throw new AbfsApacheHttpExpect100Exception(EXPECT_100_JDK_ERROR, res);
+      throw new AbfsApacheHttpExpect100Exception(res);
     }
     return res;
   }
 
+  /**{@inheritDoc}*/
   @Override
   protected HttpResponse doReceiveResponse(final HttpRequest request,
       final HttpClientConnection conn,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
index 226d720c841b4..313e444fabb91 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java
@@ -20,6 +20,10 @@
 
 import org.apache.hadoop.fs.azurebfs.constants.AbfsRestOperationType;
 
+/**
+ * Implementation of {@link AbfsThrottlingIntercept} that does not throttle
+ * the ABFS process.
+ */
 final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
 
   public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept();
@@ -27,11 +31,13 @@ final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
   private AbfsNoOpThrottlingIntercept() {
   }
 
+  /**{@inheritDoc}*/
   @Override
   public void updateMetrics(final AbfsRestOperationType operationType,
       final AbfsHttpOperation httpOperation) {
   }
 
+  /**{@inheritDoc}*/
   @Override
   public void sendingRequest(final AbfsRestOperationType operationType,
       final AbfsCounters abfsCounters) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index e889df129feec..dbf246e107a7f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -23,6 +23,7 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.time.Duration;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -105,6 +106,9 @@ public class AbfsRestOperation {
    */
   private TracingContext lastUsedTracingContext;
 
+  /**
+   * Number of retries due to IOException.
+   */
   private int apacheHttpClientIoExceptions = 0;
 
   /**
@@ -370,7 +374,7 @@ private boolean executeHttpOperation(final int retryCount,
     } catch (IOException e) {
       LOG.debug("Auth failure: {}, {}", method, url);
       throw new AbfsRestOperationException(-1, null,
-          "Auth failure: " + e.getMessage(), e, result);
+          "Auth failure: " + e.getMessage(), e);
     }
 
     try {
@@ -493,6 +497,10 @@ private boolean executeHttpOperation(final int retryCount,
       return true;
     }
 
+  /**
+   * Registers switch off of ApacheHttpClient when the number of IOException
+   * retries exceeds the configured threshold.
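+   * Once switched off, subsequent requests in this JVM fall back to the JDK
+   * networking library (reported as {@code JDK_FALLBACK} in the tracing context).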
+ */ private void registerApacheHttpClientIoException() { apacheHttpClientIoExceptions++; if (apacheHttpClientIoExceptions @@ -551,24 +559,23 @@ && isApacheClientUsable()) { return createAbfsHttpOperation(); } - @VisibleForTesting - boolean isApacheClientUsable() { + private boolean isApacheClientUsable() { return AbfsApacheHttpClient.usable(); } @VisibleForTesting AbfsJdkHttpOperation createAbfsHttpOperation() throws IOException { return new AbfsJdkHttpOperation(url, method, requestHeaders, - client.getAbfsConfiguration().getHttpConnectionTimeout(), - client.getAbfsConfiguration().getHttpReadTimeout()); + Duration.ofMillis(client.getAbfsConfiguration().getHttpConnectionTimeout()), + Duration.ofMillis(client.getAbfsConfiguration().getHttpReadTimeout())); } @VisibleForTesting - AbfsAHCHttpOperation createAbfsAHCHttpOperation() { + AbfsAHCHttpOperation createAbfsAHCHttpOperation() throws IOException { return new AbfsAHCHttpOperation(url, method, requestHeaders, - client.getAbfsConfiguration().getHttpConnectionTimeout(), - client.getAbfsConfiguration().getHttpReadTimeout() - ); + Duration.ofMillis(client.getAbfsConfiguration().getHttpConnectionTimeout()), + Duration.ofMillis(client.getAbfsConfiguration().getHttpReadTimeout()), + client.getAbfsApacheHttpClient()); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java index 19bf0f25dc2e9..20001b2c70183 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java @@ -1,40 +1,21 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - package org.apache.hadoop.fs.azurebfs.services; +import java.io.Closeable; import java.io.IOException; -import java.io.NotSerializableException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; +import java.util.Stack; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.http.HttpClientConnection; -import org.apache.http.conn.routing.HttpRoute; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL; /** - * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache` + * Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each + * instance of FileSystem has its own KeepAliveCache. *

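 * Illustrative call pattern (a sketch, not part of this patch; the actual
 * caller is {@link AbfsConnectionManager}):
 * <pre>
 *   HttpClientConnection conn = cache.get();  // reuse an idle connection, or null
 *   if (conn == null) {
 *     conn = createConnection(route);         // hypothetical factory method
 *   }
 *   // ... execute request over conn ...
 *   cache.put(conn);                          // recache; closed instead if cache is full
 * </pre>
 *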
 * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
 * connection-pooling:
@@ -48,296 +29,194 @@
  * number of connections it can create.
  *
  */
-public final class KeepAliveCache
-    extends HashMap
-    implements Runnable {
+public class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
+    implements
+    Closeable {
 
-  private int maxConn;
+  /**
+   * Scheduled timer that evicts idle connections.
+   */
+  private final Timer timer;
 
-  private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+  /**
+   * Task scheduled on the timer that runs the eviction logic.
+   */
+  private final TimerTask timerTask;
 
-  private Thread keepAliveTimer = null;
+  /**
+   * Flag to indicate if the cache is closed.
+   */
+  private boolean isClosed;
 
-  private boolean isPaused = false;
+  /**
+   * Counter to keep track of the number of KeepAliveCache instances created.
+   */
+  private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);
 
-  private KeepAliveCache() {
-    setMaxConn();
-  }
+  /**
+   * Maximum number of connections that can be cached.
+   */
+  private final int maxConn;
+
+  /**
+   * Time-to-live for an idle connection.
+   */
+  private final long connectionIdleTTL;
+
+  /**
+   * Flag to indicate if the eviction thread is paused.
+   */
+  private boolean isPaused = false;
 
+  @VisibleForTesting
   synchronized void pauseThread() {
     isPaused = true;
   }
 
+  @VisibleForTesting
   synchronized void resumeThread() {
     isPaused = false;
-    notify();
-  }
-
-  private void setMaxConn() {
-    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
-    if (sysPropMaxConn == null) {
-      maxConn = DEFAULT_MAX_CONN_SYS_PROP;
-    } else {
-      maxConn = Integer.parseInt(sysPropMaxConn);
-    }
-  }
-
-  public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
-    this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
-    this.connectionIdleTTL = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
   }
 
+  /**
+   * @return the connection idle TTL in milliseconds.
+   */
+  @VisibleForTesting
   public long getConnectionIdleTTL() {
     return connectionIdleTTL;
   }
 
-  private static final KeepAliveCache INSTANCE = new KeepAliveCache();
-
-  public static KeepAliveCache getInstance() {
-    return INSTANCE;
-  }
-
-  @VisibleForTesting
-  void clearThread() {
-    clear();
-    setMaxConn();
-  }
-
-  private int getKacSize() {
-    return INSTANCE.maxConn;
-  }
-
-  @Override
-  public void run() {
-    do {
-      synchronized (this) {
-        while (isPaused) {
-          try {
-            wait();
-          } catch (InterruptedException ignored) {
-          }
-        }
-      }
-      kacCleanup();
-    } while (size() > 0);
-  }
-
-  private void kacCleanup() {
-    try {
-      Thread.sleep(connectionIdleTTL);
-    } catch (InterruptedException ex) {
-      return;
+  public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
+    this.timer = new Timer("abfs-kac-" + KAC_COUNTER.getAndIncrement(), true);
+    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+    if (sysPropMaxConn == null) {
+      this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+    } else {
+      this.maxConn = Integer.parseInt(sysPropMaxConn);
    }
-    synchronized (this) {
-      long currentTime = System.currentTimeMillis();
-
-      ArrayList keysToRemove
-          = new ArrayList();
-      for (Map.Entry entry : entrySet()) {
-        KeepAliveKey key = entry.getKey();
-        ClientVector v = entry.getValue();
-        synchronized (v) {
-          int i;
-
-          for (i = 0; i < v.size(); i++) {
-            KeepAliveEntry e = v.elementAt(i);
-            if ((currentTime - e.idleStartTime) > v.nap
-                || e.httpClientConnection.isStale()) {
-              HttpClientConnection hc = e.httpClientConnection;
-
closeHtpClientConnection(hc); - } else { - break; - } - } - v.subList(0, i).clear(); - - if (v.size() == 0) { - keysToRemove.add(key); + this.connectionIdleTTL + = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime(); + this.timerTask = new TimerTask() { + @Override + public void run() { + if (isPaused) { + return; } - } + evictIdleConnection(); } - - for (KeepAliveKey key : keysToRemove) { - removeVector(key); - } - } - } - - synchronized void removeVector(KeepAliveKey k) { - super.remove(k); + }; + timer.schedule(timerTask, 0, connectionIdleTTL); } - public synchronized void put(final HttpRoute httpRoute, - final HttpClientConnection httpClientConnection) { - boolean startThread = (keepAliveTimer == null); - if (!startThread) { - if (!keepAliveTimer.isAlive()) { - startThread = true; - } - } - if (startThread) { - clear(); - final KeepAliveCache cache = this; - ThreadGroup grp = Thread.currentThread().getThreadGroup(); - ThreadGroup parent = null; - while ((parent = grp.getParent()) != null) { - grp = parent; + /** + * Iterate over the cache and evict the idle connections. An idle connection is + * one that has been in the cache for more than connectionIdleTTL milliseconds. + */ + synchronized void evictIdleConnection() { + long currentTime = System.currentTimeMillis(); + int i; + for (i = 0; i < size(); i++) { + KeepAliveEntry e = elementAt(i); + if ((currentTime - e.idleStartTime) > connectionIdleTTL + || e.httpClientConnection.isStale()) { + HttpClientConnection hc = e.httpClientConnection; + closeHtpClientConnection(hc); + } else { + break; } - - keepAliveTimer = new Thread(grp, cache, "Keep-Alive-Timer"); - keepAliveTimer.setDaemon(true); - keepAliveTimer.setPriority(Thread.MAX_PRIORITY - 2); - // Set the context class loader to null in order to avoid - // keeping a strong reference to an application classloader. - keepAliveTimer.setContextClassLoader(null); - keepAliveTimer.start(); - } - - - KeepAliveKey key = new KeepAliveKey(httpRoute); - ClientVector v = super.get(key); - if (v == null) { - v = new ClientVector((int) connectionIdleTTL); - v.put(httpClientConnection); - super.put(key, v); - } else { - v.put(httpClientConnection); } + subList(0, i).clear(); } - public synchronized HttpClientConnection get(HttpRoute httpRoute) - throws IOException { + /** + * Safe close of the HttpClientConnection. + * + * @param hc HttpClientConnection to be closed + */ + private void closeHtpClientConnection(final HttpClientConnection hc) { + try { + hc.close(); + } catch (IOException ignored) { - KeepAliveKey key = new KeepAliveKey(httpRoute); - ClientVector v = super.get(key); - if (v == null) { // nothing in cache yet - return null; } - return v.get(); } - /* - * Do not serialize this class! + /** + * Close all connections in cache and cancel the eviction timer. */ - private void writeObject(java.io.ObjectOutputStream stream) + @Override + public synchronized void close() { + isClosed = true; + timerTask.cancel(); + timer.purge(); + while (!empty()) { + KeepAliveEntry e = pop(); + closeHtpClientConnection(e.httpClientConnection); + } + } + + /** + * Gets the latest added HttpClientConnection from the cache. The returned connection + * is non-stale and has been in the cache for less than connectionIdleTTL milliseconds. + * + * The cache is checked from the top of the stack. If the connection is stale or has been + * in the cache for more than connectionIdleTTL milliseconds, it is closed and the next + * connection is checked. Once a valid connection is found, it is returned. 
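+   * Stale or expired entries encountered during the search are closed and
+   * removed, so a single call may evict multiple entries.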
+   *
+   * @return a valid cached HttpClientConnection if one is found, else null.
+   */
+  public synchronized HttpClientConnection get()
       throws IOException {
-    throw new NotSerializableException();
-  }
-
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    throw new NotSerializableException();
-  }
-
-  class ClientVector extends java.util.Stack {
-
-    private static final long serialVersionUID = -8680532108106489459L;
-
-    // sleep time in milliseconds, before cache clear
-    private int nap;
-
-    ClientVector(int nap) {
-      this.nap = nap;
+    if (isClosed) {
+      throw new IOException("KeepAliveCache is closed");
     }
-
-    synchronized HttpClientConnection get() throws IOException {
-      if (empty()) {
-        return null;
-      } else {
-        // Loop until we find a connection that has not timed out
-        HttpClientConnection hc = null;
-        long currentTime = System.currentTimeMillis();
-        do {
-          KeepAliveEntry e = pop();
-          if ((currentTime - e.idleStartTime) > nap
-              || e.httpClientConnection.isStale()) {
-            e.httpClientConnection.close();
-          } else {
-            hc = e.httpClientConnection;
-          }
-        } while ((hc == null) && (!empty()));
-        return hc;
-      }
+    if (empty()) {
+      return null;
    }
-
-    /* return a still valid, unused HttpClient */
-    synchronized void put(HttpClientConnection h) {
-      if (size() >= getKacSize()) {
-        closeHtpClientConnection(h);
-        return;
+    HttpClientConnection hc = null;
+    long currentTime = System.currentTimeMillis();
+    do {
+      KeepAliveEntry e = pop();
+      if ((currentTime - e.idleStartTime) > connectionIdleTTL
+          || e.httpClientConnection.isStale()) {
+        closeHtpClientConnection(e.httpClientConnection);
+      } else {
+        hc = e.httpClientConnection;
       }
-      push(new KeepAliveEntry(h, System.currentTimeMillis()));
-    }
-
-    /*
-     * Do not serialize this class!
-     */
-    private void writeObject(java.io.ObjectOutputStream stream)
-        throws IOException {
-      throw new NotSerializableException();
-    }
-
-    private void readObject(java.io.ObjectInputStream stream)
-        throws IOException, ClassNotFoundException {
-      throw new NotSerializableException();
-    }
-
-    @Override
-    public synchronized boolean equals(final Object o) {
-      return super.equals(o);
-    }
-
-    @Override
-    public synchronized int hashCode() {
-      return super.hashCode();
-    }
-  }
-
-  private void closeHtpClientConnection(final HttpClientConnection h) {
-    try {
-      h.close();
-    } catch (IOException ignored) {
-
-    }
+    } while ((hc == null) && (!empty()));
+    return hc;
   }
-
-  static class KeepAliveKey {
-
-    private final HttpRoute httpRoute;
-
-
-    KeepAliveKey(HttpRoute httpRoute) {
-      this.httpRoute = httpRoute;
-    }
-
-    /**
-     * Determine whether or not two objects of this type are equal
-     */
-    @Override
-    public boolean equals(Object obj) {
-      return obj instanceof KeepAliveKey && httpRoute.getTargetHost()
-          .getHostName()
-          .equals(((KeepAliveKey) obj).httpRoute.getTargetHost().getHostName());
+  /**
+   * Puts the HttpClientConnection in the cache. If the cache is already at
+   * maxConn capacity, the given HttpClientConnection is closed and not added
+   * to the cache.
+   *
+   * @param httpClientConnection HttpClientConnection to be cached
+   */
+  public synchronized void put(HttpClientConnection httpClientConnection) {
+    if (isClosed) {
+      return;
     }
-
-    /**
-     * The hashCode() for this object is the string hashCode() of
-     * concatenation of the protocol, host name and port.
- */ - @Override - public int hashCode() { - String str = httpRoute.getTargetHost().getHostName() + ":" - + httpRoute.getTargetHost().getPort(); - return str.hashCode(); + if (size() >= maxConn) { + closeHtpClientConnection(httpClientConnection); + return; } + KeepAliveEntry entry = new KeepAliveEntry(httpClientConnection, + System.currentTimeMillis()); + push(entry); } + /** + * Entry data-structure in the cache. + */ static class KeepAliveEntry { + /**HttpClientConnection in the cache entry.*/ private final HttpClientConnection httpClientConnection; + /**Time at which the HttpClientConnection was added to the cache.*/ private final long idleStartTime; KeepAliveEntry(HttpClientConnection hc, long idleStartTime) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java index 0b0934b5c3333..1a663ec3c93c5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java @@ -22,6 +22,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -107,7 +108,7 @@ private byte[] getUserDelegationKey(String accountName, String appID, String app requestBody.append(""); AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, method, requestHeaders, - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString()); op.sendPayload(requestBuffer, 0, requestBuffer.length); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index e7a28171cc9fd..657b13ce9ca5c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -97,11 +97,10 @@ public static void setMockAbfsRestOperationForListPathOperation( * @throws IOException */ public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperation abfsRestOperation, - final AbfsJdkHttpOperation httpOperation) throws IOException { + final AbfsHttpOperation httpOperation) throws IOException { HttpURLConnection httpURLConnection = Mockito.mock(HttpURLConnection.class); Mockito.doNothing().when(httpURLConnection) .setRequestProperty(nullable(String.class), nullable(String.class)); - Mockito.doReturn(httpURLConnection).when(httpOperation).getConnection(); Mockito.doReturn("").when(abfsRestOperation).getClientLatency(); Mockito.doReturn(httpOperation).when(abfsRestOperation).createHttpOperation(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 574d59c321649..717ae7b98da35 100644 --- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -418,6 +418,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, abfsConfig.getAccountName().substring(0, abfsConfig.getAccountName().indexOf(DOT)), abfsConfig)); when(client.getAbfsCounters()).thenReturn(abfsCounters); + Mockito.doReturn(baseAbfsClientInstance.getAbfsApacheHttpClient()).when(client).getAbfsApacheHttpClient(); // override baseurl client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration", @@ -645,8 +646,7 @@ public void testExpectHundredContinue() throws Exception { .parseStatusCode(Mockito.nullable( HttpResponse.class)); Mockito.doThrow( - new AbfsApacheHttpExpect100Exception(EXPECT_100_JDK_ERROR, - null)) + new AbfsApacheHttpExpect100Exception(Mockito.mock(HttpResponse.class))) .when((AbfsAHCHttpOperation) httpOperation) .executeRequest(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsHttpClientRequestExecutor.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsHttpClientRequestExecutor.java index 536c16f27aa7a..c8c196b04a0d8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsHttpClientRequestExecutor.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsHttpClientRequestExecutor.java @@ -234,7 +234,7 @@ private void mockHttpOperationBehavior(final ConnectionInfo connectionInfo, HttpClientConnection.class)); return context; }) - .when(httpOperation).setFinalAbfsClientContext(); + .when(httpOperation).getHttpClientContext(); return httpOperation; }).when(op).createHttpOperation(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index ef738e8984568..377ba19edf31b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -286,8 +286,7 @@ private void mockHttpOperation(final AppendRequestParameters appendRequestParame .parseStatusCode(Mockito.nullable( HttpResponse.class)); Mockito.doThrow( - new AbfsApacheHttpExpect100Exception(EXPECT_100_JDK_ERROR, - null)) + new AbfsApacheHttpExpect100Exception(Mockito.mock(HttpResponse.class))) .when((AbfsAHCHttpOperation) httpOperation).executeRequest(); } break; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java index d901dd22b32cf..f45a333fae1f7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java @@ -77,7 +77,7 @@ public void verifyDisablingOfTracker() throws Exception { try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller", "disablingCallee")) { AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, 
DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); tracker.registerResult(op).registerSuccess(true); } @@ -96,7 +96,7 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception { List> tasks = new ArrayList<>(); AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -136,7 +136,7 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception { List> tasks = new ArrayList<>(); AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -176,7 +176,7 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exceptio AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -212,7 +212,7 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exceptio AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -277,7 +277,7 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -312,7 +312,7 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -373,7 +373,7 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false); AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true); final AbfsJdkHttpOperation httpOperation = new 
AbfsJdkHttpOperation(url, "GET", new ArrayList(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); verifyNoException(abfsPerfTrackerDisabled); verifyNoException(abfsPerfTrackerEnabled); @@ -382,7 +382,7 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception { Instant testInstant = Instant.now(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList(), - DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); try ( AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java index af2edcadd2e50..f3eeac1e65267 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java @@ -213,7 +213,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { Mockito.mock(AbfsConfiguration.class) )); - AbfsJdkHttpOperation httpOperation = Mockito.mock(AbfsJdkHttpOperation.class); + AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class); addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation); Stubber stubber = Mockito.doThrow(new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE)); @@ -227,6 +227,8 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { Mockito.doReturn("").when(httpOperation).getStorageErrorMessage(); Mockito.doReturn("").when(httpOperation).getStorageErrorCode(); Mockito.doReturn("HEAD").when(httpOperation).getMethod(); + Mockito.doReturn("").when(httpOperation).getMaskedUrl(); + Mockito.doReturn("").when(httpOperation).getRequestId(); Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()).when(httpOperation).getStorageErrorMessage(); Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any()); @@ -274,7 +276,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { // Assert that intercept.updateMetrics was called 2 times. Both the retried request fails with EGR. 
Mockito.verify(intercept, Mockito.times(2)) .updateMetrics(nullable(AbfsRestOperationType.class), nullable( - AbfsJdkHttpOperation.class)); + AbfsHttpOperation.class)); } private void testClientRequestIdForStatusRetry(int status, @@ -300,7 +302,7 @@ private void testClientRequestIdForStatusRetry(int status, Mockito.mock(AbfsConfiguration.class) )); - AbfsJdkHttpOperation httpOperation = Mockito.mock(AbfsJdkHttpOperation.class); + AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class); addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation); Mockito.doNothing() @@ -366,7 +368,7 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions, Mockito.mock(AbfsConfiguration.class) )); - AbfsJdkHttpOperation httpOperation = Mockito.mock(AbfsJdkHttpOperation.class); + AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class); addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation); Stubber stubber = Mockito.doThrow(exceptions[0]); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java index 2744c8076e21d..cc71611d5dfb8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java @@ -18,25 +18,20 @@ package org.apache.hadoop.fs.azurebfs.services; -import java.io.IOException; -import java.util.Stack; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout; -import org.apache.hadoop.util.functional.FutureIO; + import org.apache.http.HttpClientConnection; import org.apache.http.HttpHost; import org.apache.http.conn.routing.HttpRoute; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS; public class TestApacheClientConnectionPool extends AbstractAbfsTestWithTimeout { @@ -46,245 +41,147 @@ public TestApacheClientConnectionPool() throws Exception { } @Test - public void testBasicPool() throws IOException { + public void testBasicPool() throws Exception { System.clearProperty(HTTP_MAX_CONN_SYS_PROP); - validatePoolSize(DEFAULT_MAX_CONN_SYS_PROP); + validatePoolSize(DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS); } @Test - public void testSysPropAppliedPool() throws IOException { + public void testSysPropAppliedPool() throws Exception { final String customPoolSize = "10"; System.setProperty(HTTP_MAX_CONN_SYS_PROP, customPoolSize); validatePoolSize(Integer.parseInt(customPoolSize)); } - private void validatePoolSize(int size) throws IOException { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); - final HttpClientConnection[] 
connections = new HttpClientConnection[size - * 2]; + private void validatePoolSize(int size) throws Exception { + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), ""))) { + keepAliveCache.clear(); + final HttpClientConnection[] connections = new HttpClientConnection[size + * 2]; - for (int i = 0; i < size * 2; i++) { - connections[i] = Mockito.mock(HttpClientConnection.class); - } + for (int i = 0; i < size * 2; i++) { + connections[i] = Mockito.mock(HttpClientConnection.class); + } - for (int i = 0; i < size * 2; i++) { - keepAliveCache.put(routes, connections[i]); - } + for (int i = 0; i < size * 2; i++) { + keepAliveCache.put(connections[i]); + } - for (int i = size; i < size * 2; i++) { - Mockito.verify(connections[i], Mockito.times(1)).close(); - } + for (int i = size; i < size * 2; i++) { + Mockito.verify(connections[i], Mockito.times(1)).close(); + } - for (int i = 0; i < size * 2; i++) { - if (i < size) { - Assert.assertNotNull(keepAliveCache.get(routes)); - } else { - Assert.assertNull(keepAliveCache.get(routes)); + for (int i = 0; i < size * 2; i++) { + if (i < size) { + Assert.assertNotNull(keepAliveCache.get()); + } else { + Assert.assertNull(keepAliveCache.get()); + } } + System.clearProperty(HTTP_MAX_CONN_SYS_PROP); } - System.clearProperty(HTTP_MAX_CONN_SYS_PROP); } @Test - public void testKeepAliveCache() throws IOException { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); - HttpClientConnection connection = Mockito.mock(HttpClientConnection.class); - - keepAliveCache.put(routes, connection); - - Assert.assertNotNull(keepAliveCache.get(routes)); - keepAliveCache.put(routes, connection); - - final HttpRoute routes1 = new HttpRoute(new HttpHost("localhost1")); - Assert.assertNull(keepAliveCache.get(routes1)); + public void testKeepAliveCache() throws Exception { + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), ""))) { + keepAliveCache.clear(); + final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); + HttpClientConnection connection = Mockito.mock( + HttpClientConnection.class); + + keepAliveCache.put(connection); + + Assert.assertNotNull(keepAliveCache.get()); + keepAliveCache.put(connection); + } } @Test public void testKeepAliveCacheCleanup() throws Exception { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); - HttpClientConnection connection = Mockito.mock(HttpClientConnection.class); - keepAliveCache.put(routes, connection); - - Thread.sleep(2 * keepAliveCache.getConnectionIdleTTL()); - Mockito.verify(connection, Mockito.times(1)).close(); - Assert.assertNull(keepAliveCache.get(routes)); - Mockito.verify(connection, Mockito.times(1)).close(); - } - - @Test - public void testKeepAliveCacheCleanupWithConnections() throws Exception { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.pauseThread(); - keepAliveCache.clearThread(); - final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); - HttpClientConnection connection = Mockito.mock(HttpClientConnection.class); - keepAliveCache.put(routes, connection); - - Thread.sleep(2 * keepAliveCache.getConnectionIdleTTL()); - Mockito.verify(connection, Mockito.times(0)).close(); - Assert.assertNull(keepAliveCache.get(routes)); - 
Mockito.verify(connection, Mockito.times(1)).close(); - keepAliveCache.resumeThread(); - } - - @Test - public void testKeepAliveCacheParallelismAtSingleRoute() throws Exception { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - int parallelism = 4; - ExecutorService executorService = Executors.newFixedThreadPool(parallelism); - /* - * Verify the correctness of KeepAliveCache at single route level state - * in a multi-threaded environment. - */ - try { - Stack stack = new Stack<>(); - HttpRoute routes = new HttpRoute(new HttpHost("host")); - Future[] futures = new Future[parallelism]; - for (int i = 0; i < parallelism; i++) { - final boolean putRequest = i % 2 == 0; - futures[i] = executorService.submit(() -> { - for (int j = 0; j < DEFAULT_MAX_CONN_SYS_PROP * 4; j++) { - synchronized (this) { - if (putRequest) { - HttpClientConnection connection = Mockito.mock( - HttpClientConnection.class); - stack.add(connection); - try { - Mockito.doAnswer(answer -> { - Assertions.assertThat(connection).isEqualTo(stack.pop()); - return null; - }).when(connection).close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - keepAliveCache.put(routes, connection); - } else { - try { - if (stack.empty()) { - Assertions.assertThat(keepAliveCache.get(routes)).isNull(); - } else { - Assertions.assertThat(keepAliveCache.get(routes)) - .isEqualTo(stack.pop()); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - } - }); - } - for (int i = 0; i < parallelism; i++) { - FutureIO.awaitFuture(futures[i]); - } - while (!stack.empty()) { - Assertions.assertThat(keepAliveCache.get(routes)) - .isEqualTo(stack.pop()); - } - } finally { - executorService.shutdownNow(); + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), ""))) { + keepAliveCache.clear(); + HttpClientConnection connection = Mockito.mock( + HttpClientConnection.class); + keepAliveCache.put(connection); + + Thread.sleep(2 * keepAliveCache.getConnectionIdleTTL()); + Mockito.verify(connection, Mockito.times(1)).close(); + Assert.assertNull(keepAliveCache.get()); + Mockito.verify(connection, Mockito.times(1)).close(); } } @Test - public void testKeepAliveCacheParallelismAtWithMultipleRoute() - throws Exception { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - ExecutorService executorService = Executors.newFixedThreadPool(4); - try { - /* - * Verify that KeepAliveCache maintains connection at route level. - */ - Future[] futures = new Future[4]; - for (int i = 0; i < 4; i++) { - /* - * Create a route for each thread. Iteratively call put and get methods - * on the KeepAliveCache for the same route. Verify that the connections - * are being returned in the order they were put in the cache. Also, verify - * that the cache is not holding more than the maximum number of connections. 
- */ - final HttpRoute routes = new HttpRoute(new HttpHost("host" + i)); - futures[i] = executorService.submit(() -> { - final Stack connections = new Stack<>(); - for (int j = 0; j < 3 * DEFAULT_MAX_CONN_SYS_PROP; j++) { - if ((j / DEFAULT_MAX_CONN_SYS_PROP) % 2 == 0) { - HttpClientConnection connection = Mockito.mock( - HttpClientConnection.class); - connections.add(connection); - try { - Mockito.doAnswer(answer -> { - Assertions.assertThat(connection) - .isEqualTo(connections.pop()); - return null; - }).when(connection).close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - keepAliveCache.put(routes, connection); - } else { - try { - Assertions.assertThat(keepAliveCache.get(routes)) - .isEqualTo(connections.pop()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - } - Assertions.assertThat(connections).hasSize(DEFAULT_MAX_CONN_SYS_PROP); - }); - } - - for (int i = 0; i < 4; i++) { - FutureIO.awaitFuture(futures[i]); - } - } finally { - executorService.shutdownNow(); + public void testKeepAliveCacheCleanupWithConnections() throws Exception { + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), ""))) { + keepAliveCache.pauseThread(); + keepAliveCache.clear(); + HttpClientConnection connection = Mockito.mock( + HttpClientConnection.class); + keepAliveCache.put(connection); + + Thread.sleep(2 * keepAliveCache.getConnectionIdleTTL()); + Mockito.verify(connection, Mockito.times(0)).close(); + Assert.assertNull(keepAliveCache.get()); + Mockito.verify(connection, Mockito.times(1)).close(); + keepAliveCache.resumeThread(); } } @Test public void testKeepAliveCacheConnectionRecache() throws Exception { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); - HttpClientConnection connection = Mockito.mock(HttpClientConnection.class); - keepAliveCache.put(routes, connection); - - Assert.assertNotNull(keepAliveCache.get(routes)); - keepAliveCache.put(routes, connection); - Assert.assertNotNull(keepAliveCache.get(routes)); + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), ""))) { + keepAliveCache.clear(); + HttpClientConnection connection = Mockito.mock( + HttpClientConnection.class); + keepAliveCache.put(connection); + + Assert.assertNotNull(keepAliveCache.get()); + keepAliveCache.put(connection); + Assert.assertNotNull(keepAliveCache.get()); + } } @Test public void testKeepAliveCacheRemoveStaleConnection() throws Exception { - KeepAliveCache keepAliveCache = KeepAliveCache.getInstance(); - keepAliveCache.clearThread(); - final HttpRoute routes = new HttpRoute(new HttpHost("localhost")); - HttpClientConnection[] connections = new HttpClientConnection[5]; - for (int i = 0; i < 5; i++) { - connections[i] = Mockito.mock(HttpClientConnection.class); - keepAliveCache.put(routes, connections[i]); - } + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), ""))) { + keepAliveCache.clear(); + HttpClientConnection[] connections = new HttpClientConnection[5]; + + // Fill up the cache. + for (int i = 0; + i < DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS; + i++) { + connections[i] = Mockito.mock(HttpClientConnection.class); + keepAliveCache.put(connections[i]); + } - for (int i = 0; i < 3; i++) { - Mockito.doReturn(true).when(connections[i]).isStale(); - } + // Mark all but the last two connections as stale. 
+ for (int i = 0; + i < DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS - 2; + i++) { + Mockito.doReturn(true).when(connections[i]).isStale(); + } - for (int i = 4; i >= 0; i--) { - if (i >= 3) { - Assert.assertNotNull(keepAliveCache.get(routes)); - } else { - Assert.assertNull(keepAliveCache.get(routes)); - Mockito.verify(connections[i], Mockito.times(1)).close(); + // Verify that the stale connections are removed. + for (int i = DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS - 1; + i >= 0; + i--) { + // The last two connections are not stale and would be returned. + if (i >= (DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS - 2)) { + Assert.assertNotNull(keepAliveCache.get()); + } else { + // Stale connections are closed and removed. + Assert.assertNull(keepAliveCache.get()); + Mockito.verify(connections[i], Mockito.times(1)).close(); + } } } }
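
Reviewer note: the snippet below is an illustrative sketch, not part of the patch. It strings together the KeepAliveCache lifecycle introduced above, mirroring the tests' AbfsConfiguration setup and using a Mockito mock in place of a live ApacheHttpClient connection.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
  import org.apache.http.HttpClientConnection;
  import org.mockito.Mockito;

  static void keepAliveCacheLifecycleSketch() throws Exception {
    AbfsConfiguration abfsConfig = new AbfsConfiguration(new Configuration(), "");
    try (KeepAliveCache cache = new KeepAliveCache(abfsConfig)) {
      // First use: nothing is cached yet, so get() returns null.
      HttpClientConnection conn = cache.get();
      if (conn == null) {
        conn = Mockito.mock(HttpClientConnection.class); // stand-in for a real connection
      }
      // Return the connection for reuse; it is closed instead if the cache is full.
      cache.put(conn);
      // While fresh (within the idle TTL) the same connection is handed back.
      conn = cache.get();
    }
    // close(): cancels the eviction timer task and closes every cached connection.
  }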