Skip to content

Commit

Permalink
Hadoop-18759: [ABFS][Backoff-Optimization] Have a Static retry policy…
Browse files Browse the repository at this point in the history
… for connection timeout. (#5881)


Contributed By: Anuj Modi
  • Loading branch information
anujmodi2021 authored Feb 20, 2024
1 parent 03d9aca commit 1336c36
Show file tree
Hide file tree
Showing 28 changed files with 935 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
private int maxBackoffInterval;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED,
DefaultValue = DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED)
private boolean staticRetryForConnectionTimeoutEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_INTERVAL,
DefaultValue = DEFAULT_STATIC_RETRY_INTERVAL)
private int staticRetryInterval;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
DefaultValue = DEFAULT_BACKOFF_INTERVAL)
private int backoffInterval;
Expand All @@ -166,6 +174,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
private int customTokenFetchRetryCount;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT,
DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT)
private int httpConnectionTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT,
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
private int httpReadTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
Expand Down Expand Up @@ -669,6 +685,14 @@ public int getMaxBackoffIntervalMilliseconds() {
return this.maxBackoffInterval;
}

/**
 * Returns the flag bound to the
 * {@code AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED} configuration key.
 * @return value of the {@code staticRetryForConnectionTimeoutEnabled} field.
 */
public boolean getStaticRetryForConnectionTimeoutEnabled() {
  return this.staticRetryForConnectionTimeoutEnabled;
}

/**
 * Returns the interval bound to the {@code AZURE_STATIC_RETRY_INTERVAL}
 * configuration key.
 * NOTE(review): presumably milliseconds, since the default constant is
 * 1_000 and is commented as 1s; confirm against the consumer.
 * @return value of the {@code staticRetryInterval} field.
 */
public int getStaticRetryInterval() {
  return this.staticRetryInterval;
}

/**
 * Returns the backoff interval bound to the {@code AZURE_BACKOFF_INTERVAL}
 * configuration key.
 * @return value of the {@code backoffInterval} field; the method name
 * indicates the unit is milliseconds.
 */
public int getBackoffIntervalMilliseconds() {
return this.backoffInterval;
}
Expand All @@ -681,6 +705,14 @@ public int getCustomTokenFetchRetryCount() {
return this.customTokenFetchRetryCount;
}

/**
 * Returns the HTTP connection timeout bound to the
 * {@code AZURE_HTTP_CONNECTION_TIMEOUT} configuration key.
 * NOTE(review): presumably milliseconds, since the default constant is
 * 2_000 and is commented as 2s; confirm against the consumer.
 * @return value of the {@code httpConnectionTimeout} field.
 */
public int getHttpConnectionTimeout() {
return this.httpConnectionTimeout;
}

/**
 * Returns the HTTP read timeout bound to the
 * {@code AZURE_HTTP_READ_TIMEOUT} configuration key.
 * NOTE(review): presumably milliseconds, since the default constant is
 * 30_000 and is commented as 30 secs; confirm against the consumer.
 * @return value of the {@code httpReadTimeout} field.
 */
public int getHttpReadTimeout() {
return this.httpReadTimeout;
}

/**
 * Returns the configured Azure block size.
 * @return value of the {@code azureBlockSize} field.
 */
public long getAzureBlockSize() {
return this.azureBlockSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
Expand Down Expand Up @@ -1781,6 +1782,8 @@ private AbfsClientContext populateAbfsClientContext() {
return new AbfsClientContextBuilder()
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfiguration))
.withStaticRetryPolicy(
new StaticRetryPolicy(abfsConfiguration))
.withAbfsCounters(abfsCounters)
.withAbfsPerfTracker(abfsPerfTracker)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,23 @@ public final class ConfigurationKeys {
// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
public static final String AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = "fs.azure.static.retry.for.connection.timeout.enabled";
public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval";
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";

/**
* Config to set HTTP Connection Timeout Value for Rest Operations.
* Value: {@value}.
*/
public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout";
/**
* Config to set HTTP Read Timeout Value for Rest Operations.
* Value: {@value}.
*/
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";

// Retry strategy for getToken calls
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,28 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;
private static final int SIXTY_SECONDS = 60_000;

// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30_000; // 30s
public static final boolean DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = true;
public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s
public static final int DEFAULT_BACKOFF_INTERVAL = 3_000; // 3s
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;

/**
* Default value of connection timeout to be used while setting up HTTP Connection.
* Value: {@value}.
*/
public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 2_000; // 2s
/**
* Default value of read timeout to be used while setting up HTTP Connection.
* Value: {@value}.
*/
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs

// Retry parameter defaults.
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;

/**
* AbfsClient.
Expand All @@ -93,7 +94,8 @@ public class AbfsClient implements Closeable {
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private String xMsVersion = DECEMBER_2019_API_VERSION;
private final ExponentialRetryPolicy retryPolicy;
private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
Expand Down Expand Up @@ -131,7 +133,8 @@ private AbfsClient(final URL baseUrl,
String baseUrlString = baseUrl.toString();
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
Expand Down Expand Up @@ -213,8 +216,24 @@ protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}

ExponentialRetryPolicy getRetryPolicy() {
return retryPolicy;
/**
 * Returns the exponential retry policy that this client obtained from its
 * {@code AbfsClientContext} in the constructor.
 * @return the exponential retry policy held by this client.
 */
ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}

/**
 * Returns the static retry policy that this client obtained from its
 * {@code AbfsClientContext} in the constructor.
 * @return the static retry policy held by this client.
 */
StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}

/**
 * Selects the retry policy to apply after a failed Abfs Rest Operation.
 * The static policy is chosen only when the failure reason equals
 * {@code CONNECTION_TIMEOUT_ABBREVIATION} and static retry for connection
 * timeout is enabled in the configuration; every other case, including a
 * {@code null} reason, falls back to the exponential policy.
 * @param failureReason abbreviation of the failure that triggered the retry.
 * @return retry policy to be used.
 */
public AbfsRetryPolicy getRetryPolicy(final String failureReason) {
  // Constant-first equals keeps a null failureReason from throwing.
  final boolean isConnectionTimeout =
      CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason);
  // The configuration is consulted only for connection timeouts, as before.
  if (isConnectionTimeout
      && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()) {
    return getStaticRetryPolicy();
  }
  return getExponentialRetryPolicy();
}

SharedKeyCredentials getSharedKeyCredentials() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
public class AbfsClientContext {

private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final AbfsPerfTracker abfsPerfTracker;
private final AbfsCounters abfsCounters;

/**
 * Creates a context holding the collaborators handed to it as-is.
 * @param exponentialRetryPolicy exponential retry policy to expose.
 * @param staticRetryPolicy static retry policy to expose.
 * @param abfsPerfTracker performance tracker to expose.
 * @param abfsCounters counters to expose.
 */
AbfsClientContext(
    ExponentialRetryPolicy exponentialRetryPolicy,
    StaticRetryPolicy staticRetryPolicy,
    AbfsPerfTracker abfsPerfTracker,
    AbfsCounters abfsCounters) {
  // Retry policies.
  this.staticRetryPolicy = staticRetryPolicy;
  this.exponentialRetryPolicy = exponentialRetryPolicy;
  // Instrumentation.
  this.abfsCounters = abfsCounters;
  this.abfsPerfTracker = abfsPerfTracker;
}
Expand All @@ -41,6 +45,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}

/**
 * Returns the static retry policy supplied to this context's constructor.
 * @return the static retry policy held by this context.
 */
public StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}

/**
 * Returns the performance tracker supplied to this context's constructor.
 * @return the performance tracker held by this context.
 */
public AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class AbfsClientContextBuilder {

private ExponentialRetryPolicy exponentialRetryPolicy;
private StaticRetryPolicy staticRetryPolicy;
private AbfsPerfTracker abfsPerfTracker;
private AbfsCounters abfsCounters;

Expand All @@ -34,6 +35,12 @@ public AbfsClientContextBuilder withExponentialRetryPolicy(
return this;
}

/**
 * Records the static retry policy to be passed to the built context.
 * @param staticRetryPolicy static retry policy to store in this builder.
 * @return this builder, to allow call chaining.
 */
public AbfsClientContextBuilder withStaticRetryPolicy(
final StaticRetryPolicy staticRetryPolicy) {
this.staticRetryPolicy = staticRetryPolicy;
return this;
}

public AbfsClientContextBuilder withAbfsPerfTracker(
final AbfsPerfTracker abfsPerfTracker) {
this.abfsPerfTracker = abfsPerfTracker;
Expand All @@ -52,7 +59,10 @@ public AbfsClientContextBuilder withAbfsCounters(final AbfsCounters abfsCounters
*/
public AbfsClientContext build() {
//validate the values
return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker,
return new AbfsClientContext(
exponentialRetryPolicy,
staticRetryPolicy,
abfsPerfTracker,
abfsCounters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@
public class AbfsHttpOperation implements AbfsPerfLoggable {
private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);

private static final int CONNECT_TIMEOUT = 30 * 1000;
private static final int READ_TIMEOUT = 30 * 1000;

private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;

private static final int ONE_THOUSAND = 1000;
Expand Down Expand Up @@ -263,10 +260,12 @@ public String getMaskedEncodedUrl() {
* @param url The full URL including query string parameters.
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
* @param requestHeaders The HTTP request headers.
*
* @param connectionTimeout The Connection Timeout value to be used while establishing http connection
* @param readTimeout The Read Timeout value to be used with http connection while making a request
* @throws IOException if an error occurs.
*/
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders,
final int connectionTimeout, final int readTimeout)
throws IOException {
this.url = url;
this.method = method;
Expand All @@ -280,9 +279,8 @@ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttp
}
}

this.connection.setConnectTimeout(CONNECT_TIMEOUT);
this.connection.setReadTimeout(READ_TIMEOUT);

this.connection.setConnectTimeout(connectionTimeout);
this.connection.setReadTimeout(readTimeout);
this.connection.setRequestMethod(method);

for (AbfsHttpHeader header : requestHeaders) {
Expand Down
Loading

0 comments on commit 1336c36

Please sign in to comment.