Skip to content

Commit

Permalink
review comments (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
saxenapranav authored Jul 1, 2024
1 parent 8f1892e commit 650c077
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Exception raised when an operation is attempted against a resource that has
 * already been closed, carrying the path (or resource identifier) involved.
 * Extends {@link PathIOException} so callers can distinguish
 * "already closed" failures from other path-related I/O errors.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ClosedIOException extends PathIOException {

  /**
   * Creates the exception for the given path.
   *
   * @param path path or resource identifier on which the operation failed.
   * @param message description of what was closed / why the call failed.
   */
  public ClosedIOException(String path, String message) {
    super(path, message);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

Expand All @@ -32,6 +33,7 @@
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.HttpClientConnectionOperator;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
Expand Down Expand Up @@ -90,15 +92,20 @@ public HttpClientConnection get(final long timeout,
final TimeUnit timeUnit)
throws InterruptedException, ExecutionException,
ConnectionPoolTimeoutException {
LOG.debug("Connection requested");
String requestId = UUID.randomUUID().toString();
logDebug("Connection requested for request {}", requestId);
try {
HttpClientConnection clientConn = kac.get();
if (clientConn != null) {
LOG.debug("Connection retrieved from KAC: {}", clientConn);
logDebug("Connection retrieved from KAC: {} for requestId: {}",
clientConn, requestId);
return clientConn;
}
LOG.debug("Creating new connection");
return httpConnectionFactory.create(route, null);
logDebug("Creating new connection for requestId: {}", requestId);
ManagedHttpClientConnection conn = httpConnectionFactory.create(route,
null);
logDebug("Connection created: {} for requestId: {}", conn, requestId);
return conn;
} catch (IOException ex) {
throw new ExecutionException(ex);
}
Expand Down Expand Up @@ -131,9 +138,9 @@ public void releaseConnection(final HttpClientConnection conn,
if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
boolean connAddedInKac = kac.put(conn);
if (connAddedInKac) {
LOG.debug("Connection cached: {}", conn);
logDebug("Connection cached: {}", conn);
} else {
LOG.debug("Connection not cached, and is released: {}", conn);
logDebug("Connection not cached, and is released: {}", conn);
}
}
}
Expand All @@ -145,11 +152,11 @@ public void connect(final HttpClientConnection conn,
final int connectTimeout,
final HttpContext context) throws IOException {
long start = System.currentTimeMillis();
LOG.debug("Connecting {} to {}", conn, route.getTargetHost());
logDebug("Connecting {} to {}", conn, route.getTargetHost());
connectionOperator.connect((AbfsManagedApacheHttpConnection) conn,
route.getTargetHost(), route.getLocalSocketAddress(),
connectTimeout, SocketConfig.DEFAULT, context);
LOG.debug("Connection established: {}", conn);
logDebug("Connection established: {}", conn);
if (context instanceof AbfsManagedHttpClientContext) {
((AbfsManagedHttpClientContext) context).setConnectTime(
System.currentTimeMillis() - start);
Expand Down Expand Up @@ -191,4 +198,10 @@ public void closeExpiredConnections() {
/**
 * Shuts down this connection manager by closing the underlying
 * keep-alive cache ({@code kac}); presumably this also releases any
 * pooled connections — confirm in KeepAliveCache#close.
 */
public void shutdown() {
  kac.close();
}

/**
 * Logs at DEBUG level, short-circuiting when debug logging is disabled.
 *
 * @param message SLF4J-style pattern with {@code {}} placeholders.
 * @param args values to substitute into the pattern.
 */
private void logDebug(String message, Object... args) {
  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug(message, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
* <p>
* For JDK netlib usage, the child class would be {@link AbfsJdkHttpOperation}. <br>
* For ApacheHttpClient netlib usage, the child class would be {@link AbfsAHCHttpOperation}.
* </p>
*/
public abstract class AbfsHttpOperation implements AbfsPerfLoggable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.routing.HttpRoute;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;

/**
* This class wraps the {@link ManagedHttpClientConnection} and provides
* insights onto the connection level activity.
Expand Down Expand Up @@ -223,4 +225,16 @@ public boolean equals(final Object o) {
// Returns the cached value from the hashCode field — presumably computed
// once at construction so repeated map/set lookups are cheap; confirm
// where the field is initialized (not visible in this hunk).
public int hashCode() {
  return hashCode;
}

/**
 * Summarizes the wrapped connection as {@code host:port:hashCode} for
 * debug logging.
 * <p>
 * {@code getRemoteAddress()} is null while the connection is not open
 * (before connect / after close), so it is guarded here: in that case only
 * the hashCode is emitted. A toString used on logging paths must never
 * throw, and the unguarded form would raise an NPE.
 */
@Override
public String toString() {
  StringBuilder stringBuilder = new StringBuilder();
  if (httpClientConnection.getRemoteAddress() != null) {
    stringBuilder.append(httpClientConnection.getRemoteAddress().getHostName())
        .append(COLON)
        .append(httpClientConnection.getRemotePort())
        .append(COLON);
  }
  stringBuilder.append(hashCode());
  return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
Expand All @@ -46,7 +47,6 @@
import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
import org.apache.http.impl.execchain.RequestAbortedException;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.apache.hadoop.util.Time.now;

Expand Down Expand Up @@ -482,8 +482,7 @@ private boolean executeHttpOperation(final int retryCount,
if (httpOperation instanceof AbfsAHCHttpOperation) {
registerApacheHttpClientIoException();
if (ex instanceof RequestAbortedException
&& ex.getCause() instanceof IOException
&& KEEP_ALIVE_CACHE_CLOSED.equals(ex.getCause().getMessage())) {
&& ex.getCause() instanceof ClosedIOException) {
throw new AbfsDriverException((IOException) ex.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.http.HttpClientConnection;

Expand Down Expand Up @@ -96,6 +97,8 @@ class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
*/
private final AtomicBoolean isPaused = new AtomicBoolean(false);

private final String accountNamePath;

@VisibleForTesting
synchronized void pauseThread() {
isPaused.set(true);
Expand All @@ -122,13 +125,13 @@ public long getConnectionIdleTTL() {
* If the configuration is not set, the system-property {@value org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants#HTTP_MAX_CONN_SYS_PROP}.
* If the system-property is not set or set to 0, the default value
* {@value org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations#DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS} is used.
* </p> <p>
* <p>
* This schedules an eviction thread to run every connectionIdleTTL milliseconds
* given by the configuration {@link AbfsConfiguration#getMaxApacheHttpClientConnectionIdleTime()}.
* </p>
* @param abfsConfiguration Configuration of the filesystem.
*/
KeepAliveCache(AbfsConfiguration abfsConfiguration) {
accountNamePath = abfsConfiguration.getAccountName();
this.timer = new Timer("abfs-kac-" + KAC_COUNTER.getAndIncrement(), true);

int sysPropMaxConn = Integer.parseInt(System.getProperty(HTTP_MAX_CONN_SYS_PROP, "0"));
Expand Down Expand Up @@ -185,7 +188,9 @@ private void closeHttpClientConnection(final HttpClientConnection hc) {
try {
hc.close();
} catch (IOException ex) {
LOG.debug("Close failed for connection: " + hc, ex);
if(LOG.isDebugEnabled()) {
LOG.debug("Close failed for connection: {}", hc, ex);
}
}
}

Expand Down Expand Up @@ -215,18 +220,17 @@ void closeInternal() {
* <p>
* Gets the latest added HttpClientConnection from the cache. The returned connection
* is non-stale and has been in the cache for less than connectionIdleTTL milliseconds.
* </p> <p>
* <p>
* The cache is checked from the top of the stack. If the connection is stale or has been
* in the cache for more than connectionIdleTTL milliseconds, it is closed and the next
* connection is checked. Once a valid connection is found, it is returned.
* </p>
* @return HttpClientConnection: if a valid connection is found, else null.
* @throws IOException if the cache is closed.
*/
public synchronized HttpClientConnection get()
throws IOException {
if (isClosed.get()) {
throw new IOException(KEEP_ALIVE_CACHE_CLOSED);
throw new ClosedIOException(accountNamePath, KEEP_ALIVE_CACHE_CLOSED);
}
if (empty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.junit.Test;

import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
Expand All @@ -28,6 +29,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;

/**
* This test class tests the exception handling in ABFS thrown by the
Expand All @@ -41,13 +43,16 @@ public ITestApacheClientConnectionPool() throws Exception {
}

@Test
public void testKacIsClosed() throws Exception {
try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())) {
/**
 * Verifies that once the KeepAliveCache is closed, a subsequent filesystem
 * operation fails with an AbfsDriverException carrying the
 * KEEP_ALIVE_CACHE_CLOSED message, and that its cause is the
 * ClosedIOException raised by the cache.
 */
public void testKacIsClosed() throws Throwable {
  try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
      getRawConfiguration())) {
    KeepAliveCache kac = fs.getAbfsStore().getClient().getKeepAliveCache();
    // Close the cache up front so the create() below cannot obtain or
    // cache a connection.
    kac.close();
    // The driver must surface the closed-cache condition as an
    // AbfsDriverException with the expected message...
    AbfsDriverException ex = intercept(AbfsDriverException.class,
        KEEP_ALIVE_CACHE_CLOSED, () -> {
          fs.create(new Path("/test"));
        });
    // ...and the underlying cause must be the ClosedIOException.
    verifyCause(ClosedIOException.class, ex);
  }
}
}
Loading

0 comments on commit 650c077

Please sign in to comment.