HADOOP-18889. S3A v2 SDK third party support (#6141)
Tune AWS v2 SDK changes based on testing with third party stores
including GCS. 

Contains HADOOP-18889. S3A v2 SDK error translations and troubleshooting docs

* Changes needed to work with multiple third party stores
* New third_party_stores document on how to bind to and test
  third-party stores, including Google GCS (which works!); a
  configuration sketch follows this list.
* Troubleshooting docs mostly updated for v2 SDK
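
A rough sketch of what such a binding can look like, using the connector's
per-bucket configuration options; the bucket name, endpoint and credentials
below are illustrative assumptions, not values taken from the new document:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ThirdPartyStoreBinding {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Per-bucket override: point the "example" bucket at a non-AWS endpoint.
        conf.set("fs.s3a.bucket.example.endpoint", "https://storage.googleapis.com");
        // Most third-party stores expect path-style access rather than virtual hosting.
        conf.setBoolean("fs.s3a.bucket.example.path.style.access", true);
        // Credentials for the store's S3-compatible API (placeholders only).
        conf.set("fs.s3a.bucket.example.access.key", "ACCESS-KEY");
        conf.set("fs.s3a.bucket.example.secret.key", "SECRET-KEY");

        try (FileSystem fs = FileSystem.get(new URI("s3a://example/"), conf)) {
          for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
          }
        }
      }
    }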

Exception translation/resilience

* New AWSUnsupportedFeatureException for unsupported/unavailable errors
* Handle 501 method unimplemented as one of these
* Status codes > 500 are mapped to AWSStatus500Exception if there is no
  explicit handler.
* Precondition errors handled a bit better
* GCS throttle exception also recognized.
* GCS raises 404 on a delete of a file which doesn't exist: swallow it.
* Error translation uses reflection to create an IOException of the right type.
  When an IOE is found at the bottom of an AWS exception chain, a new exception
  of that specific type is created with the top-level exception as its cause.
  This is done to retain the whole stack chain (see the sketch after this list).
* Reduce the number of retries within the AWS SDK, and those of the s3a code.
* S3ARetryPolicy explicitly declares SocketException a connectivity failure,
  but handles its subclass BindException separately.
* SocketTimeoutException is also considered a connectivity failure.
* Log at debug whenever retry policies are looked up
* Reorder exceptions to alphabetical order, with commentary
* Review use of the Invoke.retry() method 
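
As a rough sketch of the reflection-based regeneration described above (not
the actual S3A helper, whose class and method names here are assumptions):

    import java.io.IOException;
    import java.lang.reflect.Constructor;

    /** Sketch only: rebuild the innermost IOException with the full chain attached. */
    public final class ExceptionRegenerationSketch {

      public static IOException regenerate(Exception thrown) {
        // Walk to the innermost cause of the AWS exception chain.
        Throwable innermost = thrown;
        while (innermost.getCause() != null) {
          innermost = innermost.getCause();
        }
        if (innermost instanceof IOException) {
          try {
            // Look up a (String) constructor on the specific IOException subclass.
            Constructor<? extends IOException> ctor = innermost.getClass()
                .asSubclass(IOException.class)
                .getConstructor(String.class);
            IOException regenerated = ctor.newInstance(innermost.getMessage());
            // Attach the top-level exception so the whole stack chain is retained.
            regenerated.initCause(thrown);
            return regenerated;
          } catch (ReflectiveOperationException e) {
            // No usable constructor: fall through to a generic wrapper.
          }
        }
        return new IOException(thrown.toString(), thrown);
      }
    }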

 The reduction in retries is because it is clear, when you try to create a bucket
 whose hostname doesn't resolve, that even an UnknownHostException takes over 90s
 to eventually fail, and only then hits the s3a retry code.
 - Reducing the SDK retries means these escalate to our code better.
 - Cutting back on our own retries makes it a bit more responsive for most real
   deployments.
 - maybeTranslateNetworkException() and the s3a retry policy mean that an
   unknown host exception is recognised and fails fast.
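
For illustration, the two retry layers can be tuned independently; the values
below are just a sketch of that tuning, not recommendations from this commit:

    import org.apache.hadoop.conf.Configuration;

    public final class RetryTuningSketch {
      /** Build a configuration with both retry layers dialled down. */
      public static Configuration tunedConfiguration() {
        Configuration conf = new Configuration();
        // Retries performed inside the AWS SDK before a failure reaches S3A code.
        conf.setInt("fs.s3a.attempts.maximum", 5);
        // Retries performed by the S3A connector's own retry policy.
        conf.setInt("fs.s3a.retry.limit", 7);
        conf.set("fs.s3a.retry.interval", "500ms");
        return conf;
      }
    }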

Contributed by Steve Loughran
steveloughran authored Oct 12, 2023
1 parent 0ed484a commit 81edbeb
Showing 52 changed files with 1,980 additions and 1,222 deletions.
core-default.xml
@@ -1584,8 +1584,14 @@

<property>
<name>fs.s3a.attempts.maximum</name>
<value>20</value>
<description>How many times we should retry commands on transient errors.</description>
<value>5</value>
<description>
Number of times the AWS client library should retry errors before
escalating to the S3A code: {@value}.
The S3A connector does its own selective retries; the only time the AWS
SDK operations are not wrapped is during multipart copy via the AWS SDK
transfer manager.
</description>
</property>

<property>
AWSBadRequestException.java
@@ -20,6 +20,8 @@

import software.amazon.awssdk.awscore.exception.AwsServiceException;

import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_400_BAD_REQUEST;

/**
* A 400 "Bad Request" exception was received.
* This is the general "bad parameters, headers, whatever" failure.
@@ -28,7 +30,7 @@ public class AWSBadRequestException extends AWSServiceIOException {
/**
* HTTP status code which signals this failure mode was triggered: {@value}.
*/
public static final int STATUS_CODE = 400;
public static final int STATUS_CODE = SC_400_BAD_REQUEST;

/**
* Instantiate.
AWSStatus500Exception.java
@@ -21,13 +21,13 @@
import software.amazon.awssdk.awscore.exception.AwsServiceException;

/**
* A 500 response came back from a service.
* This is considered <i>probably</i> retriable, That is, we assume
* <ol>
* <li>whatever error happened in the service itself to have happened
* before the infrastructure committed the operation.</li>
* <li>Nothing else got through either.</li>
* </ol>
* A 5xx response came back from a service.
* The 500 error is considered retriable by the AWS SDK, which will have already
* tried it {@code fs.s3a.attempts.maximum} times before reaching s3a
* code.
* How it handles other 5xx errors is unknown: S3A FS code will treat them
* as unrecoverable on the basis that they indicate some third-party store
* or gateway problem.
*/
public class AWSStatus500Exception extends AWSServiceIOException {
public AWSStatus500Exception(String operation,
@@ -37,6 +37,6 @@ public AWSStatus500Exception(String operation,

@Override
public boolean retryable() {
return true;
return false;
}
}
AWSUnsupportedFeatureException.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import software.amazon.awssdk.awscore.exception.AwsServiceException;

/**
* A store returned an error indicating that it does not support a
* specific S3 feature such as the chosen ChangeDetectionPolicy or
* other AWS-S3 feature that the third-party store does not support.
* The workaround is to disable use of the feature.
* Unrecoverable.
*/
public class AWSUnsupportedFeatureException extends AWSServiceIOException {

/**
* Instantiate.
* @param operation operation which triggered this
* @param cause the underlying cause
*/
public AWSUnsupportedFeatureException(String operation,
AwsServiceException cause) {
super(operation, cause);
}

@Override
public boolean retryable() {
return false;
}
}
Constants.java
@@ -228,14 +228,17 @@ private Constants() {
/**
* Number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
* The S3A connector does its own selective retries; the only time the AWS
* SDK operations are not wrapped is during multipart copy via the AWS SDK
* transfer manager.
*/
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";

/**
* Default number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
public static final int DEFAULT_MAX_ERROR_RETRIES = 5;

/**
* Experimental/Unstable feature: should the AWS client library retry
@@ -264,7 +267,7 @@ private Constants() {
// milliseconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
public static final int DEFAULT_ESTABLISH_TIMEOUT = 5000;

// milliseconds until we give up on a connection to s3
public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
MultipartUtils.java
@@ -223,6 +223,14 @@ public static class UploadIterator
/** Iterator over the current listing. */
private ListIterator<MultipartUpload> batchIterator;

/**
* Construct an iterator to list uploads under a path.
* @param storeContext store context
* @param s3 s3 client
* @param maxKeys max # of keys to list per batch
* @param prefix prefix
* @throws IOException listing failure.
*/
@Retries.RetryTranslated
public UploadIterator(
final StoreContext storeContext,
S3AFileSystem.java
@@ -334,6 +334,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
/** Exactly once log to warn about setting the region in config to avoid probe. */
private static final LogExactlyOnce SET_REGION_WARNING = new LogExactlyOnce(LOG);

/** Log to warn of storage class configuration problems. */
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);

private static final Logger PROGRESS =
LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
private LocalDirAllocator directoryAllocator;
@@ -1073,7 +1077,8 @@ private Region getS3Region(String region) throws IOException {

if (exception.statusCode() == SC_404_NOT_FOUND) {
throw new UnknownStoreException("s3a://" + bucket + "/",
" Bucket does " + "not exist");
" Bucket does not exist: " + exception,
exception);
}

throw exception;
@@ -1174,17 +1179,21 @@ protected RequestFactory createRequestFactory() {

// Any encoding type
String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);
if (contentEncoding != null) {
LOG.debug("Using content encoding set in {} = {}", CONTENT_ENCODING, contentEncoding);
}

String storageClassConf = getConf()
.getTrimmed(STORAGE_CLASS, "")
.toUpperCase(Locale.US);
StorageClass storageClass = null;
if (!storageClassConf.isEmpty()) {
storageClass = StorageClass.fromValue(storageClassConf);

LOG.debug("Using storage class {}", storageClass);
if (storageClass.equals(StorageClass.UNKNOWN_TO_SDK_VERSION)) {
LOG.warn("Unknown storage class property {}: {}; falling back to default storage class",
STORAGE_CLASS, storageClassConf);
STORAGE_CLASS_WARNING.warn("Unknown storage class \"{}\" from option: {};"
+ " falling back to default storage class",
storageClassConf, STORAGE_CLASS);
storageClass = null;
}

@@ -1431,7 +1440,7 @@ public String getBucketLocation() throws IOException {
public String getBucketLocation(String bucketName) throws IOException {
final String region = trackDurationAndSpan(
STORE_EXISTS_PROBE, bucketName, null, () ->
invoker.retry("getBucketLocation()", bucketName, true, () ->
once("getBucketLocation()", bucketName, () ->
// If accessPoint then region is known from Arn
accessPoint != null
? accessPoint.getRegion()
@@ -2993,7 +3002,7 @@ protected void deleteObject(String key)
"deleting %s", key)) {
invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
() -> {
incrementStatistic(OBJECT_DELETE_OBJECTS);
trackDurationOfInvocation(getDurationTrackerFactory(),
OBJECT_DELETE_REQUEST.getSymbol(),
@@ -3002,6 +3011,12 @@ protected void deleteObject(String key)
.build()));
return null;
});
} catch (AwsServiceException ase) {
// 404 errors get swallowed; this can be raised by
// third party stores (GCS).
if (!isObjectNotFound(ase)) {
throw ase;
}
}
}

@@ -4287,13 +4302,13 @@ protected synchronized void stopAllServices() {
}

/**
* Verify that the input stream is open. Non blocking; this gives
* Verify that the filesystem has not been closed. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
* @throws PathIOException if the FS is closed.
*/
private void checkNotClosed() throws IOException {
private void checkNotClosed() throws PathIOException {
if (isClosed) {
throw new IOException(uri + ": " + E_FS_CLOSED);
throw new PathIOException(uri.toString(), E_FS_CLOSED);
}
}

@@ -4443,7 +4458,6 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
// This means the File was deleted since LIST enumerated it.
LOG.debug("getObjectMetadata({}) failed to find an expected file",
srcKey, e);
// We create an exception, but the text depends on the S3Guard state
throw new RemoteFileChangedException(
keyToQualifiedPath(srcKey).toString(),
action,
@@ -4454,6 +4468,8 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
CopyObjectRequest.Builder copyObjectRequestBuilder =
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
final CopyObjectRequest copyRequest = copyObjectRequestBuilder.build();
LOG.debug("Copy Request: {}", copyRequest);
CopyObjectResponse response;

// transfer manager is skipped if disabled or the file is too small to worry about
@@ -4468,7 +4484,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,

Copy copy = transferManager.copy(
CopyRequest.builder()
.copyObjectRequest(copyObjectRequestBuilder.build())
.copyObjectRequest(copyRequest)
.build());

try {
@@ -4477,6 +4493,8 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof SdkException) {
// if this is a 412 precondition failure, it may
// be converted to a RemoteFileChangedException
SdkException awsException = (SdkException)cause;
changeTracker.processException(awsException, "copy");
throw awsException;
@@ -4493,7 +4511,15 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
() -> {
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
incrementStatistic(OBJECT_COPY_REQUESTS);
return s3Client.copyObject(copyObjectRequestBuilder.build());
try {
return s3Client.copyObject(copyRequest);
} catch (SdkException awsException) {
// if this is a 412 precondition failure, it may
// be converted to a RemoteFileChangedException
changeTracker.processException(awsException, "copy");
// otherwise, rethrow
throw awsException;
}
});
}
