From 650c077e68285438ed7bf2e21ea44c14ee6b88a7 Mon Sep 17 00:00:00 2001 From: Pranav Saxena <108325433+saxenapranav@users.noreply.github.com> Date: Mon, 1 Jul 2024 17:01:51 +0530 Subject: [PATCH] review comments (#20) --- .../apache/hadoop/fs/ClosedIOException.java | 29 +++++++ .../services/AbfsConnectionManager.java | 29 +++++-- .../azurebfs/services/AbfsHttpOperation.java | 1 - .../AbfsManagedApacheHttpConnection.java | 14 ++++ .../azurebfs/services/AbfsRestOperation.java | 5 +- .../fs/azurebfs/services/KeepAliveCache.java | 16 ++-- .../ITestApacheClientConnectionPool.java | 15 ++-- .../TestApacheClientConnectionPool.java | 84 ++++++++++++++----- 8 files changed, 147 insertions(+), 46 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClosedIOException.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClosedIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClosedIOException.java new file mode 100644 index 0000000000000..80aa8d34049e1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClosedIOException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ClosedIOException extends PathIOException { + public ClosedIOException(String path, String message) { + super(path, message); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java index 5e7aaae800263..9b0e69accbd6f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -32,6 +33,7 @@ import org.apache.http.conn.ConnectionRequest; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.HttpClientConnectionOperator; +import org.apache.http.conn.ManagedHttpClientConnection; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator; @@ -90,15 +92,20 @@ public HttpClientConnection get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException { - LOG.debug("Connection requested"); + String requestId = UUID.randomUUID().toString(); + logDebug("Connection requested for request {}", requestId); try { HttpClientConnection clientConn = kac.get(); if (clientConn != null) { - LOG.debug("Connection retrieved from KAC: {}", clientConn); + logDebug("Connection retrieved from KAC: {} for requestId: {}", + clientConn, requestId); return clientConn; } - LOG.debug("Creating new connection"); - return httpConnectionFactory.create(route, null); + logDebug("Creating new connection for requestId: {}", requestId); + ManagedHttpClientConnection conn = httpConnectionFactory.create(route, + null); + logDebug("Connection created: {} for requestId: {}", conn, requestId); + return conn; } catch (IOException ex) { throw new ExecutionException(ex); } @@ -131,9 +138,9 @@ public void releaseConnection(final HttpClientConnection conn, if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) { boolean connAddedInKac = kac.put(conn); if (connAddedInKac) { - LOG.debug("Connection cached: {}", conn); + logDebug("Connection cached: {}", conn); } else { - LOG.debug("Connection not cached, and is released: {}", conn); + logDebug("Connection not cached, and is released: {}", conn); } } } @@ -145,11 +152,11 @@ public void connect(final HttpClientConnection conn, final int connectTimeout, final HttpContext context) throws IOException { long start = System.currentTimeMillis(); - LOG.debug("Connecting {} to {}", conn, route.getTargetHost()); + logDebug("Connecting {} to {}", conn, route.getTargetHost()); connectionOperator.connect((AbfsManagedApacheHttpConnection) conn, route.getTargetHost(), route.getLocalSocketAddress(), connectTimeout, SocketConfig.DEFAULT, context); - LOG.debug("Connection established: {}", conn); + logDebug("Connection established: {}", conn); if (context instanceof AbfsManagedHttpClientContext) { ((AbfsManagedHttpClientContext) context).setConnectTime( System.currentTimeMillis() - start); @@ -191,4 +198,10 @@ public void closeExpiredConnections() { public void shutdown() { kac.close(); } + + private void logDebug(String message, Object... args) { + if (LOG.isDebugEnabled()) { + LOG.debug(message, args); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index cee11a44c05a1..e2ce5c628a4b6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -47,7 +47,6 @@ *
* For JDK netlib usage, the child class would be {@link AbfsJdkHttpOperation}.
* For ApacheHttpClient netlib usage, the child class would be {@link AbfsAHCHttpOperation}.
- *
+ *
* This schedules an eviction thread to run every connectionIdleTTL milliseconds * given by the configuration {@link AbfsConfiguration#getMaxApacheHttpClientConnectionIdleTime()}. - *
* @param abfsConfiguration Configuration of the filesystem. */ KeepAliveCache(AbfsConfiguration abfsConfiguration) { + accountNamePath = abfsConfiguration.getAccountName(); this.timer = new Timer("abfs-kac-" + KAC_COUNTER.getAndIncrement(), true); int sysPropMaxConn = Integer.parseInt(System.getProperty(HTTP_MAX_CONN_SYS_PROP, "0")); @@ -185,7 +188,9 @@ private void closeHttpClientConnection(final HttpClientConnection hc) { try { hc.close(); } catch (IOException ex) { - LOG.debug("Close failed for connection: " + hc, ex); + if(LOG.isDebugEnabled()) { + LOG.debug("Close failed for connection: {}", hc, ex); + } } } @@ -215,18 +220,17 @@ void closeInternal() { ** Gets the latest added HttpClientConnection from the cache. The returned connection * is non-stale and has been in the cache for less than connectionIdleTTL milliseconds. - *
+ *
* The cache is checked from the top of the stack. If the connection is stale or has been * in the cache for more than connectionIdleTTL milliseconds, it is closed and the next * connection is checked. Once a valid connection is found, it is returned. - *
* @return HttpClientConnection: if a valid connection is found, else null. * @throws IOException if the cache is closed. */ public synchronized HttpClientConnection get() throws IOException { if (isClosed.get()) { - throw new IOException(KEEP_ALIVE_CACHE_CLOSED); + throw new ClosedIOException(accountNamePath, KEEP_ALIVE_CACHE_CLOSED); } if (empty()) { return null; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java index 2cd545fb8e9f6..9998bc82f1cd6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java @@ -20,6 +20,7 @@ import org.junit.Test; +import org.apache.hadoop.fs.ClosedIOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; @@ -28,6 +29,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.test.LambdaTestUtils.verifyCause; /** * This test class tests the exception handling in ABFS thrown by the @@ -41,13 +43,16 @@ public ITestApacheClientConnectionPool() throws Exception { } @Test - public void testKacIsClosed() throws Exception { - try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())) { + public void testKacIsClosed() throws Throwable { + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + getRawConfiguration())) { KeepAliveCache kac = fs.getAbfsStore().getClient().getKeepAliveCache(); kac.close(); - intercept(AbfsDriverException.class, KEEP_ALIVE_CACHE_CLOSED, () -> { - fs.create(new Path("/test")); - }); + AbfsDriverException ex = intercept(AbfsDriverException.class, + KEEP_ALIVE_CACHE_CLOSED, () -> { + fs.create(new Path("/test")); + }); + verifyCause(ClosedIOException.class, ex); } } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java index 13d057778e0ee..24365447f2188 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheClientConnectionPool.java @@ -22,11 +22,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ClosedIOException; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout; @@ -35,6 +35,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME; @@ -76,14 +77,46 @@ public void testPoolWithZeroSysProp() throws Exception { @Test public void testEmptySizePool() throws Exception { Configuration configuration = new Configuration(); - configuration.set(FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE, "0"); - AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, EMPTY_STRING); - try (KeepAliveCache keepAliveCache = new KeepAliveCache(abfsConfiguration)) { - Assertions.assertThat(keepAliveCache.put(Mockito.mock(HttpClientConnection.class))).isFalse(); - Assertions.assertThat(keepAliveCache.get()).isNull(); + configuration.set(FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE, + "0"); + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, + EMPTY_STRING); + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + abfsConfiguration)) { + assertCachePutFail(keepAliveCache, + Mockito.mock(HttpClientConnection.class)); + assertCacheGetNull(keepAliveCache); } } + private void assertCacheGetNull(final KeepAliveCache keepAliveCache) + throws IOException { + Assertions.assertThat(keepAliveCache.get()) + .describedAs("cache.get()") + .isNull(); + } + + private void assertCacheGetNonNull(final KeepAliveCache keepAliveCache) + throws IOException { + Assertions.assertThat(keepAliveCache.get()) + .describedAs("cache.get()") + .isNotNull(); + } + + private void assertCachePutFail(final KeepAliveCache keepAliveCache, + final HttpClientConnection mock) { + Assertions.assertThat(keepAliveCache.put(mock)) + .describedAs("cache.put()") + .isFalse(); + } + + private void assertCachePutSuccess(final KeepAliveCache keepAliveCache, + final HttpClientConnection connections) { + Assertions.assertThat(keepAliveCache.put(connections)) + .describedAs("cache.put()") + .isTrue(); + } + private void validatePoolSize(int size) throws Exception { try (KeepAliveCache keepAliveCache = new KeepAliveCache( new AbfsConfiguration(new Configuration(), EMPTY_STRING))) { @@ -96,20 +129,20 @@ private void validatePoolSize(int size) throws Exception { } for (int i = 0; i < size; i++) { - Assertions.assertThat(keepAliveCache.put(connections[i])).isTrue(); + assertCachePutSuccess(keepAliveCache, connections[i]); Mockito.verify(connections[i], Mockito.times(0)).close(); } for (int i = size; i < size * 2; i++) { - Assertions.assertThat(keepAliveCache.put(connections[i])).isTrue(); + assertCachePutSuccess(keepAliveCache, connections[i]); Mockito.verify(connections[i - size], Mockito.times(1)).close(); } for (int i = 0; i < size * 2; i++) { if (i < size) { - Assert.assertNotNull(keepAliveCache.get()); + assertCacheGetNonNull(keepAliveCache); } else { - Assert.assertNull(keepAliveCache.get()); + assertCacheGetNull(keepAliveCache); } } System.clearProperty(HTTP_MAX_CONN_SYS_PROP); @@ -126,15 +159,17 @@ public void testKeepAliveCache() throws Exception { keepAliveCache.put(connection); - Assert.assertNotNull(keepAliveCache.get()); - keepAliveCache.put(connection); + assertCacheGetNonNull(keepAliveCache); } } @Test public void testKeepAliveCacheCleanup() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL, + HUNDRED + EMPTY_STRING); try (KeepAliveCache keepAliveCache = new KeepAliveCache( - new AbfsConfiguration(new Configuration(), EMPTY_STRING))) { + new AbfsConfiguration(configuration, EMPTY_STRING))) { keepAliveCache.clear(); HttpClientConnection connection = Mockito.mock( HttpClientConnection.class); @@ -153,15 +188,18 @@ public void testKeepAliveCacheCleanup() throws Exception { } // Assert that the closed connection is removed from the cache. - Assert.assertNull(keepAliveCache.get()); + assertCacheGetNull(keepAliveCache); Mockito.verify(connection, Mockito.times(1)).close(); } } @Test public void testKeepAliveCacheCleanupWithConnections() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL, + HUNDRED + EMPTY_STRING); try (KeepAliveCache keepAliveCache = new KeepAliveCache( - new AbfsConfiguration(new Configuration(), EMPTY_STRING))) { + new AbfsConfiguration(configuration, EMPTY_STRING))) { keepAliveCache.pauseThread(); keepAliveCache.clear(); HttpClientConnection connection = Mockito.mock( @@ -174,7 +212,7 @@ public void testKeepAliveCacheCleanupWithConnections() throws Exception { * remove the TTL-elapsed connection. */ Mockito.verify(connection, Mockito.times(0)).close(); - Assert.assertNull(keepAliveCache.get()); + assertCacheGetNull(keepAliveCache); Mockito.verify(connection, Mockito.times(1)).close(); keepAliveCache.resumeThread(); } @@ -189,9 +227,9 @@ public void testKeepAliveCacheConnectionRecache() throws Exception { HttpClientConnection.class); keepAliveCache.put(connection); - Assert.assertNotNull(keepAliveCache.get()); + assertCacheGetNonNull(keepAliveCache); keepAliveCache.put(connection); - Assert.assertNotNull(keepAliveCache.get()); + assertCacheGetNonNull(keepAliveCache); } } @@ -223,10 +261,10 @@ public void testKeepAliveCacheRemoveStaleConnection() throws Exception { i--) { // The last two connections are not stale and would be returned. if (i >= (DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS - 2)) { - Assert.assertNotNull(keepAliveCache.get()); + assertCacheGetNonNull(keepAliveCache); } else { // Stale connections are closed and removed. - Assert.assertNull(keepAliveCache.get()); + assertCacheGetNull(keepAliveCache); Mockito.verify(connections[i], Mockito.times(1)).close(); } } @@ -239,12 +277,12 @@ public void testKeepAliveCacheClosed() throws Exception { new AbfsConfiguration(new Configuration(), EMPTY_STRING))); keepAliveCache.put(Mockito.mock(HttpClientConnection.class)); keepAliveCache.close(); - IOException ex = intercept(IOException.class, + intercept(ClosedIOException.class, + KEEP_ALIVE_CACHE_CLOSED, () -> keepAliveCache.get()); - Assertions.assertThat(ex.getMessage()).isEqualTo(KEEP_ALIVE_CACHE_CLOSED); HttpClientConnection conn = Mockito.mock(HttpClientConnection.class); - Assertions.assertThat(keepAliveCache.put(conn)).isFalse(); + assertCachePutFail(keepAliveCache, conn); Mockito.verify(conn, Mockito.times(1)).close(); keepAliveCache.close(); Mockito.verify(keepAliveCache, Mockito.times(1)).closeInternal();